Skip to content

Commit

Permalink
feat: add path refresher strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
jpcsmith committed Jan 22, 2024
1 parent e719110 commit c125c5e
Show file tree
Hide file tree
Showing 10 changed files with 1,160 additions and 30 deletions.
2 changes: 1 addition & 1 deletion crates/scion-proto/src/packet/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl WireEncode for ScionHeaders {
}

/// Instances of an object associated with both a source and destination endpoint.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
pub struct ByEndpoint<T> {
/// The value for the source
pub source: T,
Expand Down
44 changes: 42 additions & 2 deletions crates/scion-proto/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub use fingerprint::{FingerprintError, PathFingerprint};
mod metadata;
pub use metadata::{GeoCoordinates, LinkType, PathInterface, PathMetadata};

/// Minimum MTU along any path or within any AS.
pub const PATH_MIN_MTU: u16 = 1280;

/// A SCION end-to-end path with optional metadata.
#[derive(Debug, Clone)]
pub struct Path<T = Bytes> {
Expand Down Expand Up @@ -65,7 +68,28 @@ impl<T> Path<T> {
/// Panics if the AS is a wildcard AS.
pub fn local(isd_asn: IsdAsn) -> Self {
assert!(!isd_asn.is_wildcard(), "no local path for wildcard AS");
Self::empty(ByEndpoint::with_cloned(isd_asn))

Self {
dataplane_path: DataplanePath::EmptyPath,
underlay_next_hop: None,
isd_asn: ByEndpoint::with_cloned(isd_asn),
metadata: Some(PathMetadata {
expiration: DateTime::<Utc>::MAX_UTC,
mtu: PATH_MIN_MTU,
interfaces: vec![],
..PathMetadata::default()
}),
}
}

/// Returns the source of this path.
pub const fn source(&self) -> IsdAsn {
self.isd_asn.source
}

/// Returns the destination of this path.
pub const fn destination(&self) -> IsdAsn {
self.isd_asn.destination
}

pub fn empty(isd_asn: ByEndpoint<IsdAsn>) -> Self {
Expand Down Expand Up @@ -101,6 +125,20 @@ impl<T> Path<T> {
pub fn expiry_time(&self) -> Option<DateTime<Utc>> {
self.metadata.as_ref().map(|metadata| metadata.expiration)
}

/// Returns true if the path contains an expiry time, and it is after now,
/// false if the contained expiry time is at or before now, and None if the path
/// does not contain an expiry time.
pub fn is_expired(&self, now: DateTime<Utc>) -> Option<bool> {
self.expiry_time().map(|t| t <= now)
}

/// Returns the number of interfaces traversed by the path, if available. Otherwise None.
pub fn interface_count(&self) -> Option<usize> {
self.metadata
.as_ref()
.map(|metadata| metadata.interfaces.len())
}
}

#[allow(missing_docs)]
Expand All @@ -112,8 +150,10 @@ impl Path<Bytes> {
) -> Result<Self, PathParseError> {
let mut dataplane_path = Bytes::from(std::mem::take(&mut value.raw));
if dataplane_path.is_empty() {
return if isd_asn.are_equal() {
return if isd_asn.are_equal() && isd_asn.destination.is_wildcard() {
Ok(Path::empty(isd_asn))
} else if isd_asn.are_equal() {
Ok(Path::local(isd_asn.destination))
} else {
Err(PathParseErrorKind::EmptyRaw.into())
};
Expand Down
42 changes: 34 additions & 8 deletions crates/scion/src/daemon/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A client to communicate with the SCION daemon.

use std::env;
use std::{env, vec};

use scion_grpc::daemon::{v1 as daemon_grpc, v1::daemon_service_client::DaemonServiceClient};
use scion_proto::{address::IsdAsn, packet::ByEndpoint, path::Path};
Expand All @@ -11,6 +11,13 @@ use super::{
messages::{self, PathRequest},
AsInfo,
};
use crate::pan::{AsyncPathService, PathLookupError};

/// The default address of the SCION daemon.
pub const DEFAULT_DAEMON_ADDRESS: &str = "https://localhost:30255";

/// The environment variable to configure the address of the SCION daemon.
pub const DAEMON_ADDRESS_ENV_VARIABLE: &str = "SCION_DAEMON_ADDRESS";

#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
Expand All @@ -23,12 +30,6 @@ pub enum DaemonClientError {
InvalidData,
}

/// The default address of the SCION daemon.
pub const DEFAULT_DAEMON_ADDRESS: &str = "https://localhost:30255";

/// The environment variable to configure the address of the SCION daemon.
pub const DAEMON_ADDRESS_ENV_VARIABLE: &str = "SCION_DAEMON_ADDRESS";

/// Get the daemon address.
///
/// Depending on the environment, this is the [`DEFAULT_DAEMON_ADDRESS`] or manually configured
Expand Down Expand Up @@ -124,7 +125,7 @@ impl DaemonClient {
#[derive(Debug)]
pub struct Paths {
isd_asn: ByEndpoint<IsdAsn>,
grpc_paths: std::vec::IntoIter<daemon_grpc::Path>,
grpc_paths: vec::IntoIter<daemon_grpc::Path>,
}

impl Iterator for Paths {
Expand All @@ -140,3 +141,28 @@ impl Iterator for Paths {
None
}
}

impl AsyncPathService for DaemonClient {
type PathsTo = Paths;

async fn paths_to(&self, scion_as: IsdAsn) -> Result<Self::PathsTo, PathLookupError> {
self.check_destination(scion_as)?;

Ok(self.paths(&PathRequest::new(scion_as)).await?)
}

async fn path_to(&self, scion_as: IsdAsn) -> Result<Path, PathLookupError> {
self.check_destination(scion_as)?;

self.paths(&PathRequest::new(scion_as))
.await?
.next()
.ok_or(PathLookupError::NoPath)
}
}

impl From<DaemonClientError> for PathLookupError {
fn from(value: DaemonClientError) -> Self {
PathLookupError::Other(Box::new(value))
}
}
2 changes: 1 addition & 1 deletion crates/scion/src/pan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod datagram;
pub use datagram::{AsyncScionDatagram, PathAwareDatagram};

mod path_service;
pub use path_service::AsyncPathService;
pub use path_service::{AsyncPathService, PathLookupError};

mod error;
pub use error::{PathErrorKind, ReceiveError, SendError};
Expand Down
3 changes: 3 additions & 0 deletions crates/scion/src/pan/path_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum PathLookupError {
/// The destination can be queried, but there are no paths available to it.
#[error("no path available to destination")]
NoPath,
/// Other errors raised by the service.
#[error(transparent)]
Other(Box<dyn std::error::Error + Send>),
}

/// Trait for asynchronously retrieving paths to SCION ASes.
Expand Down
7 changes: 6 additions & 1 deletion crates/scion/src/pan/path_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use scion_proto::{address::IsdAsn, path::Path};
mod async_strategy;
pub use async_strategy::AsyncPathStrategy;

pub mod refresher;

mod utc_instant;

/// Errors returned when fetching paths from a [`PathStrategy`].
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum PathFetchError {
/// The requested destination is not supported by this strategy,
/// and will never return a valid result.
Expand All @@ -16,6 +20,7 @@ pub enum PathFetchError {
}

/// Requests that a path strategy can make on its controller.
#[derive(Debug, PartialEq, Eq)]
pub enum Request {
/// Requests that the controller queries paths to the specified destination.
LookupPathsTo(IsdAsn),
Expand Down
37 changes: 20 additions & 17 deletions crates/scion/src/pan/path_strategy/async_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use std::{
collections::HashSet,
future::Future,
sync::{Arc, Mutex},
time::Duration,
};

use futures::stream::{FuturesUnordered, StreamExt};
use scion_proto::{address::IsdAsn, path::Path};
use tokio::{task::JoinHandle, time, time::Instant};
use tokio::{sync::watch, task::JoinHandle, time, time::Instant};

use super::PathStrategy;
use crate::pan::{
Expand Down Expand Up @@ -52,8 +51,6 @@ where
S: PathStrategy + Send + 'static,
P: AsyncPathService + Send + Sync + 'static,
{
const PATH_WAIT_TIME: Duration = Duration::from_millis(10);

/// Creates a new `AsyncPathStrategy` and spawns a background task to drive the strategy.
///
/// The provided [`PathStrategy`] determines the logic and the provided [`AsyncPathService`]
Expand All @@ -78,7 +75,7 @@ where
F: FnOnce(&S, Instant) -> Result<T, PathLookupError>,
{
let start = Instant::now();
tracing::debug!("waiting until paths become available");
let mut update_listener = self.inner.subscribe_to_path_changes();

loop {
{
Expand All @@ -89,23 +86,21 @@ where
match strategy.is_path_available(scion_as, now.into()) {
Err(PathFetchError::UnsupportedDestination) => {
tracing::debug!("request was for unsupported destination");
return Err(PathLookupError::NoPath);
return Err(PathLookupError::UnsupportedDestination);
}
Ok(true) => {
tracing::debug!(now=?rel_now, "paths are available, running handler");
return handler(&*strategy, now);
}
Ok(false) => tracing::debug!(
now = tracing::field::debug(rel_now),
"no paths currently available"
),
Ok(false) => tracing::debug!(now = ?rel_now, "no paths currently available"),
}
}

// Try again after a brief pause.
let rel_now = Instant::now().duration_since(start);
tracing::debug!(now=?rel_now, "waiting {:?} until next check", Self::PATH_WAIT_TIME);
tokio::time::sleep(Self::PATH_WAIT_TIME).await;
tracing::debug!("waiting until the path strategy has received paths");
if update_listener.changed().await.is_err() {
tracing::warn!("channel dropped while waiting, aborting wait");
return Err(PathLookupError::NoPath);
}
}
}
}
Expand Down Expand Up @@ -158,6 +153,7 @@ impl<S, P> Drop for AsyncPathStrategy<S, P> {
struct AsyncPathStrategyInner<S, P> {
path_service: P,
strategy: Mutex<S>,
update_notifier: watch::Sender<()>,
}

impl<'p, S, P> AsyncPathStrategyInner<S, P>
Expand All @@ -169,6 +165,7 @@ where
Self {
strategy: Mutex::new(strategy),
path_service,
update_notifier: watch::Sender::new(()),
}
}

Expand Down Expand Up @@ -212,9 +209,8 @@ where
callback_time=?callback_time.duration_since(start), "waiting for paths or callback timeout"
);

// Wait for the callback duration, or until a pending path query completes.
// If there are no pending path queries, then this will just sleep until the
// callback time.
// Wait for the callback duration, or until a pending path query completes. If there
// are no pending path queries, then this will just sleep until the callback time.
while let Ok(next) = time::timeout_at(callback_time, requests.next_ready()).await {
let span = tracing::debug_span!("event_wait", now=?start.elapsed());

Expand All @@ -234,6 +230,9 @@ where

tracing::debug!(%scion_as, count=found_paths.len(), "path lookup successful");
strategy.handle_lookup_paths(&found_paths, Instant::now().into());

self.update_notifier.send_replace(());

break;
}
None => {
Expand All @@ -245,6 +244,10 @@ where
}
}
}

fn subscribe_to_path_changes(&self) -> watch::Receiver<()> {
self.update_notifier.subscribe()
}
}

/// Tracks the ASes for which path requests are pending.
Expand Down
Loading

0 comments on commit c125c5e

Please sign in to comment.