Skip to content

Commit

Permalink
feat: adds the uniform path strategy that extends other strategies to…
Browse files Browse the repository at this point in the history
… multiple ASes
  • Loading branch information
jpcsmith committed Jan 24, 2024
1 parent 68b3197 commit 4528651
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 165 deletions.
13 changes: 11 additions & 2 deletions crates/scion/src/pan/path_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ use scion_proto::{address::IsdAsn, path::Path};
mod async_strategy;
pub use async_strategy::AsyncPathStrategy;

pub mod refresher;

mod utc_instant;

#[cfg(test)]
mod test_utils;

pub mod refresher;
pub mod uniform;

/// Errors returned when fetching paths from a [`PathStrategy`].
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum PathFetchError {
/// The requested destination is not supported by this strategy,
/// and will never return a valid result.
#[error("the requested destination is not supported by this strategy")]
UnsupportedDestination,
/// This error is raised when a call to a path fetch changes the internal state
/// of the strategy, in a way that would require the strategy to be polled despite
/// any previous pending callbacks.
#[error("the strategy wants to be polled irrespective of its previous callback")]
RequiresPoll,
}

/// Requests that a path strategy can make on its controller.
Expand Down
138 changes: 100 additions & 38 deletions crates/scion/src/pan/path_strategy/async_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ use std::{

use futures::stream::{FuturesUnordered, StreamExt};
use scion_proto::{address::IsdAsn, path::Path};
use tokio::{sync::watch, task::JoinHandle, time, time::Instant};
use tokio::{
sync::{watch, Notify},
task::JoinHandle,
time,
time::Instant,
};

use super::PathStrategy;
use crate::pan::{
Expand Down Expand Up @@ -88,6 +93,9 @@ where
tracing::debug!("request was for unsupported destination");
return Err(PathLookupError::UnsupportedDestination);
}
Err(PathFetchError::RequiresPoll) => {
self.inner.wake();
}
Ok(true) => {
tracing::debug!(now=?rel_now, "paths are available, running handler");
return handler(&*strategy, now);
Expand Down Expand Up @@ -154,6 +162,7 @@ struct AsyncPathStrategyInner<S, P> {
path_service: P,
strategy: Mutex<S>,
update_notifier: watch::Sender<()>,
waker: Notify,
}

impl<'p, S, P> AsyncPathStrategyInner<S, P>
Expand All @@ -166,6 +175,7 @@ where
strategy: Mutex::new(strategy),
path_service,
update_notifier: watch::Sender::new(()),
waker: Notify::new(),
}
}

Expand Down Expand Up @@ -211,35 +221,47 @@ where

// Wait for the callback duration, or until a pending path query completes. If there
// are no pending path queries, then this will just sleep until the callback time.
while let Ok(next) = time::timeout_at(callback_time, requests.next_ready()).await {
let span = tracing::debug_span!("event_wait", now=?start.elapsed());

match next {
Some((scion_as, Err(err))) => {
span.in_scope(
|| tracing::warn!(%scion_as, %err, "ignoring path lookup failure"),
);
continue;
}
Some((scion_as, Ok(paths))) => {
let _guard = span.enter();
let mut strategy = self.strategy.lock().unwrap();

found_paths.clear();
found_paths.extend(paths);

tracing::debug!(%scion_as, count=found_paths.len(), "path lookup successful");
strategy.handle_lookup_paths(&found_paths, Instant::now().into());

self.update_notifier.send_replace(());

loop {
tokio::select! {
_ = time::sleep_until(callback_time) => {
tracing::debug!(now=?start.elapsed(), "callback duration elapsed");
break;
}
None => {
span.in_scope(|| tracing::debug!("no pending path lookups remaining"));
time::sleep_until(callback_time).await;
_ = self.waker.notified() => {
tracing::debug!(now=?start.elapsed(), "woken prematurely by call to wake()");
break;
}
next = requests.next_ready() => {
let span = tracing::debug_span!("event_wait", now=?start.elapsed());

match next {
Some((scion_as, Err(err))) => {
span.in_scope(
|| tracing::warn!(%scion_as, %err, "ignoring path lookup failure"),
);
continue;
}
Some((scion_as, Ok(paths))) => {
let _guard = span.enter();
let mut strategy = self.strategy.lock().unwrap();

found_paths.clear();
found_paths.extend(paths);

tracing::debug!(%scion_as, count=found_paths.len(), "path lookup successful");
strategy.handle_lookup_paths(&found_paths, Instant::now().into());

self.update_notifier.send_replace(());

break;
}
None => {
span.in_scope(|| tracing::debug!("no pending path lookups remaining"));
time::sleep_until(callback_time).await;
break;
}
}
}
}
}
}
Expand All @@ -248,6 +270,10 @@ where
fn subscribe_to_path_changes(&self) -> watch::Receiver<()> {
self.update_notifier.subscribe()
}

fn wake(&self) {
self.waker.notify_one()
}
}

/// Tracks the ASes for which path requests are pending.
Expand Down Expand Up @@ -577,13 +603,26 @@ mod tests {

use mocks::*;

async fn run_path_strategy<S, P>(strategy: S, path_service: P, run_duration: Duration)
where
async fn create_and_run_path_strategy<S, P>(
strategy: S,
path_service: P,
run_duration: Duration,
) where
S: PathStrategy + Send + 'static,
P: AsyncPathService + Send + Sync + 'static,
{
let mut async_strategy = AsyncPathStrategy::new(strategy, path_service);
let async_strategy = AsyncPathStrategy::new(strategy, path_service);

run_path_strategy(async_strategy, run_duration).await
}

async fn run_path_strategy<S, P>(
mut async_strategy: AsyncPathStrategy<S, P>,
run_duration: Duration,
) where
S: PathStrategy + Send + 'static,
P: AsyncPathService + Send + Sync + 'static,
{
// Get the background task so that we can join on it
let mut background_task = tokio::spawn(async {});
std::mem::swap(&mut background_task, &mut async_strategy.background_task);
Expand Down Expand Up @@ -624,7 +663,7 @@ mod tests {
.returning(never_completes)
.times(1..);

run_path_strategy(strategy, path_service, millisecs(10)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(10)).await;
}

#[tokio::test(start_paused = true)]
Expand All @@ -638,7 +677,7 @@ mod tests {

path_service.expect_paths_to().never();

run_path_strategy(strategy, path_service, millisecs(10)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(10)).await;
}

async fn polls_until_callback(n_lookups: usize) {
Expand All @@ -650,7 +689,7 @@ mod tests {
.returning(identical_lookups_then_callback(n_lookups, millisecs(100)))
.times(n_lookups + 1);

run_path_strategy(strategy, path_service, millisecs(10)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(10)).await;
}

async_param_test! {
Expand All @@ -676,7 +715,7 @@ mod tests {
.returning(never_completes)
.times(1);

run_path_strategy(strategy, path_service, millisecs(10)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(10)).await;
}

#[tokio::test(start_paused = true)]
Expand All @@ -694,7 +733,7 @@ mod tests {
.returning(never_completes)
.times(2);

run_path_strategy(strategy, path_service, millisecs(10)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(10)).await;
}

#[tokio::test(start_paused = true)]
Expand All @@ -716,7 +755,7 @@ mod tests {
.return_const(())
.times(expected_successful_lookups as usize);

run_path_strategy(strategy, path_service, millisecs(run_duration_ms)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(run_duration_ms)).await;
}

#[tokio::test(start_paused = true)]
Expand All @@ -738,7 +777,7 @@ mod tests {
.withf(any_path_to(IsdAsn(REMOTE_IA.as_u64() + 1)))
.times(1);

run_path_strategy(strategy, path_service, millisecs(10)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(10)).await;
}

#[tokio::test(start_paused = true)]
Expand All @@ -755,7 +794,7 @@ mod tests {

strategy.expect_handle_lookup_paths().return_const(());

run_path_strategy(strategy, path_service, millisecs(10)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(10)).await;
}

#[tokio::test(start_paused = true)]
Expand All @@ -778,7 +817,7 @@ mod tests {
})
.times(1);

run_path_strategy(strategy, path_service, millisecs(35)).await;
create_and_run_path_strategy(strategy, path_service, millisecs(35)).await;
}

#[tokio::test]
Expand Down Expand Up @@ -851,4 +890,27 @@ mod tests {

Ok(())
}

#[tokio::test(start_paused = true)]
async fn polls_when_requested_by_path_lookup() {
tracing_subscriber::fmt::init();

let mut strategy = MockStrategy::new();
let path_service = MockPathService::that_never_completes();

strategy
.expect_is_path_available()
.returning(|_, _| Err(PathFetchError::RequiresPoll));
strategy
.expect_poll_requests()
.returning(repeated_callbacks_with_duration(Duration::from_secs(3600)))
.times(2);

let async_strategy = AsyncPathStrategy::new(strategy, path_service);

let _ = tokio::time::timeout(millisecs(1), async_strategy.path_to(REMOTE_IA)).await;
tokio::time::advance(millisecs(2)).await;

run_path_strategy(async_strategy, millisecs(10)).await;
}
}
Loading

0 comments on commit 4528651

Please sign in to comment.