diff --git a/crates/scion/src/pan/path_strategy.rs b/crates/scion/src/pan/path_strategy.rs index 6e41970..ffc4a86 100644 --- a/crates/scion/src/pan/path_strategy.rs +++ b/crates/scion/src/pan/path_strategy.rs @@ -6,10 +6,14 @@ use scion_proto::{address::IsdAsn, path::Path}; mod async_strategy; pub use async_strategy::AsyncPathStrategy; -pub mod refresher; - mod utc_instant; +#[cfg(test)] +mod test_utils; + +pub mod refresher; +pub mod uniform; + /// Errors returned when fetching paths from a [`PathStrategy`]. #[derive(Debug, thiserror::Error, PartialEq, Eq)] pub enum PathFetchError { @@ -17,6 +21,11 @@ pub enum PathFetchError { /// and will never return a valid result. #[error("the requested destination is not supported by this strategy")] UnsupportedDestination, + /// This error is raised when a call to a path fetch changes the internal state + /// of the strategy, in a way that would require the strategy to be polled despite + /// any previous pending callbacks. + #[error("the strategy wants to be polled irrespective of its previous callback")] + RequiresPoll, } /// Requests that a path strategy can make on its controller. diff --git a/crates/scion/src/pan/path_strategy/async_strategy.rs b/crates/scion/src/pan/path_strategy/async_strategy.rs index dc01395..082108b 100644 --- a/crates/scion/src/pan/path_strategy/async_strategy.rs +++ b/crates/scion/src/pan/path_strategy/async_strategy.rs @@ -6,7 +6,12 @@ use std::{ use futures::stream::{FuturesUnordered, StreamExt}; use scion_proto::{address::IsdAsn, path::Path}; -use tokio::{sync::watch, task::JoinHandle, time, time::Instant}; +use tokio::{ + sync::{watch, Notify}, + task::JoinHandle, + time, + time::Instant, +}; use super::PathStrategy; use crate::pan::{ @@ -88,6 +93,9 @@ where tracing::debug!("request was for unsupported destination"); return Err(PathLookupError::UnsupportedDestination); } + Err(PathFetchError::RequiresPoll) => { + self.inner.wake(); + } Ok(true) => { tracing::debug!(now=?rel_now, "paths are available, running handler"); return handler(&*strategy, now); @@ -154,6 +162,7 @@ struct AsyncPathStrategyInner { path_service: P, strategy: Mutex, update_notifier: watch::Sender<()>, + waker: Notify, } impl<'p, S, P> AsyncPathStrategyInner @@ -166,6 +175,7 @@ where strategy: Mutex::new(strategy), path_service, update_notifier: watch::Sender::new(()), + waker: Notify::new(), } } @@ -211,35 +221,47 @@ where // Wait for the callback duration, or until a pending path query completes. If there // are no pending path queries, then this will just sleep until the callback time. - while let Ok(next) = time::timeout_at(callback_time, requests.next_ready()).await { - let span = tracing::debug_span!("event_wait", now=?start.elapsed()); - - match next { - Some((scion_as, Err(err))) => { - span.in_scope( - || tracing::warn!(%scion_as, %err, "ignoring path lookup failure"), - ); - continue; - } - Some((scion_as, Ok(paths))) => { - let _guard = span.enter(); - let mut strategy = self.strategy.lock().unwrap(); - - found_paths.clear(); - found_paths.extend(paths); - - tracing::debug!(%scion_as, count=found_paths.len(), "path lookup successful"); - strategy.handle_lookup_paths(&found_paths, Instant::now().into()); - - self.update_notifier.send_replace(()); - + loop { + tokio::select! { + _ = time::sleep_until(callback_time) => { + tracing::debug!(now=?start.elapsed(), "callback duration elapsed"); break; } - None => { - span.in_scope(|| tracing::debug!("no pending path lookups remaining")); - time::sleep_until(callback_time).await; + _ = self.waker.notified() => { + tracing::debug!(now=?start.elapsed(), "woken prematurely by call to wake()"); break; } + next = requests.next_ready() => { + let span = tracing::debug_span!("event_wait", now=?start.elapsed()); + + match next { + Some((scion_as, Err(err))) => { + span.in_scope( + || tracing::warn!(%scion_as, %err, "ignoring path lookup failure"), + ); + continue; + } + Some((scion_as, Ok(paths))) => { + let _guard = span.enter(); + let mut strategy = self.strategy.lock().unwrap(); + + found_paths.clear(); + found_paths.extend(paths); + + tracing::debug!(%scion_as, count=found_paths.len(), "path lookup successful"); + strategy.handle_lookup_paths(&found_paths, Instant::now().into()); + + self.update_notifier.send_replace(()); + + break; + } + None => { + span.in_scope(|| tracing::debug!("no pending path lookups remaining")); + time::sleep_until(callback_time).await; + break; + } + } + } } } } @@ -248,6 +270,10 @@ where fn subscribe_to_path_changes(&self) -> watch::Receiver<()> { self.update_notifier.subscribe() } + + fn wake(&self) { + self.waker.notify_one() + } } /// Tracks the ASes for which path requests are pending. @@ -577,13 +603,26 @@ mod tests { use mocks::*; - async fn run_path_strategy(strategy: S, path_service: P, run_duration: Duration) - where + async fn create_and_run_path_strategy( + strategy: S, + path_service: P, + run_duration: Duration, + ) where S: PathStrategy + Send + 'static, P: AsyncPathService + Send + Sync + 'static, { - let mut async_strategy = AsyncPathStrategy::new(strategy, path_service); + let async_strategy = AsyncPathStrategy::new(strategy, path_service); + run_path_strategy(async_strategy, run_duration).await + } + + async fn run_path_strategy( + mut async_strategy: AsyncPathStrategy, + run_duration: Duration, + ) where + S: PathStrategy + Send + 'static, + P: AsyncPathService + Send + Sync + 'static, + { // Get the background task so that we can join on it let mut background_task = tokio::spawn(async {}); std::mem::swap(&mut background_task, &mut async_strategy.background_task); @@ -624,7 +663,7 @@ mod tests { .returning(never_completes) .times(1..); - run_path_strategy(strategy, path_service, millisecs(10)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(10)).await; } #[tokio::test(start_paused = true)] @@ -638,7 +677,7 @@ mod tests { path_service.expect_paths_to().never(); - run_path_strategy(strategy, path_service, millisecs(10)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(10)).await; } async fn polls_until_callback(n_lookups: usize) { @@ -650,7 +689,7 @@ mod tests { .returning(identical_lookups_then_callback(n_lookups, millisecs(100))) .times(n_lookups + 1); - run_path_strategy(strategy, path_service, millisecs(10)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(10)).await; } async_param_test! { @@ -676,7 +715,7 @@ mod tests { .returning(never_completes) .times(1); - run_path_strategy(strategy, path_service, millisecs(10)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(10)).await; } #[tokio::test(start_paused = true)] @@ -694,7 +733,7 @@ mod tests { .returning(never_completes) .times(2); - run_path_strategy(strategy, path_service, millisecs(10)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(10)).await; } #[tokio::test(start_paused = true)] @@ -716,7 +755,7 @@ mod tests { .return_const(()) .times(expected_successful_lookups as usize); - run_path_strategy(strategy, path_service, millisecs(run_duration_ms)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(run_duration_ms)).await; } #[tokio::test(start_paused = true)] @@ -738,7 +777,7 @@ mod tests { .withf(any_path_to(IsdAsn(REMOTE_IA.as_u64() + 1))) .times(1); - run_path_strategy(strategy, path_service, millisecs(10)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(10)).await; } #[tokio::test(start_paused = true)] @@ -755,7 +794,7 @@ mod tests { strategy.expect_handle_lookup_paths().return_const(()); - run_path_strategy(strategy, path_service, millisecs(10)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(10)).await; } #[tokio::test(start_paused = true)] @@ -778,7 +817,7 @@ mod tests { }) .times(1); - run_path_strategy(strategy, path_service, millisecs(35)).await; + create_and_run_path_strategy(strategy, path_service, millisecs(35)).await; } #[tokio::test] @@ -851,4 +890,27 @@ mod tests { Ok(()) } + + #[tokio::test(start_paused = true)] + async fn polls_when_requested_by_path_lookup() { + tracing_subscriber::fmt::init(); + + let mut strategy = MockStrategy::new(); + let path_service = MockPathService::that_never_completes(); + + strategy + .expect_is_path_available() + .returning(|_, _| Err(PathFetchError::RequiresPoll)); + strategy + .expect_poll_requests() + .returning(repeated_callbacks_with_duration(Duration::from_secs(3600))) + .times(2); + + let async_strategy = AsyncPathStrategy::new(strategy, path_service); + + let _ = tokio::time::timeout(millisecs(1), async_strategy.path_to(REMOTE_IA)).await; + tokio::time::advance(millisecs(2)).await; + + run_path_strategy(async_strategy, millisecs(10)).await; + } } diff --git a/crates/scion/src/pan/path_strategy/refresher.rs b/crates/scion/src/pan/path_strategy/refresher.rs index 57c1a7d..59b43db 100644 --- a/crates/scion/src/pan/path_strategy/refresher.rs +++ b/crates/scion/src/pan/path_strategy/refresher.rs @@ -320,16 +320,17 @@ impl SortedPaths { /// Iterator over SCION paths to a pre-specified destination. /// /// Created using [`PathStrategy::paths_to`] on a [`PathRefresher`]. +#[derive(Default)] pub struct PathsTo<'a> { now: DateTime, inner: slice::Iter<'a, PathInfo>, - paths: &'a SortedPaths, + paths: Option<&'a SortedPaths>, } impl<'a> PathsTo<'a> { fn new(paths: &'a SortedPaths, now: DateTime, min_validity: Duration) -> Self { Self { - paths, + paths: Some(paths), now: now + min_validity, inner: paths.path_order.iter(), } @@ -340,10 +341,14 @@ impl<'a> Iterator for PathsTo<'a> { type Item = &'a Path; fn next(&mut self) -> Option { + let Some(paths) = self.paths else { + return None; + }; + #[allow(clippy::while_let_on_iterator)] while let Some(info) = self.inner.next() { if !info.is_expired(self.now) { - return Some(&self.paths.paths[&info.fingerprint]); + return Some(&paths.paths[&info.fingerprint]); } } None @@ -352,25 +357,29 @@ impl<'a> Iterator for PathsTo<'a> { #[cfg(test)] mod tests { - use std::{ - sync::atomic::{AtomicU64, Ordering}, - time::{Duration, Instant}, - }; + use std::time::{Duration, Instant}; - use chrono::{DateTime, Utc}; - use scion_proto::{ - address::IsdAsn, - packet::ByEndpoint, - path::{Path, PathInterface, PathMetadata}, - }; + use scion_proto::{address::IsdAsn, path::Path}; use super::*; - use crate::pan::path_strategy::{utc_instant::UtcInstant, Request}; + use crate::pan::path_strategy::{ + test_utils::{ + assert_paths_unordered_eq, + get_paths_with_expiry_time_before, + get_paths_with_hops_and_expiry, + get_unexpired_paths, + make_test_path, + param_test, + }, + utc_instant::UtcInstant, + Request, + }; - const LOCAL_IA: IsdAsn = IsdAsn(0x1_ff00_0000_0001); + const REMOTE_IA: IsdAsn = IsdAsn(0x1_ff00_0000_0001); const OTHER_IA: IsdAsn = IsdAsn(0x2_ff00_0000_0002); type TestResult = Result>; + type GetPathsFn = fn(&PathRefresher, IsdAsn, Instant) -> Result, PathFetchError>; #[inline] fn nanos(nanoseconds: u64) -> Duration { @@ -387,120 +396,17 @@ mod tests { Duration::from_secs(minutes * 60) } - /// Checks that two unordered lists of *test* paths are equal. - /// - /// To do this, this macro sorts the paths then compares them for equality. As [`Path`] does not - /// implement `Ord`, paths generated for the tests store a unique ID in the source AS. They are - /// then sorted based on this unique ID for comparison. - macro_rules! assert_paths_unordered_eq { - ($lhs:expr, $rhs:expr) => { - let mut lhs: Vec<_> = $lhs.into_iter().collect(); - let mut rhs: Vec<_> = $rhs.into_iter().collect(); - - lhs.sort_by_key(|p| p.source()); - rhs.sort_by_key(|p| p.source()); - - assert_eq!(lhs, rhs); - }; - } - - macro_rules! param_test { - ($func_name:ident -> $return_type:ty: [ - $( $case_name:ident: ($($arg:expr),*) ),* - ]) => { - mod $func_name { - use super::*; - - $( - #[test] - fn $case_name() -> $return_type { - $func_name($($arg,)*) - } - )* - } - }; - ($func_name:ident$( -> $result:ty)?: [ - $( $case_name:ident: $arg:expr ),* - ]) => { - param_test!($func_name$( -> $result)?: [ $($case_name: ($arg)),* ]); - }; - } - fn get_strategy() -> PathRefresher { - PathRefresher::new(LOCAL_IA) + PathRefresher::new(REMOTE_IA) } fn get_strategy_with_reference_time() -> (PathRefresher, UtcInstant) { - let strategy = PathRefresher::new(LOCAL_IA); + let strategy = PathRefresher::new(REMOTE_IA); let utc_instant = strategy.start; (strategy, utc_instant) } - /// Returns several paths to the requested destination, all with an expiry time - /// with at most the provided time. - /// - /// The paths have the expiry time of expiry_time, then 1 minute earlier for each additional path. - fn get_paths_with_expiry_time_before( - destination: IsdAsn, - count: usize, - expiry_time: DateTime, - ) -> Vec { - (0..count) - .map(|i| { - expiry_time - .checked_sub_signed(chrono::Duration::minutes(i as i64)) - .unwrap_or(DateTime::::MIN_UTC) - }) - .map(|expiration| make_test_path(destination, 1, expiration)) - .collect() - } - - fn get_unexpired_paths(destination: IsdAsn, count: usize) -> Vec { - (0..count) - .map(|_| make_test_path(destination, 1, DateTime::::MAX_UTC)) - .collect() - } - - fn get_paths_with_hops_and_expiry( - remote_ia: IsdAsn, - base_expiry_time: DateTime, - hops_and_expiry_offsets: &[(usize, &[Duration])], - ) -> Vec { - hops_and_expiry_offsets - .iter() - .flat_map(|(count, offsets)| { - offsets - .iter() - .map(|offset| make_test_path(remote_ia, *count, base_expiry_time + *offset)) - }) - .collect() - } - - fn make_test_path(destination: IsdAsn, hop_count: usize, expiration: DateTime) -> Path { - static COUNTER: AtomicU64 = AtomicU64::new(0x99_ff00_0000_f999); - - let source = IsdAsn(COUNTER.fetch_add(1, Ordering::Relaxed)); - let metadata = PathMetadata { - expiration, - interfaces: vec![ - Some(PathInterface { - isd_asn: source, - id: 1, - }); - hop_count - ], - ..PathMetadata::default() - }; - Path { - metadata: Some(metadata), - ..Path::empty(ByEndpoint { - source, - destination, - }) - } - } - fn get_multiple_paths_to( strategy: &PathRefresher, destination: IsdAsn, @@ -521,7 +427,9 @@ mod tests { .map(|maybe_path| maybe_path.into_iter().cloned().collect()) } - type GetPathsFn = fn(&PathRefresher, IsdAsn, Instant) -> Result, PathFetchError>; + // -------------------------------------------------------------------------------- + // TESTS + // -------------------------------------------------------------------------------- fn stores_and_returns_paths(n_paths: usize, get_paths: GetPathsFn) -> TestResult { let mut strategy = get_strategy(); @@ -591,7 +499,7 @@ mod tests { strategy.handle_lookup_paths(&paths, Instant::now()); - let returned_paths = get_paths(&strategy, LOCAL_IA, Instant::now()) + let returned_paths = get_paths(&strategy, REMOTE_IA, Instant::now()) .expect("should not err for supported IA"); assert!(returned_paths.is_empty()); @@ -613,7 +521,7 @@ mod tests { strategy.handle_lookup_paths(&paths, expiry_instant); - let returned_paths = get_paths(&strategy, LOCAL_IA, expiry_instant).unwrap(); + let returned_paths = get_paths(&strategy, REMOTE_IA, expiry_instant).unwrap(); assert!(returned_paths.is_empty()); } @@ -637,10 +545,10 @@ mod tests { strategy.handle_lookup_paths(&paths, ten_mins_before_expiry); - let returned_paths = get_paths(&strategy, LOCAL_IA, ten_mins_before_expiry).unwrap(); + let returned_paths = get_paths(&strategy, REMOTE_IA, ten_mins_before_expiry).unwrap(); assert!(!returned_paths.is_empty()); - let returned_paths = get_paths(&strategy, LOCAL_IA, expiry_instant).unwrap(); + let returned_paths = get_paths(&strategy, REMOTE_IA, expiry_instant).unwrap(); assert!(returned_paths.is_empty()); } diff --git a/crates/scion/src/pan/path_strategy/test_utils.rs b/crates/scion/src/pan/path_strategy/test_utils.rs new file mode 100644 index 0000000..e015e42 --- /dev/null +++ b/crates/scion/src/pan/path_strategy/test_utils.rs @@ -0,0 +1,126 @@ +use std::{ + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; + +use chrono::{DateTime, Utc}; +use scion_proto::{ + address::IsdAsn, + packet::ByEndpoint, + path::{Path, PathInterface, PathMetadata}, +}; + +/// This checks that two list of *test* paths are equal. +/// +/// As [`Path`] does not implement equality, paths generated for the tests store a unique +/// ID in the source AS of each created path. Paths are then checked for equality based the ID. +macro_rules! assert_paths_unordered_eq { + ($lhs:expr, $rhs:expr) => { + let to_comparable = |p: Path| (p.isd_asn.source, p.expiry_time()); + let mut lhs: Vec<_> = $lhs.into_iter().map(to_comparable).collect(); + let mut rhs: Vec<_> = $rhs.into_iter().map(to_comparable).collect(); + + lhs.sort(); + rhs.sort(); + + assert_eq!(lhs, rhs); + }; +} + +macro_rules! param_test { + ($func_name:ident -> $return_type:ty: [ + $( $case_name:ident: ( $($arg:expr),* ) ),* + ]) => { + mod $func_name { + use super::*; + + $( + #[test] + fn $case_name() -> $return_type { + $func_name($($arg),*) + } + )* + } + }; + ($func_name:ident: [ + $( $case_name:ident: ( $($arg:expr),* ) ),* + ]) => { + param_test!($func_name -> (): [ $($case_name: ($($arg),*)),* ]); + }; + ($func_name:ident$( -> $result:ty)?: [ + $( $case_name:ident: $arg:expr ),* + ]) => { + param_test!($func_name$( -> $result)?: [ $($case_name: ($arg)),* ]); + }; +} + +pub(crate) use assert_paths_unordered_eq; +pub(crate) use param_test; + +/// Returns several paths to the requested destination, all with an expiry time +/// with at most the provided time. +/// +/// The paths have the expiry time of expiry_time, then 1 minute earlier for each additional path. +pub(crate) fn get_paths_with_expiry_time_before( + destination: IsdAsn, + count: usize, + expiry_time: DateTime, +) -> Vec { + (0..count) + .map(|i| { + expiry_time + .checked_sub_signed(chrono::Duration::seconds(60 * i as i64)) + .unwrap_or(DateTime::::MIN_UTC) + }) + .map(|expiration| make_test_path(destination, 1, expiration)) + .collect() +} + +pub(crate) fn get_unexpired_paths(destination: IsdAsn, count: usize) -> Vec { + (0..count) + .map(|_| make_test_path(destination, 1, DateTime::::MAX_UTC)) + .collect() +} + +pub(crate) fn get_paths_with_hops_and_expiry( + remote_ia: IsdAsn, + base_expiry_time: DateTime, + hops_and_expiry_offsets: &[(usize, &[Duration])], +) -> Vec { + hops_and_expiry_offsets + .iter() + .flat_map(|(count, offsets)| { + offsets + .iter() + .map(|offset| make_test_path(remote_ia, *count, base_expiry_time + *offset)) + }) + .collect() +} + +pub(crate) fn make_test_path( + destination: IsdAsn, + hop_count: usize, + expiration: DateTime, +) -> Path { + static COUNTER: AtomicU64 = AtomicU64::new(0x99_ff00_0000_f999); + + let source = IsdAsn(COUNTER.fetch_add(1, Ordering::Relaxed)); + let metadata = PathMetadata { + expiration, + interfaces: vec![ + Some(PathInterface { + isd_asn: source, + id: 1, + }); + hop_count + ], + ..PathMetadata::default() + }; + Path { + metadata: Some(metadata), + ..Path::empty(ByEndpoint { + source, + destination, + }) + } +} diff --git a/crates/scion/src/pan/path_strategy/uniform.rs b/crates/scion/src/pan/path_strategy/uniform.rs new file mode 100644 index 0000000..2c7826d --- /dev/null +++ b/crates/scion/src/pan/path_strategy/uniform.rs @@ -0,0 +1,234 @@ +//! A uniform path strategy that can be used to compose other strategies. + +use std::{ + cell::RefCell, + cmp, + collections::{hash_map::Entry, HashMap}, + time::{Duration, Instant}, +}; + +use scion_proto::{address::IsdAsn, path::Path}; + +use super::{PathFetchError, PathStrategy, Request}; + +/// A [path strategy][PathStrategy] that applies another path strategy on a per-destination basis. +/// +/// Path strategies, such as the [`PathRefresher`][super::refresher::PathRefresher] strategy, are +/// created only for a specific destination SCION AS. The `UniformStrategy` allows extending such +/// strategies to work for multiple destination ASes by creating a new instance of the strategy for +/// each destination. +/// +/// A new instance of the strategy is only created for a destination when a path to that destination is +/// requested, or paths to that destination are cached. +// TODO(jsmith): Do we want to prune unused strategies? +pub struct UniformStrategy { + inner: HashMap, + factory: Box T>, + destinations_to_add: RefCell>, +} + +impl UniformStrategy +where + T: PathStrategy, +{ + /// Creates a new instance of `UniformStrategy` that uses the provided factory function + /// to initialise new instances of a [`PathStrategy`] for each queried destination. + pub fn new(factory: F) -> Self + where + F: FnMut(IsdAsn) -> T + 'static, + { + Self { + inner: HashMap::new(), + factory: Box::new(factory), + destinations_to_add: RefCell::default(), + } + } + + fn get_strategy(&self, destination: IsdAsn) -> Result<&T, PathFetchError> { + if destination.is_wildcard() { + return Err(PathFetchError::UnsupportedDestination); + } + + if let Some(strategy) = self.inner.get(&destination) { + Ok(strategy) + } else { + self.destinations_to_add.borrow_mut().push(destination); + Err(PathFetchError::RequiresPoll) + } + } + + fn get_or_insert(&mut self, remote_ia: IsdAsn) -> &mut T { + assert!(!remote_ia.is_wildcard()); + self.inner + .entry(remote_ia) + .or_insert_with(|| (self.factory)(remote_ia)) + } + + fn fill_missing_destinations(&mut self) { + for remote_ia in self.destinations_to_add.borrow_mut().drain(..) { + if let Entry::Vacant(entry) = self.inner.entry(remote_ia) { + entry.insert((self.factory)(remote_ia)); + } + } + } +} + +impl PathStrategy for UniformStrategy +where + T: PathStrategy, + for<'a> T::PathsTo<'a>: Default, +{ + type PathsTo<'p> = T::PathsTo<'p> + where + Self: 'p; + + fn paths_to( + &self, + destination: IsdAsn, + now: Instant, + ) -> Result, PathFetchError> { + self.get_strategy(destination)?.paths_to(destination, now) + } + + fn path_to(&self, destination: IsdAsn, now: Instant) -> Result, PathFetchError> { + self.get_strategy(destination)?.path_to(destination, now) + } + + fn is_path_available(&self, destination: IsdAsn, now: Instant) -> Result { + self.get_strategy(destination)? + .is_path_available(destination, now) + } + + fn poll_requests(&mut self, now: Instant) -> Request { + self.fill_missing_destinations(); + + let mut earliest_callback = Duration::MAX; + for strategy in self.inner.values_mut() { + match strategy.poll_requests(now) { + lookup @ Request::LookupPathsTo(_) => return lookup, + Request::Callback(callback) => { + earliest_callback = cmp::min(callback, earliest_callback) + } + } + } + + Request::Callback(earliest_callback) + } + + fn handle_lookup_paths(&mut self, paths: &[Path], now: Instant) { + let Some(sample_path) = paths.first() else { + return; + }; + if sample_path.destination().is_wildcard() { + return; + } + + self.fill_missing_destinations(); + self.get_or_insert(sample_path.destination()) + .handle_lookup_paths(paths, now); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::pan::path_strategy::{ + refresher::PathRefresher, + test_utils::{assert_paths_unordered_eq, get_unexpired_paths, param_test}, + }; + + const REMOTE_IA: IsdAsn = IsdAsn(0x1_ff00_0000_0001); + const OTHER_IA: IsdAsn = IsdAsn(0x2_ff00_0000_0002); + + type TestResult = Result>; + type GetPathsFn = + fn(&UniformStrategy, IsdAsn, Instant) -> Result, PathFetchError>; + + fn get_strategy() -> UniformStrategy { + UniformStrategy::new(PathRefresher::new) + } + + fn get_multiple_paths_to( + strategy: &UniformStrategy, + destination: IsdAsn, + now: Instant, + ) -> Result, PathFetchError> { + strategy + .paths_to(destination, now) + .map(|iter| iter.cloned().collect()) + } + + fn get_single_path_to( + strategy: &UniformStrategy, + destination: IsdAsn, + now: Instant, + ) -> Result, PathFetchError> { + strategy + .path_to(destination, now) + .map(|maybe_path| maybe_path.into_iter().cloned().collect()) + } + + fn performs_lookups_to_previously_requested_destinations(get_path: GetPathsFn) { + let mut strategy = get_strategy(); + + assert_eq!( + get_path(&strategy, REMOTE_IA, Instant::now()), + Err(PathFetchError::RequiresPoll) + ); + assert_eq!( + strategy.poll_requests(Instant::now()), + Request::LookupPathsTo(REMOTE_IA) + ); + } + + param_test! { + performs_lookups_to_previously_requested_destinations: [ + single_path: get_single_path_to, + multiple_paths: get_multiple_paths_to + ] + } + + fn stores_and_returns_paths(n_paths: usize, get_paths: GetPathsFn) -> TestResult { + let mut strategy = get_strategy(); + + let remote_paths = get_unexpired_paths(REMOTE_IA, n_paths); + strategy.handle_lookup_paths(&remote_paths, Instant::now()); + + let other_paths = get_unexpired_paths(OTHER_IA, n_paths); + strategy.handle_lookup_paths(&other_paths, Instant::now()); + + let returned_remote_paths = get_paths(&strategy, REMOTE_IA, Instant::now())?; + assert_paths_unordered_eq!(returned_remote_paths, remote_paths); + + let returned_other_paths = get_paths(&strategy, OTHER_IA, Instant::now())?; + assert_paths_unordered_eq!(returned_other_paths, other_paths); + + Ok(()) + } + + param_test! { + stores_and_returns_paths -> TestResult: [ + single_path: (1, get_single_path_to), + multiple_paths: (3, get_multiple_paths_to) + ] + } + + fn get_paths_errs_on_wildcard(remote_ia: &str, get_path: GetPathsFn) { + let strategy = get_strategy(); + let remote_ia: IsdAsn = remote_ia.parse().unwrap(); + let result = get_path(&strategy, remote_ia, Instant::now()); + + assert_eq!(result, Err(PathFetchError::UnsupportedDestination)); + } + + param_test! { + get_paths_errs_on_wildcard -> (): [ + single_path_wildcard_asn: ("1-0", get_single_path_to), + single_path_wildcard_isd: ("0-ff00:0:110", get_single_path_to), + single_path_wildcard_ia: ("0-0", get_single_path_to), + multiple_paths_wildcard_asn: ("1-0", get_multiple_paths_to), + multiple_paths_wildcard_isd: ("0-ff00:0:110", get_multiple_paths_to), + multiple_paths_wildcard_ia: ("0-0", get_multiple_paths_to) + ] + } +}