diff --git a/crates/scion-proto/src/packet/headers.rs b/crates/scion-proto/src/packet/headers.rs index d1a44ef..f8735fa 100644 --- a/crates/scion-proto/src/packet/headers.rs +++ b/crates/scion-proto/src/packet/headers.rs @@ -107,7 +107,7 @@ impl WireEncode for ScionHeaders { } /// Instances of an object associated with both a source and destination endpoint. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)] pub struct ByEndpoint { /// The value for the source pub source: T, diff --git a/crates/scion-proto/src/path.rs b/crates/scion-proto/src/path.rs index 5270785..4a4822f 100644 --- a/crates/scion-proto/src/path.rs +++ b/crates/scion-proto/src/path.rs @@ -30,6 +30,9 @@ pub use fingerprint::{FingerprintError, PathFingerprint}; mod metadata; pub use metadata::{GeoCoordinates, LinkType, PathInterface, PathMetadata}; +/// Minimum MTU along any path or within any AS. +pub const PATH_MIN_MTU: u16 = 1280; + /// A SCION end-to-end path with optional metadata. #[derive(Debug, Clone)] pub struct Path { @@ -65,7 +68,28 @@ impl Path { /// Panics if the AS is a wildcard AS. pub fn local(isd_asn: IsdAsn) -> Self { assert!(!isd_asn.is_wildcard(), "no local path for wildcard AS"); - Self::empty(ByEndpoint::with_cloned(isd_asn)) + + Self { + dataplane_path: DataplanePath::EmptyPath, + underlay_next_hop: None, + isd_asn: ByEndpoint::with_cloned(isd_asn), + metadata: Some(PathMetadata { + expiration: DateTime::::MAX_UTC, + mtu: PATH_MIN_MTU, + interfaces: vec![], + ..PathMetadata::default() + }), + } + } + + /// Returns the source of this path. + pub const fn source(&self) -> IsdAsn { + self.isd_asn.source + } + + /// Returns the destination of this path. + pub const fn destination(&self) -> IsdAsn { + self.isd_asn.destination } pub fn empty(isd_asn: ByEndpoint) -> Self { @@ -101,6 +125,20 @@ impl Path { pub fn expiry_time(&self) -> Option> { self.metadata.as_ref().map(|metadata| metadata.expiration) } + + /// Returns true if the path contains an expiry time, and it is after now, + /// false if the contained expiry time is at or before now, and None if the path + /// does not contain an expiry time. + pub fn is_expired(&self, now: DateTime) -> Option { + self.expiry_time().map(|t| t <= now) + } + + /// Returns the number of interfaces traversed by the path, if available. Otherwise None. + pub fn interface_count(&self) -> Option { + self.metadata + .as_ref() + .map(|metadata| metadata.interfaces.len()) + } } #[allow(missing_docs)] @@ -112,8 +150,10 @@ impl Path { ) -> Result { let mut dataplane_path = Bytes::from(std::mem::take(&mut value.raw)); if dataplane_path.is_empty() { - return if isd_asn.are_equal() { + return if isd_asn.are_equal() && isd_asn.destination.is_wildcard() { Ok(Path::empty(isd_asn)) + } else if isd_asn.are_equal() { + Ok(Path::local(isd_asn.destination)) } else { Err(PathParseErrorKind::EmptyRaw.into()) }; diff --git a/crates/scion/src/daemon/client.rs b/crates/scion/src/daemon/client.rs index 9237ce7..b671184 100644 --- a/crates/scion/src/daemon/client.rs +++ b/crates/scion/src/daemon/client.rs @@ -1,6 +1,6 @@ //! A client to communicate with the SCION daemon. -use std::env; +use std::{env, vec}; use scion_grpc::daemon::{v1 as daemon_grpc, v1::daemon_service_client::DaemonServiceClient}; use scion_proto::{address::IsdAsn, packet::ByEndpoint, path::Path}; @@ -11,6 +11,13 @@ use super::{ messages::{self, PathRequest}, AsInfo, }; +use crate::pan::{AsyncPathService, PathLookupError}; + +/// The default address of the SCION daemon. +pub const DEFAULT_DAEMON_ADDRESS: &str = "https://localhost:30255"; + +/// The environment variable to configure the address of the SCION daemon. +pub const DAEMON_ADDRESS_ENV_VARIABLE: &str = "SCION_DAEMON_ADDRESS"; #[allow(missing_docs)] #[derive(Debug, thiserror::Error)] @@ -23,12 +30,6 @@ pub enum DaemonClientError { InvalidData, } -/// The default address of the SCION daemon. -pub const DEFAULT_DAEMON_ADDRESS: &str = "https://localhost:30255"; - -/// The environment variable to configure the address of the SCION daemon. -pub const DAEMON_ADDRESS_ENV_VARIABLE: &str = "SCION_DAEMON_ADDRESS"; - /// Get the daemon address. /// /// Depending on the environment, this is the [`DEFAULT_DAEMON_ADDRESS`] or manually configured @@ -124,7 +125,7 @@ impl DaemonClient { #[derive(Debug)] pub struct Paths { isd_asn: ByEndpoint, - grpc_paths: std::vec::IntoIter, + grpc_paths: vec::IntoIter, } impl Iterator for Paths { @@ -140,3 +141,28 @@ impl Iterator for Paths { None } } + +impl AsyncPathService for DaemonClient { + type PathsTo = Paths; + + async fn paths_to(&self, scion_as: IsdAsn) -> Result { + self.check_destination(scion_as)?; + + Ok(self.paths(&PathRequest::new(scion_as)).await?) + } + + async fn path_to(&self, scion_as: IsdAsn) -> Result { + self.check_destination(scion_as)?; + + self.paths(&PathRequest::new(scion_as)) + .await? + .next() + .ok_or(PathLookupError::NoPath) + } +} + +impl From for PathLookupError { + fn from(value: DaemonClientError) -> Self { + PathLookupError::Other(Box::new(value)) + } +} diff --git a/crates/scion/src/pan.rs b/crates/scion/src/pan.rs index 7ec1120..cfcbc75 100644 --- a/crates/scion/src/pan.rs +++ b/crates/scion/src/pan.rs @@ -3,7 +3,7 @@ mod datagram; pub use datagram::{AsyncScionDatagram, PathAwareDatagram}; mod path_service; -pub use path_service::AsyncPathService; +pub use path_service::{AsyncPathService, PathLookupError}; mod error; pub use error::{PathErrorKind, ReceiveError, SendError}; diff --git a/crates/scion/src/pan/path_service.rs b/crates/scion/src/pan/path_service.rs index bce6503..85e24c9 100644 --- a/crates/scion/src/pan/path_service.rs +++ b/crates/scion/src/pan/path_service.rs @@ -16,6 +16,9 @@ pub enum PathLookupError { /// The destination can be queried, but there are no paths available to it. #[error("no path available to destination")] NoPath, + /// Other errors raised by the service. + #[error(transparent)] + Other(Box), } /// Trait for asynchronously retrieving paths to SCION ASes. diff --git a/crates/scion/src/pan/path_strategy.rs b/crates/scion/src/pan/path_strategy.rs index 0d0ac07..6e41970 100644 --- a/crates/scion/src/pan/path_strategy.rs +++ b/crates/scion/src/pan/path_strategy.rs @@ -6,8 +6,12 @@ use scion_proto::{address::IsdAsn, path::Path}; mod async_strategy; pub use async_strategy::AsyncPathStrategy; +pub mod refresher; + +mod utc_instant; + /// Errors returned when fetching paths from a [`PathStrategy`]. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, PartialEq, Eq)] pub enum PathFetchError { /// The requested destination is not supported by this strategy, /// and will never return a valid result. @@ -16,6 +20,7 @@ pub enum PathFetchError { } /// Requests that a path strategy can make on its controller. +#[derive(Debug, PartialEq, Eq)] pub enum Request { /// Requests that the controller queries paths to the specified destination. LookupPathsTo(IsdAsn), diff --git a/crates/scion/src/pan/path_strategy/async_strategy.rs b/crates/scion/src/pan/path_strategy/async_strategy.rs index 10e512f..dc01395 100644 --- a/crates/scion/src/pan/path_strategy/async_strategy.rs +++ b/crates/scion/src/pan/path_strategy/async_strategy.rs @@ -2,12 +2,11 @@ use std::{ collections::HashSet, future::Future, sync::{Arc, Mutex}, - time::Duration, }; use futures::stream::{FuturesUnordered, StreamExt}; use scion_proto::{address::IsdAsn, path::Path}; -use tokio::{task::JoinHandle, time, time::Instant}; +use tokio::{sync::watch, task::JoinHandle, time, time::Instant}; use super::PathStrategy; use crate::pan::{ @@ -52,8 +51,6 @@ where S: PathStrategy + Send + 'static, P: AsyncPathService + Send + Sync + 'static, { - const PATH_WAIT_TIME: Duration = Duration::from_millis(10); - /// Creates a new `AsyncPathStrategy` and spawns a background task to drive the strategy. /// /// The provided [`PathStrategy`] determines the logic and the provided [`AsyncPathService`] @@ -78,7 +75,7 @@ where F: FnOnce(&S, Instant) -> Result, { let start = Instant::now(); - tracing::debug!("waiting until paths become available"); + let mut update_listener = self.inner.subscribe_to_path_changes(); loop { { @@ -89,23 +86,21 @@ where match strategy.is_path_available(scion_as, now.into()) { Err(PathFetchError::UnsupportedDestination) => { tracing::debug!("request was for unsupported destination"); - return Err(PathLookupError::NoPath); + return Err(PathLookupError::UnsupportedDestination); } Ok(true) => { tracing::debug!(now=?rel_now, "paths are available, running handler"); return handler(&*strategy, now); } - Ok(false) => tracing::debug!( - now = tracing::field::debug(rel_now), - "no paths currently available" - ), + Ok(false) => tracing::debug!(now = ?rel_now, "no paths currently available"), } } - // Try again after a brief pause. - let rel_now = Instant::now().duration_since(start); - tracing::debug!(now=?rel_now, "waiting {:?} until next check", Self::PATH_WAIT_TIME); - tokio::time::sleep(Self::PATH_WAIT_TIME).await; + tracing::debug!("waiting until the path strategy has received paths"); + if update_listener.changed().await.is_err() { + tracing::warn!("channel dropped while waiting, aborting wait"); + return Err(PathLookupError::NoPath); + } } } } @@ -158,6 +153,7 @@ impl Drop for AsyncPathStrategy { struct AsyncPathStrategyInner { path_service: P, strategy: Mutex, + update_notifier: watch::Sender<()>, } impl<'p, S, P> AsyncPathStrategyInner @@ -169,6 +165,7 @@ where Self { strategy: Mutex::new(strategy), path_service, + update_notifier: watch::Sender::new(()), } } @@ -212,9 +209,8 @@ where callback_time=?callback_time.duration_since(start), "waiting for paths or callback timeout" ); - // Wait for the callback duration, or until a pending path query completes. - // If there are no pending path queries, then this will just sleep until the - // callback time. + // Wait for the callback duration, or until a pending path query completes. If there + // are no pending path queries, then this will just sleep until the callback time. while let Ok(next) = time::timeout_at(callback_time, requests.next_ready()).await { let span = tracing::debug_span!("event_wait", now=?start.elapsed()); @@ -234,6 +230,9 @@ where tracing::debug!(%scion_as, count=found_paths.len(), "path lookup successful"); strategy.handle_lookup_paths(&found_paths, Instant::now().into()); + + self.update_notifier.send_replace(()); + break; } None => { @@ -245,6 +244,10 @@ where } } } + + fn subscribe_to_path_changes(&self) -> watch::Receiver<()> { + self.update_notifier.subscribe() + } } /// Tracks the ASes for which path requests are pending. diff --git a/crates/scion/src/pan/path_strategy/refresher.rs b/crates/scion/src/pan/path_strategy/refresher.rs new file mode 100644 index 0000000..57c1a7d --- /dev/null +++ b/crates/scion/src/pan/path_strategy/refresher.rs @@ -0,0 +1,900 @@ +//! A path strategy that periodically refreshes cached paths. + +use std::{ + cmp::{self, Reverse}, + collections::HashMap, + slice, + time::{Duration, Instant}, +}; + +use chrono::{DateTime, Utc}; +use scion_proto::{ + address::IsdAsn, + path::{Path, PathFingerprint}, +}; + +use super::{utc_instant::UtcInstant, PathFetchError, PathStrategy, Request}; + +/// A [`PathStrategy`] that queries and refreshes paths. +/// +/// A `PathRefresher` is a path strategy that periodically requests path lookups to the configured +/// destination, caches them, and refreshes them as they approach their expiration. +/// +/// This strategy is agnostic to failures of its lookup requests. While it has at least one path nearing +/// expiration (within [`QUERY_LEAD_TIME`][Self::QUERY_LEAD_TIME] of its expiration time), it will +/// continuously create requests for path lookups every [`MIN_REFRESH_INTERVAL`][Self::MIN_REFRESH_INTERVAL]. +/// +/// The path strategy returns paths sorted first in ascending order of their number of interface hops, +/// followed by descending order of their expiration times. The minimum validity of returned paths can be +/// controlled with [`set_min_path_validity`][Self::set_min_path_validity]. +/// +/// This strategy requires that the stored paths have their associated +/// [`PathMetadata`][scion_proto::path::PathMetadata]. +pub struct PathRefresher { + remote_ia: IsdAsn, + paths: SortedPaths, + start: UtcInstant, + last_query: Option, + min_path_validity: Duration, +} + +impl PathRefresher { + /// The time before a path expires, at which new paths are queried (120 s). + pub const QUERY_LEAD_TIME: Duration = Duration::from_secs(120); + /// The minimum time between successive path requests (10 s). + pub const MIN_REFRESH_INTERVAL: Duration = Duration::from_secs(10); + /// The periodicity with which paths are refreshed (300 s). + pub const REFRESH_INTERVAL: Duration = Duration::from_secs(300); + /// The default period before a path's expiration time, at which it is considered expired (5 s). + pub const DEFAULT_MIN_PATH_VALIDITY: Duration = Duration::from_secs(5); + + /// Creates a new instance of `PathRefresher` for the SCION AS. + /// + /// # Panics + /// + /// This method panics if `remote_ia` is a wildcard [`IsdAsn`]. + pub fn new(remote_ia: IsdAsn) -> Self { + assert!( + !remote_ia.is_wildcard(), + "cannot create a PathRefresher for a wildcard AS" + ); + Self { + remote_ia, + start: UtcInstant::now(), + last_query: None, + paths: SortedPaths::default(), + min_path_validity: Self::DEFAULT_MIN_PATH_VALIDITY, + } + } + + /// The remote SCION AS for which this strategy caches paths. + pub const fn remote_ia(&self) -> IsdAsn { + self.remote_ia + } + + /// Caches paths to the remote SCION AS. + /// + /// Only the paths which have a destination of [`remote_ia()`][Self::remote_ia], have associated + /// metadata, and have not yet expired are cached. All other paths are discarded. + /// + /// Cached paths can be retrieved with [`get_path()`][Self::get_path] or [`paths()`][Self::paths]. + pub fn cache_paths(&mut self, paths: &[Path], now: DateTime) { + self.paths.extend_and_remove_expired( + paths.iter().filter(|p| p.destination() == self.remote_ia), + now, + ); + } + + /// Returns the path with the lowest hop count and longest remaining validity. + pub fn get_path(&self, now: DateTime) -> Option<&Path> { + self.paths.get_best_path(now, self.min_path_validity()) + } + + /// Returns an iterator over the cached, unexpired paths. + pub fn paths(&self, now: DateTime) -> PathsTo<'_> { + PathsTo::new(&self.paths, now, self.min_path_validity()) + } + + /// Gets the configured minimum path validity. + /// + /// See [`set_min_path_validity`][Self::set_min_path_validity] for more information. + pub const fn min_path_validity(&self) -> Duration { + self.min_path_validity + } + + /// Sets the minimum path validity. + /// + /// A minimum path validity of `t` ensures that all returned paths will be valid for a duration of + /// at least `t` from the time the path query is requested. If set to zero, a path may be returned up + /// until its expiry time. + /// + /// A value that is too low (milliseconds or zero) may result in paths being rejected by routers, + /// as they may expire while traversing the network or be considered expired due differences in time + /// synchronisation. A value that is too large underutilises the available paths. The default value + /// of [`DEFAULT_MIN_PATH_VALIDITY`][`Self::DEFAULT_MIN_PATH_VALIDITY`] therefore sets this to a few + /// seconds. + pub fn set_min_path_validity(&mut self, offset: Duration) { + self.min_path_validity = offset; + } + + #[inline] + fn check_destination(&self, destination: IsdAsn) -> Result<(), PathFetchError> { + if destination != self.remote_ia() { + Err(PathFetchError::UnsupportedDestination) + } else { + Ok(()) + } + } + + /// Duration until the next path lookup. + fn duration_until_next_lookup(&self, now: Instant) -> Duration { + let Some(last_query) = self.last_query else { + return Duration::ZERO; + }; + + let desired_query_in = if self.paths.is_empty() { + Duration::ZERO + } else { + let utc_now = self.start.instant_to_utc(now); + + // Time from now to the next planned query or zero + let periodic = (last_query + Self::REFRESH_INTERVAL).duration_since(now); + + // Time until the next path expires + let earliest_expiry = self + .paths + .earliest_expiry() + .expect("there are paths, therefore an earliest expiry time"); + + let duration_until_earliest_expiry = earliest_expiry + .signed_duration_since(utc_now) + .to_std() + .unwrap_or(Duration::ZERO); + + let duration_until_refresh = duration_until_earliest_expiry + .checked_sub(Self::QUERY_LEAD_TIME) + .unwrap_or(Duration::ZERO); + + cmp::min(duration_until_refresh, periodic) + }; + + let earliest_possible_in = Self::MIN_REFRESH_INTERVAL + .checked_sub(now.duration_since(last_query)) + .unwrap_or(Duration::ZERO); + + cmp::max(desired_query_in, earliest_possible_in) + } +} + +impl PathStrategy for PathRefresher { + type PathsTo<'p> = PathsTo<'p> + where + Self: 'p; + + fn paths_to( + &self, + destination: IsdAsn, + now: Instant, + ) -> Result, PathFetchError> { + self.check_destination(destination)?; + + Ok(self.paths(self.start.instant_to_utc(now))) + } + + fn path_to(&self, destination: IsdAsn, now: Instant) -> Result, PathFetchError> { + self.check_destination(destination)?; + + Ok(self.get_path(self.start.instant_to_utc(now))) + } + + fn is_path_available(&self, destination: IsdAsn, now: Instant) -> Result { + Ok(self.path_to(destination, now)?.is_some()) + } + + fn poll_requests(&mut self, now: Instant) -> Request { + let until_next_lookup = self.duration_until_next_lookup(now); + + if until_next_lookup.is_zero() { + self.last_query = Some(now); + Request::LookupPathsTo(self.remote_ia) + } else { + Request::Callback(until_next_lookup) + } + } + + fn handle_lookup_paths(&mut self, paths: &[Path], now: Instant) { + self.cache_paths(paths, self.start.instant_to_utc(now)); + } +} + +#[derive(Debug)] +struct PathInfo { + n_interfaces: usize, + expiry_time: DateTime, + fingerprint: PathFingerprint, +} + +impl PathInfo { + fn new(path: &Path) -> Option { + let metadata = path.metadata.as_ref()?; + + Some(Self { + n_interfaces: metadata.interfaces.len(), + expiry_time: metadata.expiration, + fingerprint: path.fingerprint().ok()?, + }) + } + + fn is_expired(&self, now: DateTime) -> bool { + self.expiry_time <= now + } +} + +#[derive(Debug, Default)] +struct SortedPaths { + paths: HashMap, + path_order: Vec, + earliest_expiry: Option>, +} + +impl SortedPaths { + fn is_empty(&self) -> bool { + self.paths.is_empty() + } + + fn earliest_expiry(&self) -> Option> { + self.earliest_expiry + } + + fn set_earliest_expiry_time(&mut self) { + self.earliest_expiry = self.path_order.iter().map(|p| p.expiry_time).min() + } + + fn create_ordering(&mut self) { + self.path_order.clear(); + self.path_order.extend( + self.paths + .values() + .map(|path| PathInfo::new(path).expect("all stored paths have metadata")), + ); + self.path_order + .sort_by_key(|info| (info.n_interfaces, Reverse(info.expiry_time))); + } + + /// Returns the first path satisfying `expiry_time > now + min_validity`. + fn get_best_path(&self, now: DateTime, min_validity: Duration) -> Option<&Path> { + self.path_order + .iter() + .find(|info| !info.is_expired(now + min_validity)) + .map(|info| &self.paths[&info.fingerprint]) + } + + fn extend_and_remove_expired<'a, T>(&mut self, iter: T, now: DateTime) + where + T: IntoIterator, + { + for path in iter { + // We do not store paths without an expiry or expired paths. + if path.is_expired(now).unwrap_or(true) { + tracing::debug!(?path, "discarding expired path"); + continue; + } + let expiry_time = path + .expiry_time() + .expect("paths without expiry time filtered above"); + + let Ok(fingerprint) = path.fingerprint() else { + // We cannot store paths that do not have a fingerprint. + tracing::debug!("discarding path without a fingerprint"); + continue; + }; + + self.paths + .entry(fingerprint) + .and_modify(|prior_path| { + let prior_expiry_time = prior_path + .expiry_time() + .expect("only paths with expiry times to have been stored"); + + if expiry_time > prior_expiry_time { + *prior_path = path.clone(); + } + }) + .or_insert_with(|| path.clone()); + } + + self.remove_expired(now); + self.create_ordering(); + self.set_earliest_expiry_time(); + } + + fn remove_expired(&mut self, now: DateTime) { + self.paths.retain(|_, path| { + !path + .is_expired(now) + .expect("only paths with expiry times to have been stored") + }) + } +} + +/// Iterator over SCION paths to a pre-specified destination. +/// +/// Created using [`PathStrategy::paths_to`] on a [`PathRefresher`]. +pub struct PathsTo<'a> { + now: DateTime, + inner: slice::Iter<'a, PathInfo>, + paths: &'a SortedPaths, +} + +impl<'a> PathsTo<'a> { + fn new(paths: &'a SortedPaths, now: DateTime, min_validity: Duration) -> Self { + Self { + paths, + now: now + min_validity, + inner: paths.path_order.iter(), + } + } +} + +impl<'a> Iterator for PathsTo<'a> { + type Item = &'a Path; + + fn next(&mut self) -> Option { + #[allow(clippy::while_let_on_iterator)] + while let Some(info) = self.inner.next() { + if !info.is_expired(self.now) { + return Some(&self.paths.paths[&info.fingerprint]); + } + } + None + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::atomic::{AtomicU64, Ordering}, + time::{Duration, Instant}, + }; + + use chrono::{DateTime, Utc}; + use scion_proto::{ + address::IsdAsn, + packet::ByEndpoint, + path::{Path, PathInterface, PathMetadata}, + }; + + use super::*; + use crate::pan::path_strategy::{utc_instant::UtcInstant, Request}; + + const LOCAL_IA: IsdAsn = IsdAsn(0x1_ff00_0000_0001); + const OTHER_IA: IsdAsn = IsdAsn(0x2_ff00_0000_0002); + + type TestResult = Result>; + + #[inline] + fn nanos(nanoseconds: u64) -> Duration { + Duration::from_nanos(nanoseconds) + } + + #[inline] + fn secs(seconds: u64) -> Duration { + Duration::from_secs(seconds) + } + + #[inline] + fn mins(minutes: u64) -> Duration { + Duration::from_secs(minutes * 60) + } + + /// Checks that two unordered lists of *test* paths are equal. + /// + /// To do this, this macro sorts the paths then compares them for equality. As [`Path`] does not + /// implement `Ord`, paths generated for the tests store a unique ID in the source AS. They are + /// then sorted based on this unique ID for comparison. + macro_rules! assert_paths_unordered_eq { + ($lhs:expr, $rhs:expr) => { + let mut lhs: Vec<_> = $lhs.into_iter().collect(); + let mut rhs: Vec<_> = $rhs.into_iter().collect(); + + lhs.sort_by_key(|p| p.source()); + rhs.sort_by_key(|p| p.source()); + + assert_eq!(lhs, rhs); + }; + } + + macro_rules! param_test { + ($func_name:ident -> $return_type:ty: [ + $( $case_name:ident: ($($arg:expr),*) ),* + ]) => { + mod $func_name { + use super::*; + + $( + #[test] + fn $case_name() -> $return_type { + $func_name($($arg,)*) + } + )* + } + }; + ($func_name:ident$( -> $result:ty)?: [ + $( $case_name:ident: $arg:expr ),* + ]) => { + param_test!($func_name$( -> $result)?: [ $($case_name: ($arg)),* ]); + }; + } + + fn get_strategy() -> PathRefresher { + PathRefresher::new(LOCAL_IA) + } + + fn get_strategy_with_reference_time() -> (PathRefresher, UtcInstant) { + let strategy = PathRefresher::new(LOCAL_IA); + let utc_instant = strategy.start; + + (strategy, utc_instant) + } + + /// Returns several paths to the requested destination, all with an expiry time + /// with at most the provided time. + /// + /// The paths have the expiry time of expiry_time, then 1 minute earlier for each additional path. + fn get_paths_with_expiry_time_before( + destination: IsdAsn, + count: usize, + expiry_time: DateTime, + ) -> Vec { + (0..count) + .map(|i| { + expiry_time + .checked_sub_signed(chrono::Duration::minutes(i as i64)) + .unwrap_or(DateTime::::MIN_UTC) + }) + .map(|expiration| make_test_path(destination, 1, expiration)) + .collect() + } + + fn get_unexpired_paths(destination: IsdAsn, count: usize) -> Vec { + (0..count) + .map(|_| make_test_path(destination, 1, DateTime::::MAX_UTC)) + .collect() + } + + fn get_paths_with_hops_and_expiry( + remote_ia: IsdAsn, + base_expiry_time: DateTime, + hops_and_expiry_offsets: &[(usize, &[Duration])], + ) -> Vec { + hops_and_expiry_offsets + .iter() + .flat_map(|(count, offsets)| { + offsets + .iter() + .map(|offset| make_test_path(remote_ia, *count, base_expiry_time + *offset)) + }) + .collect() + } + + fn make_test_path(destination: IsdAsn, hop_count: usize, expiration: DateTime) -> Path { + static COUNTER: AtomicU64 = AtomicU64::new(0x99_ff00_0000_f999); + + let source = IsdAsn(COUNTER.fetch_add(1, Ordering::Relaxed)); + let metadata = PathMetadata { + expiration, + interfaces: vec![ + Some(PathInterface { + isd_asn: source, + id: 1, + }); + hop_count + ], + ..PathMetadata::default() + }; + Path { + metadata: Some(metadata), + ..Path::empty(ByEndpoint { + source, + destination, + }) + } + } + + fn get_multiple_paths_to( + strategy: &PathRefresher, + destination: IsdAsn, + now: Instant, + ) -> Result, PathFetchError> { + strategy + .paths_to(destination, now) + .map(|iter| iter.cloned().collect()) + } + + fn get_single_path_to( + strategy: &PathRefresher, + destination: IsdAsn, + now: Instant, + ) -> Result, PathFetchError> { + strategy + .path_to(destination, now) + .map(|maybe_path| maybe_path.into_iter().cloned().collect()) + } + + type GetPathsFn = fn(&PathRefresher, IsdAsn, Instant) -> Result, PathFetchError>; + + fn stores_and_returns_paths(n_paths: usize, get_paths: GetPathsFn) -> TestResult { + let mut strategy = get_strategy(); + let paths = get_unexpired_paths(strategy.remote_ia(), n_paths); + + strategy.handle_lookup_paths(&paths, Instant::now()); + let returned_paths = get_paths(&strategy, strategy.remote_ia(), Instant::now())?; + + assert_paths_unordered_eq!(returned_paths, paths); + + Ok(()) + } + + param_test! { + stores_and_returns_paths -> TestResult: [ + path_to: (1, get_single_path_to), + paths_to_single: (1, get_multiple_paths_to), + paths_to_multiple: (3, get_multiple_paths_to) + ] + } + + fn paths_to_unsupported_ia_errs(unsupported_ia: &str, get_paths: GetPathsFn) { + let strategy = get_strategy(); + + let unsupported_ia: IsdAsn = unsupported_ia.parse().expect("valid ISD-ASN"); + assert_eq!( + get_paths(&strategy, unsupported_ia, Instant::now()), + Err(PathFetchError::UnsupportedDestination) + ); + } + + param_test! { + paths_to_unsupported_ia_errs -> (): [ + single_path_to_wildcard_isd: ("0-ff00:0:110", get_single_path_to), + single_path_to_wildcard_asn: ("1-0", get_single_path_to), + single_path_to_wildcard_ia: ("0-0", get_single_path_to), + single_path_to_other_ia: (&OTHER_IA.to_string(), get_single_path_to), + multiple_paths_to_wildcard_isd: ("0-ff00:0:110", get_multiple_paths_to), + multiple_paths_to_wildcard_asn: ("1-0", get_multiple_paths_to), + multiple_paths_to_wildcard_ia: ("0-0", get_multiple_paths_to), + multiple_paths_to_other_ia: (&OTHER_IA.to_string(), get_multiple_paths_to) + ] + } + + fn is_path_available_errs_with_unsupported_ia(unsupported_ia: &str) { + let unsupported_ia: IsdAsn = unsupported_ia.parse().expect("valid ISD-ASN"); + + let strategy = get_strategy(); + assert_eq!( + strategy.is_path_available(unsupported_ia, Instant::now()), + Err(PathFetchError::UnsupportedDestination), + ); + } + + param_test! { + is_path_available_errs_with_unsupported_ia -> (): [ + wildcard_isd: "0-ff00:0:110", + wildcard_asn: "1-0", + wildcard_ia: "0-0", + other_ia: &OTHER_IA.to_string() + ] + } + + fn does_not_return_paths_to_other_scion_ases(get_paths: GetPathsFn) { + let mut strategy = get_strategy(); + let paths = get_unexpired_paths(OTHER_IA, 3); + + strategy.handle_lookup_paths(&paths, Instant::now()); + + let returned_paths = get_paths(&strategy, LOCAL_IA, Instant::now()) + .expect("should not err for supported IA"); + + assert!(returned_paths.is_empty()); + } + + param_test! { + does_not_return_paths_to_other_scion_ases -> (): [ + path_to: get_single_path_to, + paths_to: get_multiple_paths_to + ] + } + + fn does_not_return_paths_that_expired_before_storage(get_paths: GetPathsFn) { + let (mut strategy, start) = get_strategy_with_reference_time(); + let expiry_instant = start.instant() + secs(900); + let earliest_expiry_time = start.instant_to_utc(expiry_instant); + let paths = + get_paths_with_expiry_time_before(strategy.remote_ia(), 3, earliest_expiry_time); + + strategy.handle_lookup_paths(&paths, expiry_instant); + + let returned_paths = get_paths(&strategy, LOCAL_IA, expiry_instant).unwrap(); + + assert!(returned_paths.is_empty()); + } + + param_test! { + does_not_return_paths_that_expired_before_storage -> (): [ + path_to: get_single_path_to, + paths_to: get_multiple_paths_to + ] + } + + fn does_not_return_paths_that_expired_since_storage(get_paths: GetPathsFn) { + let (mut strategy, start) = get_strategy_with_reference_time(); + + let expiry_instant = start.instant() + secs(3600); + let ten_mins_before_expiry = expiry_instant - secs(60 * 10); + + let earliest_expiry_time = start.instant_to_utc(expiry_instant); + let paths = + get_paths_with_expiry_time_before(strategy.remote_ia(), 3, earliest_expiry_time); + + strategy.handle_lookup_paths(&paths, ten_mins_before_expiry); + + let returned_paths = get_paths(&strategy, LOCAL_IA, ten_mins_before_expiry).unwrap(); + assert!(!returned_paths.is_empty()); + + let returned_paths = get_paths(&strategy, LOCAL_IA, expiry_instant).unwrap(); + assert!(returned_paths.is_empty()); + } + + param_test! { + does_not_return_paths_that_expired_since_storage -> (): [ + path_to: get_single_path_to, + paths_to: get_multiple_paths_to + ] + } + + fn panics_if_created_with_wildcard(remote_ia: &str) { + let remote_ia: IsdAsn = remote_ia.parse().expect("valid IsdAsn"); + assert!(remote_ia.is_wildcard()); + + let result = std::panic::catch_unwind(|| { + PathRefresher::new(remote_ia); + }); + assert!(result.is_err()); + } + + param_test! { + panics_if_created_with_wildcard -> (): [ + wildcard_isd: "0-ff00:0:110", + wildcard_asn: "1-0", + wildcard_ia: "0-0" + ] + } + + #[test] + fn requests_lookups_if_a_path_is_expired() { + let (mut strategy, start) = get_strategy_with_reference_time(); + + let expiry_instant = start.instant() + secs(3600); + let expiry_time = start.instant_to_utc(expiry_instant); + + let mut paths = get_paths_with_expiry_time_before(strategy.remote_ia(), 1, expiry_time); + paths.extend_from_slice(&get_unexpired_paths(strategy.remote_ia(), 2)); + + strategy.handle_lookup_paths(&paths, start.instant()); + + assert_eq!( + strategy.poll_requests(expiry_instant), + Request::LookupPathsTo(strategy.remote_ia()) + ); + } + + #[test] + fn requests_lookups_if_paths_are_close_to_expiring() { + let (mut strategy, start) = get_strategy_with_reference_time(); + // Get rid of the initial poll for paths + strategy.poll_requests(start.instant()); + + let expiry_instant = start.instant() + secs(180); + let expected_refresh_instant = expiry_instant - PathRefresher::QUERY_LEAD_TIME; + + let expiry_time = start.instant_to_utc(expiry_instant); + let mut paths = get_paths_with_expiry_time_before(strategy.remote_ia(), 1, expiry_time); + paths.extend_from_slice(&get_unexpired_paths(strategy.remote_ia(), 2)); + + strategy.handle_lookup_paths(&paths, start.instant()); + + assert_ne!( + strategy.poll_requests(expected_refresh_instant - nanos(1)), + Request::LookupPathsTo(strategy.remote_ia()), + "should not yet request lookup", + ); + assert_eq!( + strategy.poll_requests(expected_refresh_instant), + Request::LookupPathsTo(strategy.remote_ia()), + "should request lookup at expected refresh instant", + ); + } + + #[test] + fn rerequests_lookups_only_after_a_delay() { + let (mut strategy, start) = get_strategy_with_reference_time(); + // Get rid of the initial poll for paths + strategy.poll_requests(start.instant()); + + let expiry_instant = start.instant() + secs(3600); + let expected_refresh_instant = expiry_instant - PathRefresher::QUERY_LEAD_TIME; + let expected_next_refresh_instant = + expected_refresh_instant + PathRefresher::MIN_REFRESH_INTERVAL; + + let expiry_time = start.instant_to_utc(expiry_instant); + let mut paths = get_paths_with_expiry_time_before(strategy.remote_ia(), 1, expiry_time); + paths.extend_from_slice(&get_unexpired_paths(strategy.remote_ia(), 2)); + + strategy.handle_lookup_paths(&paths, start.instant()); + + assert_eq!( + strategy.poll_requests(expected_refresh_instant), + Request::LookupPathsTo(strategy.remote_ia()), + "should refresh at first expected interval" + ); + assert_ne!( + strategy.poll_requests(expected_next_refresh_instant - nanos(1)), + Request::LookupPathsTo(strategy.remote_ia()), + "should pause refreshing for MIN_REFRESH_INTERVAL" + ); + assert_eq!( + strategy.poll_requests(expected_next_refresh_instant), + Request::LookupPathsTo(strategy.remote_ia()), + "should resume refreshing after MIN_REFRESH_INTERVAL" + ); + } + + #[test] + fn periodically_refreshes_paths() { + let (mut strategy, start) = get_strategy_with_reference_time(); + // Get rid of the initial poll for paths + strategy.poll_requests(start.instant()); + + let paths = get_unexpired_paths(strategy.remote_ia(), 3); + strategy.handle_lookup_paths(&paths, start.instant()); + + let expected_refresh_instant = start.instant() + PathRefresher::REFRESH_INTERVAL; + + assert_ne!( + strategy.poll_requests(expected_refresh_instant - nanos(1)), + Request::LookupPathsTo(strategy.remote_ia()), + "should not refresh valid paths before REFRESH_INTERVAL" + ); + assert_eq!( + strategy.poll_requests(expected_refresh_instant), + Request::LookupPathsTo(strategy.remote_ia()), + "should refresh after REFRESH_INTERVAL" + ); + } + + fn returns_lowest_hops_least_expired_paths(get_paths: GetPathsFn, truncate_to_first: bool) { + let (mut strategy, start) = get_strategy_with_reference_time(); + let paths = get_paths_with_hops_and_expiry( + strategy.remote_ia(), + start.time(), + &[(5, &[mins(60), mins(180)]), (2, &[mins(120), mins(30)])], + ); + + strategy.handle_lookup_paths(&paths, start.instant()); + + let mut expected_paths: Vec = [&paths[2], &paths[3], &paths[1], &paths[0]] + .into_iter() + .cloned() + .collect(); + if truncate_to_first { + expected_paths.truncate(1); + } + + let returned_paths = get_paths(&strategy, strategy.remote_ia(), start.instant()) + .expect("should return paths"); + + assert_eq!(returned_paths, expected_paths); + } + + param_test! { + returns_lowest_hops_least_expired_paths -> (): [ + path_to: (get_single_path_to, true), + paths_to: (get_multiple_paths_to, false) + ] + } + + fn returned_paths_respect_min_validity(get_paths: GetPathsFn, min_validity: Duration) { + let (mut strategy, start) = get_strategy_with_reference_time(); + let expiry_instant = start.instant() + mins(60); + let min_validity_instant = expiry_instant - min_validity; + let path = make_test_path( + strategy.remote_ia(), + 3, + start.instant_to_utc(expiry_instant), + ); + strategy.handle_lookup_paths(&[path.clone()], start.instant()); + + strategy.set_min_path_validity(min_validity); + let paths = get_paths(&strategy, strategy.remote_ia(), min_validity_instant).unwrap(); + assert!( + paths.is_empty(), + "should not return paths at min validity instant", + ); + + let paths = get_paths( + &strategy, + strategy.remote_ia(), + min_validity_instant - nanos(1), + ) + .unwrap(); + assert!( + !paths.is_empty(), + "should return paths before min validity instant", + ); + } + + param_test! { + returned_paths_respect_min_validity -> (): [ + single_path_zero_validity: (get_single_path_to, Duration::ZERO), + single_path_non_zero_validity: (get_single_path_to, mins(5)), + multiple_paths_zero_validity: (get_multiple_paths_to, Duration::ZERO), + multiple_paths_non_zero_validity: (get_multiple_paths_to, mins(5)) + ] + } + + #[test] + fn paths_with_newer_expiry_time_replaces_old() { + let (mut strategy, start) = get_strategy_with_reference_time(); + + let mut paths = get_unexpired_paths(strategy.remote_ia(), 2); + let initial_expiry_time = start.time() + mins(5); + paths.push(make_test_path(strategy.remote_ia(), 3, initial_expiry_time)); + + strategy.handle_lookup_paths(&paths, start.instant()); + + // Refresh the path by 60 mins + paths[2].metadata.as_mut().unwrap().expiration += mins(60); + + strategy.handle_lookup_paths(&paths[2..], start.instant()); + + let returned_paths = + get_multiple_paths_to(&strategy, strategy.remote_ia(), start.instant()).unwrap(); + + assert_paths_unordered_eq!(returned_paths, paths); + } + + #[test] + fn paths_with_older_expiry_time_are_ignored() { + let (mut strategy, start) = get_strategy_with_reference_time(); + + let mut paths = get_unexpired_paths(strategy.remote_ia(), 2); + let initial_expiry_time = start.time() + mins(5); + paths.push(make_test_path(strategy.remote_ia(), 3, initial_expiry_time)); + + strategy.handle_lookup_paths(&paths, start.instant()); + + // Attempt to refresh with a path with a 1 min earlier expiry time + let mut older_path = paths[2].clone(); + older_path.metadata.as_mut().unwrap().expiration -= mins(1); + + strategy.handle_lookup_paths(&[older_path], start.instant()); + + let returned_paths = + get_multiple_paths_to(&strategy, strategy.remote_ia(), start.instant()).unwrap(); + + assert_paths_unordered_eq!(returned_paths, paths); + } + + #[test] + fn looksup_paths_when_empty() { + let (mut strategy, start) = get_strategy_with_reference_time(); + + assert_eq!( + strategy.poll_requests(start.instant()), + Request::LookupPathsTo(strategy.remote_ia()) + ); + assert_eq!( + strategy.poll_requests(start.instant() + PathRefresher::MIN_REFRESH_INTERVAL + secs(1)), + Request::LookupPathsTo(strategy.remote_ia()) + ); + } +} diff --git a/crates/scion/src/pan/path_strategy/utc_instant.rs b/crates/scion/src/pan/path_strategy/utc_instant.rs new file mode 100644 index 0000000..ecc195f --- /dev/null +++ b/crates/scion/src/pan/path_strategy/utc_instant.rs @@ -0,0 +1,76 @@ +use std::time::Instant; + +use chrono::{DateTime, Duration as ChronoDuration, Utc}; + +/// Utility to map between [`Instant`] and [`DateTime`]. +/// +/// All conversions occur relative to a reference start time, which is set when and +/// instant is created with [`UtcReferenceInstant::now()`]. +#[derive(Debug, Clone, Copy)] +pub(crate) struct UtcInstant { + time: DateTime, + instant: Instant, +} + +impl UtcInstant { + /// Creates a new reference instant corresponding to now. + pub fn now() -> Self { + Self { + time: Utc::now(), + instant: Instant::now(), + } + } + + /// Returns the [`DateTime`] associated with this `UtcInstant`. + #[cfg(test)] + pub const fn time(&self) -> DateTime { + self.time + } + + /// Returns the [`Instant`] associated with this `UtcInstant`. + #[cfg(test)] + pub const fn instant(&self) -> Instant { + self.instant + } + + /// Converts the provided [`Instant`] to a [`DateTime`] relative to this `UtcInstant`. + /// + /// The result returned by this method saturates at `self.time()` and [`DateTime::::MAX_UTC`]. + /// Therefore, passing an instant less than `self.instant()` will return `self.time()`. + pub fn instant_to_utc(&self, instant: Instant) -> DateTime { + let duration_since_reference = instant.duration_since(self.instant); + + // Convert to a chrono::Duration. This conversion would fail if the the StdDuration is larger than + // the max chrono::Duration, but in that case, the value would anyway result in MAX_UTC. + ChronoDuration::from_std(duration_since_reference) + .ok() + .and_then(|duration| self.time.checked_add_signed(duration)) + .unwrap_or(DateTime::::MAX_UTC) + } +} + +impl From for DateTime { + fn from(value: UtcInstant) -> Self { + value.time + } +} + +impl From for Instant { + fn from(value: UtcInstant) -> Self { + value.instant + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn addition_with_max_chrono_duration_saturates_datetime() { + let datetime = DateTime::::MIN_UTC; + assert_eq!( + datetime.checked_add_signed(ChronoDuration::max_value()), + None + ); + } +} diff --git a/crates/scion/tests/test_path_strategy.rs b/crates/scion/tests/test_path_strategy.rs new file mode 100644 index 0000000..9dfd912 --- /dev/null +++ b/crates/scion/tests/test_path_strategy.rs @@ -0,0 +1,77 @@ +use std::{sync::Arc, time::Duration}; + +use bytes::Bytes; +use scion::{ + daemon::{self, DaemonClient}, + pan::{ + path_strategy::{refresher::PathRefresher, AsyncPathStrategy}, + AsyncScionDatagram, + PathAwareDatagram, + }, + udp_socket::UdpSocket, +}; +use scion_proto::address::{IsdAsn, SocketAddr}; + +type TestResult = Result>; +type PathService = AsyncPathStrategy; + +static MESSAGE: Bytes = Bytes::from_static(b"Hello SCION!"); +const TIMEOUT: Duration = std::time::Duration::from_secs(1); + +async fn get_path_strategy(destination: IsdAsn) -> PathService { + let daemon_client = DaemonClient::connect(&daemon::get_daemon_address()) + .await + .expect("should be able to connect"); + let strategy = PathRefresher::new(destination); + + AsyncPathStrategy::new(strategy, daemon_client) +} + +async fn get_source_socket( + source: SocketAddr, + destination: IsdAsn, +) -> TestResult> { + let strategy = get_path_strategy(destination).await; + let socket = UdpSocket::bind(source).await?; + + Ok(PathAwareDatagram::new(socket, Arc::new(strategy))) +} + +async fn test_sending_message_to_destination(src_address: &str, dst_address: &str) -> TestResult { + let src_address: SocketAddr = src_address.parse()?; + let dst_address: SocketAddr = dst_address.parse()?; + + let source = get_source_socket(src_address, dst_address.isd_asn()).await?; + let destination = UdpSocket::bind(dst_address).await?; + + tokio::time::timeout(TIMEOUT, source.send_to(MESSAGE.clone(), dst_address)).await??; + + let mut buffer = vec![0_u8; 1500]; + let (length, sender) = + tokio::time::timeout(TIMEOUT, destination.recv_from(&mut buffer)).await??; + + assert_eq!(sender, source.as_ref().local_addr()); + assert_eq!(buffer[..length], MESSAGE[..]); + + Ok(()) +} + +#[tokio::test] +#[ignore = "requires daemon and dispatcher"] +async fn sends_along_up_down_segment() -> TestResult { + test_sending_message_to_destination( + "[1-ff00:0:111,127.0.0.17]:12345", + "[1-ff00:0:112,fd00:f00d:cafe::7f00:a]:443", + ) + .await +} + +#[tokio::test] +#[ignore = "requires daemon and dispatcher"] +async fn sends_same_as() -> TestResult { + test_sending_message_to_destination( + "[1-ff00:0:111,127.0.0.17]:12346", + "[1-ff00:0:111,127.0.0.17]:8080", + ) + .await +}