Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add path refresher strategy #120

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/scion-proto/src/packet/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl WireEncode for ScionHeaders {
}

/// Instances of an object associated with both a source and destination endpoint.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
pub struct ByEndpoint<T> {
/// The value for the source
pub source: T,
Expand Down
44 changes: 42 additions & 2 deletions crates/scion-proto/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub use fingerprint::{FingerprintError, PathFingerprint};
mod metadata;
pub use metadata::{GeoCoordinates, LinkType, PathInterface, PathMetadata};

/// Minimum MTU along any path or within any AS.
pub const PATH_MIN_MTU: u16 = 1280;

/// A SCION end-to-end path with optional metadata.
#[derive(Debug, Clone)]
pub struct Path<T = Bytes> {
Expand Down Expand Up @@ -65,7 +68,28 @@ impl<T> Path<T> {
/// Panics if the AS is a wildcard AS.
pub fn local(isd_asn: IsdAsn) -> Self {
assert!(!isd_asn.is_wildcard(), "no local path for wildcard AS");
Self::empty(ByEndpoint::with_cloned(isd_asn))

Self {
dataplane_path: DataplanePath::EmptyPath,
underlay_next_hop: None,
isd_asn: ByEndpoint::with_cloned(isd_asn),
metadata: Some(PathMetadata {
expiration: DateTime::<Utc>::MAX_UTC,
mtu: PATH_MIN_MTU,
interfaces: vec![],
..PathMetadata::default()
}),
}
}

/// Returns the source of this path.
pub const fn source(&self) -> IsdAsn {
self.isd_asn.source
}

/// Returns the destination of this path.
pub const fn destination(&self) -> IsdAsn {
self.isd_asn.destination
}

pub fn empty(isd_asn: ByEndpoint<IsdAsn>) -> Self {
Expand Down Expand Up @@ -101,6 +125,20 @@ impl<T> Path<T> {
pub fn expiry_time(&self) -> Option<DateTime<Utc>> {
self.metadata.as_ref().map(|metadata| metadata.expiration)
}

/// Returns true if the path contains an expiry time, and it is after now,
/// false if the contained expiry time is at or before now, and None if the path
/// does not contain an expiry time.
pub fn is_expired(&self, now: DateTime<Utc>) -> Option<bool> {
self.expiry_time().map(|t| t <= now)
}

/// Returns the number of interfaces traversed by the path, if available. Otherwise None.
pub fn interface_count(&self) -> Option<usize> {
self.metadata
.as_ref()
.map(|metadata| metadata.interfaces.len())
}
}

#[allow(missing_docs)]
Expand All @@ -112,8 +150,10 @@ impl Path<Bytes> {
) -> Result<Self, PathParseError> {
let mut dataplane_path = Bytes::from(std::mem::take(&mut value.raw));
if dataplane_path.is_empty() {
return if isd_asn.are_equal() {
return if isd_asn.are_equal() && isd_asn.destination.is_wildcard() {
Ok(Path::empty(isd_asn))
} else if isd_asn.are_equal() {
Ok(Path::local(isd_asn.destination))
} else {
Err(PathParseErrorKind::EmptyRaw.into())
};
Expand Down
42 changes: 34 additions & 8 deletions crates/scion/src/daemon/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A client to communicate with the SCION daemon.

use std::env;
use std::{env, vec};

use scion_grpc::daemon::{v1 as daemon_grpc, v1::daemon_service_client::DaemonServiceClient};
use scion_proto::{address::IsdAsn, packet::ByEndpoint, path::Path};
Expand All @@ -11,6 +11,13 @@ use super::{
messages::{self, PathRequest},
AsInfo,
};
use crate::pan::{AsyncPathService, PathLookupError};

/// The default address of the SCION daemon.
pub const DEFAULT_DAEMON_ADDRESS: &str = "https://localhost:30255";

/// The environment variable to configure the address of the SCION daemon.
pub const DAEMON_ADDRESS_ENV_VARIABLE: &str = "SCION_DAEMON_ADDRESS";

#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
Expand All @@ -23,12 +30,6 @@ pub enum DaemonClientError {
InvalidData,
}

/// The default address of the SCION daemon.
pub const DEFAULT_DAEMON_ADDRESS: &str = "https://localhost:30255";

/// The environment variable to configure the address of the SCION daemon.
pub const DAEMON_ADDRESS_ENV_VARIABLE: &str = "SCION_DAEMON_ADDRESS";

/// Get the daemon address.
///
/// Depending on the environment, this is the [`DEFAULT_DAEMON_ADDRESS`] or manually configured
Expand Down Expand Up @@ -124,7 +125,7 @@ impl DaemonClient {
#[derive(Debug)]
pub struct Paths {
isd_asn: ByEndpoint<IsdAsn>,
grpc_paths: std::vec::IntoIter<daemon_grpc::Path>,
grpc_paths: vec::IntoIter<daemon_grpc::Path>,
}

impl Iterator for Paths {
Expand All @@ -140,3 +141,28 @@ impl Iterator for Paths {
None
}
}

impl AsyncPathService for DaemonClient {
type PathsTo = Paths;

async fn paths_to(&self, scion_as: IsdAsn) -> Result<Self::PathsTo, PathLookupError> {
self.check_destination(scion_as)?;

Ok(self.paths(&PathRequest::new(scion_as)).await?)
}

async fn path_to(&self, scion_as: IsdAsn) -> Result<Path, PathLookupError> {
self.check_destination(scion_as)?;

self.paths(&PathRequest::new(scion_as))
.await?
.next()
.ok_or(PathLookupError::NoPath)
}
}

impl From<DaemonClientError> for PathLookupError {
fn from(value: DaemonClientError) -> Self {
PathLookupError::Other(Box::new(value))
}
}
2 changes: 1 addition & 1 deletion crates/scion/src/pan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod datagram;
pub use datagram::{AsyncScionDatagram, PathAwareDatagram};

mod path_service;
pub use path_service::AsyncPathService;
pub use path_service::{AsyncPathService, PathLookupError};

mod error;
pub use error::{PathErrorKind, ReceiveError, SendError};
Expand Down
3 changes: 3 additions & 0 deletions crates/scion/src/pan/path_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum PathLookupError {
/// The destination can be queried, but there are no paths available to it.
#[error("no path available to destination")]
NoPath,
/// Other errors raised by the service.
#[error(transparent)]
Other(Box<dyn std::error::Error + Send>),
}

/// Trait for asynchronously retrieving paths to SCION ASes.
Expand Down
7 changes: 6 additions & 1 deletion crates/scion/src/pan/path_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use scion_proto::{address::IsdAsn, path::Path};
mod async_strategy;
pub use async_strategy::AsyncPathStrategy;

pub mod refresher;

mod utc_instant;

/// Errors returned when fetching paths from a [`PathStrategy`].
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum PathFetchError {
/// The requested destination is not supported by this strategy,
/// and will never return a valid result.
Expand All @@ -16,6 +20,7 @@ pub enum PathFetchError {
}

/// Requests that a path strategy can make on its controller.
#[derive(Debug, PartialEq, Eq)]
pub enum Request {
/// Requests that the controller queries paths to the specified destination.
LookupPathsTo(IsdAsn),
Expand Down
37 changes: 20 additions & 17 deletions crates/scion/src/pan/path_strategy/async_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use std::{
collections::HashSet,
future::Future,
sync::{Arc, Mutex},
time::Duration,
};

use futures::stream::{FuturesUnordered, StreamExt};
use scion_proto::{address::IsdAsn, path::Path};
use tokio::{task::JoinHandle, time, time::Instant};
use tokio::{sync::watch, task::JoinHandle, time, time::Instant};

use super::PathStrategy;
use crate::pan::{
Expand Down Expand Up @@ -52,8 +51,6 @@ where
S: PathStrategy + Send + 'static,
P: AsyncPathService + Send + Sync + 'static,
{
const PATH_WAIT_TIME: Duration = Duration::from_millis(10);

/// Creates a new `AsyncPathStrategy` and spawns a background task to drive the strategy.
///
/// The provided [`PathStrategy`] determines the logic and the provided [`AsyncPathService`]
Expand All @@ -78,7 +75,7 @@ where
F: FnOnce(&S, Instant) -> Result<T, PathLookupError>,
{
let start = Instant::now();
tracing::debug!("waiting until paths become available");
let mut update_listener = self.inner.subscribe_to_path_changes();

loop {
{
Expand All @@ -89,23 +86,21 @@ where
match strategy.is_path_available(scion_as, now.into()) {
Err(PathFetchError::UnsupportedDestination) => {
tracing::debug!("request was for unsupported destination");
return Err(PathLookupError::NoPath);
return Err(PathLookupError::UnsupportedDestination);
}
Ok(true) => {
tracing::debug!(now=?rel_now, "paths are available, running handler");
return handler(&*strategy, now);
}
Ok(false) => tracing::debug!(
now = tracing::field::debug(rel_now),
"no paths currently available"
),
Ok(false) => tracing::debug!(now = ?rel_now, "no paths currently available"),
}
}

// Try again after a brief pause.
let rel_now = Instant::now().duration_since(start);
tracing::debug!(now=?rel_now, "waiting {:?} until next check", Self::PATH_WAIT_TIME);
tokio::time::sleep(Self::PATH_WAIT_TIME).await;
tracing::debug!("waiting until the path strategy has received paths");
if update_listener.changed().await.is_err() {
tracing::warn!("channel dropped while waiting, aborting wait");
return Err(PathLookupError::NoPath);
}
jpcsmith marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -158,6 +153,7 @@ impl<S, P> Drop for AsyncPathStrategy<S, P> {
struct AsyncPathStrategyInner<S, P> {
path_service: P,
strategy: Mutex<S>,
update_notifier: watch::Sender<()>,
}

impl<'p, S, P> AsyncPathStrategyInner<S, P>
Expand All @@ -169,6 +165,7 @@ where
Self {
strategy: Mutex::new(strategy),
path_service,
update_notifier: watch::Sender::new(()),
}
}

Expand Down Expand Up @@ -212,9 +209,8 @@ where
callback_time=?callback_time.duration_since(start), "waiting for paths or callback timeout"
);

// Wait for the callback duration, or until a pending path query completes.
// If there are no pending path queries, then this will just sleep until the
// callback time.
// Wait for the callback duration, or until a pending path query completes. If there
// are no pending path queries, then this will just sleep until the callback time.
while let Ok(next) = time::timeout_at(callback_time, requests.next_ready()).await {
let span = tracing::debug_span!("event_wait", now=?start.elapsed());

Expand All @@ -234,6 +230,9 @@ where

tracing::debug!(%scion_as, count=found_paths.len(), "path lookup successful");
strategy.handle_lookup_paths(&found_paths, Instant::now().into());

self.update_notifier.send_replace(());

break;
}
None => {
Expand All @@ -245,6 +244,10 @@ where
}
}
}

fn subscribe_to_path_changes(&self) -> watch::Receiver<()> {
self.update_notifier.subscribe()
}
}

/// Tracks the ASes for which path requests are pending.
Expand Down
Loading