From d86b042895c038c19ed78f4fb1ec790753bceae7 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Smith Date: Wed, 20 Dec 2023 11:54:17 +0100 Subject: [PATCH] feat: add datagram trait and path-aware datagram (#106) --- .../scion-proto/src/address/scion_address.rs | 16 + .../scion-proto/src/address/socket_address.rs | 16 + crates/scion-proto/src/datagram.rs | 14 +- crates/scion-proto/src/packet/udp.rs | 14 +- crates/scion-proto/src/path.rs | 27 +- crates/scion-proto/src/path/dataplane.rs | 39 +- .../scion-proto/src/reliable/registration.rs | 4 +- crates/scion/Cargo.toml | 1 + crates/scion/src/lib.rs | 1 + crates/scion/src/pan.rs | 9 + crates/scion/src/pan/datagram.rs | 199 +++++++++ crates/scion/src/pan/error.rs | 85 ++++ crates/scion/src/pan/path_service.rs | 37 ++ crates/scion/src/udp_socket.rs | 406 +++++++----------- crates/scion/tests/test_pan_socket.rs | 119 +++++ crates/scion/tests/test_udp_socket.rs | 27 +- 16 files changed, 728 insertions(+), 286 deletions(-) create mode 100644 crates/scion/src/pan.rs create mode 100644 crates/scion/src/pan/datagram.rs create mode 100644 crates/scion/src/pan/error.rs create mode 100644 crates/scion/src/pan/path_service.rs create mode 100644 crates/scion/tests/test_pan_socket.rs diff --git a/crates/scion-proto/src/address/scion_address.rs b/crates/scion-proto/src/address/scion_address.rs index 7c1b1e4..7d25ac0 100644 --- a/crates/scion-proto/src/address/scion_address.rs +++ b/crates/scion-proto/src/address/scion_address.rs @@ -62,6 +62,16 @@ impl ScionAddr { } } +impl AsRef for ScionAddr { + fn as_ref(&self) -> &IsdAsn { + match self { + ScionAddr::V4(addr) => addr.as_ref(), + ScionAddr::V6(addr) => addr.as_ref(), + ScionAddr::Svc(addr) => addr.as_ref(), + } + } +} + macro_rules! impl_from { ($base:ty, $variant:expr) => { impl From<$base> for ScionAddr { @@ -155,6 +165,12 @@ macro_rules! scion_address { } } + impl AsRef for $name { + fn as_ref(&self) -> &IsdAsn { + &self.isd_asn + } + } + impl core::fmt::Display for $name { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{},{}", self.isd_asn(), self.host()) diff --git a/crates/scion-proto/src/address/socket_address.rs b/crates/scion-proto/src/address/socket_address.rs index 4392354..e19c0da 100644 --- a/crates/scion-proto/src/address/socket_address.rs +++ b/crates/scion-proto/src/address/socket_address.rs @@ -113,6 +113,16 @@ impl SocketAddr { } } +impl AsRef for SocketAddr { + fn as_ref(&self) -> &IsdAsn { + match self { + SocketAddr::V4(addr) => addr.as_ref(), + SocketAddr::V6(addr) => addr.as_ref(), + SocketAddr::Svc(addr) => addr.as_ref(), + } + } +} + impl From for SocketAddr { /// Converts a [`SocketAddrV4`] into a [`SocketAddr::V4`]. #[inline] @@ -244,6 +254,12 @@ macro_rules! socket_address { Ok(Self {scion_addr, port }) } } + + impl AsRef for $name { + fn as_ref(&self) -> &IsdAsn { + self.scion_addr.as_ref() + } + } }; } diff --git a/crates/scion-proto/src/datagram.rs b/crates/scion-proto/src/datagram.rs index 43ade0b..b0becd6 100644 --- a/crates/scion-proto/src/datagram.rs +++ b/crates/scion-proto/src/datagram.rs @@ -30,7 +30,7 @@ pub enum UdpEncodeError { /// /// [RFC]: https://www.ietf.org/archive/id/draft-dekater-scion-dataplane-00.html #[derive(Debug, Default, PartialEq)] -pub struct UdpDatagram { +pub struct UdpMessage { /// The source and destination ports pub port: ByEndpoint, /// The length of the header and payload @@ -41,7 +41,7 @@ pub struct UdpDatagram { pub payload: Bytes, } -impl UdpDatagram { +impl UdpMessage { /// SCION protocol number for UDP. /// /// See the [IETF SCION-dataplane RFC draft][rfc] for possible values. @@ -89,7 +89,7 @@ impl UdpDatagram { } } -impl WireEncodeVec<2> for UdpDatagram { +impl WireEncodeVec<2> for UdpMessage { type Error = InadequateBufferSize; fn encode_with_unchecked(&self, buffer: &mut BytesMut) -> [Bytes; 2] { @@ -111,11 +111,11 @@ impl WireEncodeVec<2> for UdpDatagram { } } -impl WireDecode for UdpDatagram { +impl WireDecode for UdpMessage { type Error = UdpDecodeError; fn decode(data: &mut T) -> Result { - if data.remaining() < UdpDatagram::HEADER_LEN { + if data.remaining() < UdpMessage::HEADER_LEN { return Err(Self::Error::DatagramEmptyOrTruncated); } @@ -161,7 +161,7 @@ mod tests { destination: MaybeEncoded::Decoded(Ipv4Addr::from_str("10.0.0.2")?.into()), }, }; - let mut datagram = UdpDatagram::new( + let mut datagram = UdpMessage::new( ByEndpoint { source: 10001, destination: 10002, @@ -193,7 +193,7 @@ mod tests { let mut encoded_bytes = BytesMut::new(); encoded_bytes.put(encoded_datagram[0].clone()); encoded_bytes.put(encoded_datagram[1].clone()); - assert_eq!(UdpDatagram::decode(&mut encoded_bytes.freeze())?, datagram); + assert_eq!(UdpMessage::decode(&mut encoded_bytes.freeze())?, datagram); Ok(()) } diff --git a/crates/scion-proto/src/packet/udp.rs b/crates/scion-proto/src/packet/udp.rs index 558262f..7f839d9 100644 --- a/crates/scion-proto/src/packet/udp.rs +++ b/crates/scion-proto/src/packet/udp.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use super::{InadequateBufferSize, ScionHeaders, ScionPacketRaw}; use crate::{ address::SocketAddr, - datagram::{UdpDatagram, UdpDecodeError}, + datagram::{UdpDecodeError, UdpMessage}, packet::{ByEndpoint, EncodeError}, path::Path, wire_encoding::{WireDecode, WireEncodeVec}, @@ -16,7 +16,7 @@ pub struct ScionPacketUdp { /// Packet headers pub headers: ScionHeaders, /// The contained UDP datagram - pub datagram: UdpDatagram, + pub datagram: UdpMessage, } impl ScionPacketUdp { @@ -62,11 +62,11 @@ impl ScionPacketUdp { let headers = ScionHeaders::new( endhosts, path, - UdpDatagram::PROTOCOL_NUMBER, - payload.len() + UdpDatagram::HEADER_LEN, + UdpMessage::PROTOCOL_NUMBER, + payload.len() + UdpMessage::HEADER_LEN, )?; let mut datagram = - UdpDatagram::new(endhosts.map(|e| e.port()), payload).map_err(|_| todo!())?; + UdpMessage::new(endhosts.map(|e| e.port()), payload).map_err(|_| todo!())?; datagram.set_checksum(&headers.address); Ok(Self { headers, datagram }) @@ -77,14 +77,14 @@ impl TryFrom for ScionPacketUdp { type Error = UdpDecodeError; fn try_from(mut value: ScionPacketRaw) -> Result { - if value.headers.common.next_header != UdpDatagram::PROTOCOL_NUMBER { + if value.headers.common.next_header != UdpMessage::PROTOCOL_NUMBER { return Err(UdpDecodeError::WrongProtocolNumber( value.headers.common.next_header, )); } Ok(Self { headers: value.headers, - datagram: UdpDatagram::decode(&mut value.payload)?, + datagram: UdpMessage::decode(&mut value.payload)?, }) } } diff --git a/crates/scion-proto/src/path.rs b/crates/scion-proto/src/path.rs index b8e323d..6de92c8 100644 --- a/crates/scion-proto/src/path.rs +++ b/crates/scion-proto/src/path.rs @@ -3,7 +3,7 @@ //! This module contains types for SCION paths and metadata as well as encoding and decoding //! functions. -use std::net::SocketAddr; +use std::{net::SocketAddr, ops::Deref}; use bytes::Bytes; use scion_grpc::daemon::v1 as daemon_grpc; @@ -27,7 +27,7 @@ pub mod epic; pub use epic::EpicAuths; /// A SCION end-to-end path with optional metadata. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct Path { /// The raw bytes to be added as the path header to SCION dataplane packets. pub dataplane_path: DataplanePath, @@ -125,6 +125,29 @@ impl Path { } } +impl From> for Path { + fn from(value: Path<&mut [u8]>) -> Self { + Self { + dataplane_path: value.dataplane_path.into(), + underlay_next_hop: value.underlay_next_hop, + isd_asn: value.isd_asn, + metadata: value.metadata, + } + } +} + +impl PartialEq for Path +where + T: Deref, +{ + fn eq(&self, other: &Self) -> bool { + self.dataplane_path == other.dataplane_path + && self.underlay_next_hop == other.underlay_next_hop + && self.isd_asn == other.isd_asn + && self.metadata == other.metadata + } +} + #[cfg(test)] mod tests { use std::net::{IpAddr, Ipv4Addr}; diff --git a/crates/scion-proto/src/path/dataplane.rs b/crates/scion-proto/src/path/dataplane.rs index da50571..f6d6d6b 100644 --- a/crates/scion-proto/src/path/dataplane.rs +++ b/crates/scion-proto/src/path/dataplane.rs @@ -61,7 +61,7 @@ impl From for PathType { pub struct UnsupportedPathType(pub u8); /// Dataplane path found in a SCION packet. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum DataplanePath { /// The empty path type, used for intra-AS hops. EmptyPath, @@ -131,6 +131,17 @@ where } } + /// Reverse the path to the provided slice. + /// + /// Unsupported path types are copied to the slice, as is. + pub fn reverse_to_slice<'b>(&self, buffer: &'b mut [u8]) -> DataplanePath<&'b mut [u8]> { + match self { + DataplanePath::EmptyPath => DataplanePath::EmptyPath, + DataplanePath::Standard(path) => DataplanePath::Standard(path.reverse_to_slice(buffer)), + DataplanePath::Unsupported { .. } => self.copy_to_slice(buffer), + } + } + /// Reverses the path. pub fn to_reversed(&self) -> Result { match self { @@ -188,6 +199,30 @@ impl From for DataplanePath { } } +impl PartialEq> for DataplanePath +where + T: Deref, + U: Deref, +{ + fn eq(&self, other: &DataplanePath) -> bool { + match (self, other) { + (Self::Standard(lhs), DataplanePath::Standard(rhs)) => lhs.raw() == rhs.raw(), + ( + Self::Unsupported { + path_type: l_path_type, + bytes: l_bytes, + }, + DataplanePath::Unsupported { + path_type: r_path_type, + bytes: r_bytes, + }, + ) => l_path_type == r_path_type && l_bytes.deref() == r_bytes.deref(), + (Self::EmptyPath, DataplanePath::EmptyPath) => true, + _ => false, + } + } +} + impl WireEncode for DataplanePath { type Error = InadequateBufferSize; @@ -275,7 +310,7 @@ mod tests { #[test] fn reverse_empty() { - let dataplane_path = DataplanePath::EmptyPath; + let dataplane_path = DataplanePath::::EmptyPath; let reverse_path = dataplane_path.to_reversed().unwrap(); assert_eq!(dataplane_path, reverse_path); assert_eq!(reverse_path.to_reversed().unwrap(), dataplane_path); diff --git a/crates/scion-proto/src/reliable/registration.rs b/crates/scion-proto/src/reliable/registration.rs index e153a8b..de4b54c 100644 --- a/crates/scion-proto/src/reliable/registration.rs +++ b/crates/scion-proto/src/reliable/registration.rs @@ -29,7 +29,7 @@ use bytes::{Buf, BufMut}; use super::wire_utils::LAYER4_PORT_OCTETS; use crate::{ address::{HostType, IsdAsn, ServiceAddr, SocketAddr as ScionSocketAddr}, - datagram::UdpDatagram, + datagram::UdpMessage, reliable::{ wire_utils::{encoded_address_and_port_length, encoded_address_length}, ADDRESS_TYPE_OCTETS, @@ -85,7 +85,7 @@ impl RegistrationRequest { self.encode_command_flag(buffer); - buffer.put_u8(UdpDatagram::PROTOCOL_NUMBER); + buffer.put_u8(UdpMessage::PROTOCOL_NUMBER); buffer.put_u64(self.isd_asn.as_u64()); encode_address(buffer, &self.public_address); diff --git a/crates/scion/Cargo.toml b/crates/scion/Cargo.toml index 95157c5..dd321fd 100644 --- a/crates/scion/Cargo.toml +++ b/crates/scion/Cargo.toml @@ -6,6 +6,7 @@ license = "Apache-2.0" publish = false [dependencies] +async-trait = "0.1.74" bytes = "1.5.0" chrono = { workspace = true, features = ["clock"] } scion-grpc = { version = "0.1.0", path = "../scion-grpc" } diff --git a/crates/scion/src/lib.rs b/crates/scion/src/lib.rs index 4a4ca1d..2a6ea0e 100644 --- a/crates/scion/src/lib.rs +++ b/crates/scion/src/lib.rs @@ -2,4 +2,5 @@ pub mod daemon; pub mod dispatcher; +pub mod pan; pub mod udp_socket; diff --git a/crates/scion/src/pan.rs b/crates/scion/src/pan.rs new file mode 100644 index 0000000..925e5f6 --- /dev/null +++ b/crates/scion/src/pan.rs @@ -0,0 +1,9 @@ +//! Path aware networking socket and services. +mod datagram; +pub use datagram::{AsyncScionDatagram, PathAwareDatagram}; + +mod path_service; +pub use path_service::AsyncPathService; + +mod error; +pub use error::{PathErrorKind, ReceiveError, SendError}; diff --git a/crates/scion/src/pan/datagram.rs b/crates/scion/src/pan/datagram.rs new file mode 100644 index 0000000..a3d2a08 --- /dev/null +++ b/crates/scion/src/pan/datagram.rs @@ -0,0 +1,199 @@ +use std::{io, sync::Arc}; + +use async_trait; +use bytes::Bytes; +use scion_proto::{address::IsdAsn, path::Path}; + +use super::{AsyncPathService, ReceiveError, SendError}; + +/// Interface for sending and receiving datagrams asynchronously on the SCION network. +#[async_trait::async_trait] +pub trait AsyncScionDatagram { + /// The type of the address used for sending and receiving datagrams. + type Addr: AsRef + Sync + Send; + + /// Receive a datagram, its sender, and path from the socket. + /// + /// The payload of the datagram is written into the provided buffer, which must not be + /// empty. If there is insufficient space in the buffer, excess data may be dropped. + /// + /// This function returns the number of bytes in the payload (irrespective of whether any + /// were dropped), the address of the sender, and the SCION [`Path`] over which the packet + /// was received. + /// + /// The returned path corresponds to the reversed path observed in the packet for known path + /// types, or a copy of the opaque path data for unknown path types. In either case, the raw + /// data comprising the returned path is written to path_buffer, which must be at least + /// [`DataplanePath::MAX_LEN`][`scion_proto::path::DataplanePath::::MAX_LEN`] bytes in + /// length. + async fn recv_from_with_path<'p>( + &self, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + ) -> Result<(usize, Self::Addr, Path<&'p mut [u8]>), ReceiveError>; + + /// Receive a datagram and its sender. + /// + /// This behaves like [`Self::recv_from_with_path`] but does not return the path over which + /// the packet was received. + /// + /// In the case where the path is not needed, this method should be used as the + /// implementation may avoid copying the path. + /// + /// See [`Self::recv_from_with_path`] for more information. + async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, Self::Addr), ReceiveError>; + + /// Receive a datagram and its path from the socket. + /// + /// Similar to [`Self::recv_from_with_path`], this receives the datagram into the provided + /// buffer. However, as this does not return any information about the sender, this is + /// primarily used where the sender is already known, such as with connected sockets. + /// + /// See [`Self::recv_from_with_path`] for more information. + async fn recv_with_path<'p>( + &self, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + ) -> Result<(usize, Path<&'p mut [u8]>), ReceiveError> { + let (len, _, path) = self.recv_from_with_path(buffer, path_buffer).await?; + Ok((len, path)) + } + + /// Receive a datagram. + /// + /// Similar to [`Self::recv_from_with_path`], this receives the datagram into the provided + /// buffer. However, as this does not return any information about the sender, this is + /// primarily used where the sender is already known, such as with connected sockets. + /// + /// In the case where neither the path nor the sender is needed, this method should be used + /// instead of [`Self::recv_with_path`] as the implementation may avoid copying the path. + /// + /// See [`Self::recv_from_with_path`] for more information. + async fn recv(&self, buffer: &mut [u8]) -> Result { + let (len, _) = self.recv_from(buffer).await?; + Ok(len) + } + + /// Sends the payload to the specified remote address and using the specified path. + async fn send_to_via( + &self, + payload: Bytes, + destination: Self::Addr, + path: &Path, + ) -> Result<(), SendError>; + + /// Sends the payload using the specified path. + /// + /// This assumes that the underlying socket is aware of the destination, and will + /// return an error if the socket cannot identify the destination. + async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError>; + + /// Returns the remote address of the socket, if any. + fn remote_addr(&self) -> Option; +} + +/// A SCION path-aware socket. +/// +/// This socket wraps an [`AsyncScionDatagram`] and [`AsyncPathService`] and handles providing +/// paths to the socket from the path service and notifying the path service of any observed +/// paths from the network as well as any issues related to the paths. +pub struct PathAwareDatagram { + socket: D, + path_service: Arc

, +} + +impl PathAwareDatagram +where + D: AsyncScionDatagram + Send + Sync, + P: AsyncPathService + Send + Sync, +{ + /// Creates a new `PathAwareDatagram` socket that wraps the provided socket and path service. + pub fn new(socket: D, path_service: Arc

) -> Self { + Self { + socket, + path_service, + } + } + + /// Changes the path service associated with this socket. + pub fn set_path_service(&mut self, path_service: Arc

) { + self.path_service = path_service; + } + + /// Returns the path service associated with this socket. + pub fn path_service(&self) -> &P { + &self.path_service + } + + /// Send a datagram using [`AsyncScionDatagram::send_to_via`] with a path from the path service. + pub async fn send_to( + &self, + payload: Bytes, + destination: ::Addr, + ) -> Result<(), SendError> { + let path = self.path_to(*destination.as_ref()).await?; + self.send_to_via(payload, destination, path).await + } + + /// Send a datagram using [`AsyncScionDatagram::send_via`] with a path from the path service. + pub async fn send(&self, payload: Bytes) -> Result<(), SendError> { + if let Some(remote_addr) = self.remote_addr() { + // Use send_via here as it maintains the connected semantics of the function call. + let path = self.path_to(*remote_addr.as_ref()).await?; + self.send_via(payload, path).await + } else { + Err(SendError::Io(io::ErrorKind::NotConnected.into())) + } + } + + async fn path_to(&self, remote_ia: IsdAsn) -> Result<&Path, SendError> { + self.path_service + .path_to(remote_ia) + .await + .map_err(|_err| todo!("handle path failures")) + } +} + +#[async_trait::async_trait] +impl AsyncScionDatagram for PathAwareDatagram +where + D: AsyncScionDatagram + Send + Sync, + P: AsyncPathService + Send + Sync, +{ + type Addr = ::Addr; + + async fn recv_from_with_path<'p>( + &self, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + ) -> Result<(usize, Self::Addr, Path<&'p mut [u8]>), ReceiveError> { + self.socket.recv_from_with_path(buffer, path_buffer).await + } + + async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, Self::Addr), ReceiveError> { + self.socket.recv_from(buffer).await + } + + async fn send_to_via( + &self, + payload: Bytes, + destination: Self::Addr, + path: &Path, + ) -> Result<(), SendError> { + self.socket.send_to_via(payload, destination, path).await + } + + async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError> { + self.socket.send_via(payload, path).await + } + + fn remote_addr(&self) -> Option { + self.socket.remote_addr() + } +} + +impl AsRef for PathAwareDatagram { + fn as_ref(&self) -> &D { + &self.socket + } +} diff --git a/crates/scion/src/pan/error.rs b/crates/scion/src/pan/error.rs new file mode 100644 index 0000000..7232679 --- /dev/null +++ b/crates/scion/src/pan/error.rs @@ -0,0 +1,85 @@ +use std::{fmt::Display, io}; + +use scion_proto::packet::{self, EncodeError}; + +use crate::dispatcher; + +/// Kinds of path-related failures that may occur when sending a packet. +#[derive(Debug)] +pub enum PathErrorKind { + /// The provided path has already expired. + Expired, + /// No path to the destination is available. + NoPath, + /// The path should have provided the next-hop in the underlay but did not. + /// + /// Currently, only intra-AS paths do not require a next-hop, all inter-AS paths do. + NoUnderlayNextHop, +} + +impl Display for PathErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let description = match self { + PathErrorKind::Expired => "the provided path has already expired", + PathErrorKind::NoPath => "no path to the destination available", + PathErrorKind::NoUnderlayNextHop => "no underlay next hop provided by path", + }; + f.write_str(description) + } +} + +/// Error returned when attempting to send a datagram on the SCION network. +#[derive(Debug, thiserror::Error)] +pub enum SendError { + /// An IO error raised from the OS or from the socket. + #[error(transparent)] + Io(#[from] std::io::Error), + /// An issue with the provided or fetched path. + #[error("issue with the provided path: {0}")] + PathIssue(PathErrorKind), + /// The packet is too large to be sent on the network. + #[error("packet is too large to be sent")] + PacketTooLarge, +} + +impl From for SendError { + fn from(value: PathErrorKind) -> Self { + SendError::PathIssue(value) + } +} + +impl From for SendError { + fn from(value: io::ErrorKind) -> Self { + Self::Io(value.into()) + } +} + +impl From for SendError { + fn from(value: dispatcher::SendError) -> Self { + match value { + dispatcher::SendError::Io(io) => Self::Io(io), + dispatcher::SendError::PayloadTooLarge(_) => Self::PacketTooLarge, + } + } +} + +impl From for SendError { + fn from(value: packet::EncodeError) -> Self { + match value { + EncodeError::PayloadTooLarge | EncodeError::HeaderTooLarge => Self::PacketTooLarge, + } + } +} + +/// Error messages returned from the UDP socket. +#[derive(Debug, thiserror::Error, PartialEq, Eq)] +pub enum ReceiveError { + /// A buffer with zero-length was provided to which to be written. + /// + /// Retry the operation with a buffer of non-zero length to receive datagrams. + #[error("attempted to receive with a zero-length buffer")] + ZeroLengthBuffer, + /// The provided path buffer does not meet the minimum length requirement. + #[error("path buffer too short")] + PathBufferTooShort, +} diff --git a/crates/scion/src/pan/path_service.rs b/crates/scion/src/pan/path_service.rs new file mode 100644 index 0000000..578b9cb --- /dev/null +++ b/crates/scion/src/pan/path_service.rs @@ -0,0 +1,37 @@ +use bytes::Bytes; +use chrono::Utc; +use scion_proto::{address::IsdAsn, path::Path}; + +#[derive(Debug, thiserror::Error)] +pub enum PathLookupError { + #[error("no path available to destination")] + NoPath, +} + +/// Trait for asynchronously retrieving paths to SCION ASes. +#[async_trait::async_trait] +pub trait AsyncPathService { + /// Return a path to the specified AS. + async fn path_to<'a>(&'a self, scion_as: IsdAsn) -> Result<&'a Path, PathLookupError>; +} + +#[async_trait::async_trait] +impl AsyncPathService for Path { + /// Return a path to the specified AS. + async fn path_to(&self, scion_as: IsdAsn) -> Result<&Path, PathLookupError> { + if self.isd_asn.destination != scion_as { + return Err(PathLookupError::NoPath); + } + if let Some(metadata) = self.metadata.as_ref() { + if metadata.expiration < Utc::now() { + tracing::warn!( + destination=%scion_as, + path=?self, + "attempted to send packet with expired, static path" + ); + return Err(PathLookupError::NoPath); + } + } + Ok(self) + } +} diff --git a/crates/scion/src/udp_socket.rs b/crates/scion/src/udp_socket.rs index 7d9d4ed..d3dd03e 100644 --- a/crates/scion/src/udp_socket.rs +++ b/crates/scion/src/udp_socket.rs @@ -1,7 +1,4 @@ -#![allow(missing_docs)] - //! A socket to send UDP datagrams via SCION. - use std::{ cmp, io, @@ -12,51 +9,30 @@ use bytes::Bytes; use chrono::Utc; use scion_proto::{ address::SocketAddr, - datagram::{UdpDatagram, UdpEncodeError}, - packet::{self, ByEndpoint, EncodeError, ScionPacketRaw, ScionPacketUdp}, - path::{DataplanePath, Path, UnsupportedPathType}, + datagram::{UdpEncodeError, UdpMessage}, + packet::{ByEndpoint, ScionPacketRaw, ScionPacketUdp}, + path::{DataplanePath, Path}, reliable::Packet, wire_encoding::WireDecode, }; use tokio::sync::Mutex; -use crate::dispatcher::{self, get_dispatcher_path, DispatcherStream, RegistrationError}; +use crate::{ + dispatcher::{self, get_dispatcher_path, DispatcherStream, RegistrationError}, + pan::{AsyncScionDatagram, PathErrorKind, ReceiveError, SendError}, +}; -#[allow(missing_docs)] +/// Errors that may be raised when attempted to bind a [`UdpSocket`]. #[derive(Debug, thiserror::Error)] pub enum BindError { + /// The UdpSocket was unable to connect to the dispatcher at the provided address. #[error("failed to connect to the dispatcher, reason: {0}")] DispatcherConnectFailed(#[from] io::Error), + /// An error which occurred during the registration handshake with the SCION dispatcher. #[error("failed to bind to the requested port")] RegistrationFailed(#[from] RegistrationError), } -#[allow(missing_docs)] -#[derive(Debug, thiserror::Error)] -pub enum SendError { - #[error(transparent)] - Io(#[from] std::io::Error), - #[error("packet is too large to be sent")] - PacketTooLarge, - #[error("path is expired")] - PathExpired, - #[error("remote address is not set")] - NotConnected, - #[error("path is not set")] - NoPath, - #[error("no underlay next hop provided by path")] - NoUnderlayNextHop, -} - -impl From for SendError { - fn from(value: dispatcher::SendError) -> Self { - match value { - dispatcher::SendError::Io(io) => Self::Io(io), - dispatcher::SendError::PayloadTooLarge(_) => Self::PacketTooLarge, - } - } -} - impl From for SendError { fn from(value: UdpEncodeError) -> Self { match value { @@ -65,28 +41,44 @@ impl From for SendError { } } -impl From for SendError { - fn from(value: packet::EncodeError) -> Self { - match value { - EncodeError::PayloadTooLarge | EncodeError::HeaderTooLarge => Self::PacketTooLarge, - } - } -} - +/// A SCION UDP socket. +/// +/// After creating a `UdpSocket` by binding it to a SCION socket address, data can +/// be [sent to][AsyncScionDatagram::send_to_via] and [received from][AsyncScionDatagram::recv_from] +/// any other socket address by using the methods on the [`AsyncScionDatagram`] trait. +/// +/// As SCION is a path-aware Internet architecture, sending packets with the `UdpSocket` allows +/// specifying the path over which the packet should be sent. See +/// [`PathAwareDatagram`][crate::pan::PathAwareDatagram] for a wrapping socket than handles +/// the selection of paths. +/// +/// Although UDP is a connectionless protocol, this implementation provides an interface to set an +/// address where data should be sent and received from. After setting a remote address with +/// [`connect`][UdpSocket::connect], data can be sent to and received from that address with the +/// [`send_via`][AsyncScionDatagram::send_via] and [`recv`][AsyncScionDatagram::recv] methods. #[derive(Debug)] pub struct UdpSocket { inner: Arc, } impl UdpSocket { + /// Creates a new UDP socket bound to the provided SCION socket address. pub async fn bind(address: SocketAddr) -> Result { Self::bind_with_dispatcher(address, get_dispatcher_path()).await } - pub async fn bind_with_dispatcher + std::fmt::Debug>( + /// Creates a new UDP socket from the given SCION socket address, by connecting to + /// and registering with the SCION dispatcher at the specified path. + /// + /// See [`bind`][Self::bind] for a variant that connects to the system's configured + /// SCION dispatcher . + pub async fn bind_with_dispatcher

( address: SocketAddr, dispatcher_path: P, - ) -> Result { + ) -> Result + where + P: AsRef + std::fmt::Debug, + { let mut stream = DispatcherStream::connect(dispatcher_path).await?; let local_address = stream.register(address).await?; @@ -104,148 +96,67 @@ impl UdpSocket { self.inner.local_addr() } - /// Receive a SCION UDP packet. - /// - /// The UDP payload is written into the provided buffer. If there is insufficient space, excess - /// data is dropped. The returned number of bytes always refers to the amount of data in the UDP - /// payload. - pub async fn recv(&self, buffer: &mut [u8]) -> Result { - let (packet_len, _) = self.recv_from(buffer).await?; - Ok(packet_len) + /// Registers a remote address for this socket. + pub fn connect(&self, remote_address: SocketAddr) { + self.inner.set_remote_address(Some(remote_address)); } - /// Receive a SCION UDP packet from a remote endpoint. - /// - /// This behaves like [`Self::recv`] but additionally returns the remote SCION socket address. - pub async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), ReceiveError> { - self.inner - .recv_loop(buffer, None) - .await - .map(|(len, addr, _)| (len, addr)) + /// Clears the association, if any, with the remote address. + pub fn disconnect(&self) { + self.inner.set_remote_address(None); } +} - /// Receive a SCION UDP packet from a remote endpoint with path information. - /// - /// This behaves like [`Self::recv`] but additionally returns - /// - the remote SCION socket address and - /// - the path over which the packet was received. For supported path types, this path is - /// already reversed such that it can be used directly to send reply packets; for unsupported - /// path types, the path is copied unmodified. - pub async fn recv_from_with_path( +#[async_trait::async_trait] +impl AsyncScionDatagram for UdpSocket { + /// The type of the address used for sending and receiving datagrams. + type Addr = SocketAddr; + + async fn recv_from_with_path<'p>( &self, buffer: &mut [u8], - ) -> Result<(usize, SocketAddr, Path), ReceiveError> { + path_buffer: &'p mut [u8], + ) -> Result<(usize, Self::Addr, Path<&'p mut [u8]>), ReceiveError> { if buffer.is_empty() { return Err(ReceiveError::ZeroLengthBuffer); } - if buffer.len() < DataplanePath::::MAX_LEN { + if path_buffer.len() < DataplanePath::::MAX_LEN { return Err(ReceiveError::PathBufferTooShort); } - // TODO(jsmith): Refactor to accept two buffers and return a Path referring into one. - let split_point = buffer.len() - DataplanePath::::MAX_LEN; - let (buffer, path_buf) = buffer.split_at_mut(split_point); - - let (packet_len, sender, path) = self.inner.recv_loop(buffer, Some(path_buf)).await?; - let Path { - dataplane_path, - underlay_next_hop, - isd_asn, - .. - } = path.expect("non-None path since path_buf was provided"); - - // Explicit match here in case we add other errors to the `reverse` method at some point - let dataplane_path = match dataplane_path.to_reversed() { - Ok(reversed_dataplane) => reversed_dataplane, - Err(UnsupportedPathType(_)) => dataplane_path.into(), + let (len, sender, Some(path)) = self.inner.recv_loop(buffer, Some(path_buffer)).await? + else { + unreachable!("path is always returned when providing a buffer") }; - - Ok(( - packet_len, - sender, - Path::new(dataplane_path, isd_asn.into_reversed(), underlay_next_hop), - )) - } - - /// Receive a SCION UDP packet with path information. - /// - /// This behaves like [`Self::recv`] but additionally returns the path over which the packet was - /// received. For supported path types, this path is already reversed such that it can be used - /// directly to send reply packets; for unsupported path types, the path is copied unmodified. - pub async fn recv_with_path(&self, buffer: &mut [u8]) -> Result<(usize, Path), ReceiveError> { - let (packet_len, _, path) = self.recv_from_with_path(buffer).await?; - Ok((packet_len, path)) - } - - /// Returns the remote SCION address set for this socket, if any. - pub fn remote_addr(&self) -> Option { - self.inner.remote_addr() - } - - /// Returns the SCION path set for this socket, if any. - pub fn path(&self) -> Option { - self.inner.path() - } - - /// Registers a remote address for this socket. - pub fn connect(&self, remote_address: SocketAddr) { - self.inner.set_remote_address(Some(remote_address)); - } - - /// Clears the association, if any, with the remote address. - pub fn disconnect(&self) { - self.inner.set_remote_address(None); - } - - /// Registers or clears a path for this socket. - pub fn set_path(&self, path: Option) -> &Self { - self.inner.set_path(path); - self - } - - /// Sends the payload using the registered remote address and path - /// - /// Returns an error if the remote address or path are unset - pub async fn send(&self, payload: Bytes) -> Result<(), SendError> { - self.inner.send_to_via(payload, None, None).await + Ok((len, sender, path)) } - /// Sends the payload to the specified destination using the registered path - /// - /// Returns an error if the path is unset - pub async fn send_to(&self, payload: Bytes, destination: SocketAddr) -> Result<(), SendError> { + async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, Self::Addr), ReceiveError> { self.inner - .send_to_via(payload, Some(destination), None) + .recv_loop(buffer, None) .await + .map(|(len, addr, _)| (len, addr)) } - /// Sends the payload to the registered destination using the specified path - /// - /// Returns an error if the remote address is unset - pub async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError> { - self.inner.send_to_via(payload, None, Some(path)).await - } - - /// Sends the payload to the specified remote address and path - pub async fn send_to_via( + async fn send_to_via( &self, payload: Bytes, - destination: SocketAddr, + destination: Self::Addr, path: &Path, ) -> Result<(), SendError> { self.inner - .send_to_via(payload, Some(destination), Some(path)) + .send_to_via(payload, Some(destination), path) .await } -} -/// Error messages returned from the UDP socket. -#[derive(Debug, thiserror::Error, PartialEq, Eq)] -pub enum ReceiveError { - #[error("attempted to receive with a zero-length buffer")] - ZeroLengthBuffer, - #[error("path buffer too short")] - PathBufferTooShort, + async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError> { + self.inner.send_to_via(payload, None, path).await + } + + /// Returns the remote address of the socket, if any. + fn remote_addr(&self) -> Option { + self.inner.remote_addr() + } } macro_rules! log_err { @@ -269,7 +180,6 @@ impl UdpSocketInner { state: RwLock::new(Arc::new(State { local_address, remote_address: None, - path: None, })), stream: Mutex::new(stream), } @@ -279,17 +189,16 @@ impl UdpSocketInner { &self, payload: Bytes, destination: Option, - path: Option<&Path>, + path: &Path, ) -> Result<(), SendError> { let state = self.state.read().unwrap().clone(); - let path = path.or(state.path.as_ref()).ok_or(SendError::NoPath)?; let Some(destination) = destination.or(state.remote_address) else { - return Err(SendError::NotConnected); + return Err(io::ErrorKind::NotConnected.into()); }; if let Some(metadata) = &path.metadata { if metadata.expiration < Utc::now() { - return Err(SendError::PathExpired); + return Err(PathErrorKind::Expired.into()); } } @@ -301,7 +210,7 @@ impl UdpSocketInner { socket_addr }) } else { - return Err(SendError::NoUnderlayNextHop); + return Err(PathErrorKind::NoUnderlayNextHop.into()); }; let packet = ScionPacketUdp::new( @@ -351,10 +260,15 @@ impl UdpSocketInner { { if let Some(path_buf) = path_buf { let path_len = path.dataplane_path.raw().len(); - let dataplane_path = - path.dataplane_path.copy_to_slice(&mut path_buf[..path_len]); - let path = - Path::new(dataplane_path, path.isd_asn, path.underlay_next_hop); + let dataplane_path = path + .dataplane_path + .reverse_to_slice(&mut path_buf[..path_len]); + + let path = Path::new( + dataplane_path, + path.isd_asn.into_reversed(), + path.underlay_next_hop, + ); return Ok((packet_len, sender, Some(path))); } else { return Ok((packet_len, sender, None)); @@ -384,7 +298,7 @@ impl UdpSocketInner { .map_err(log_err!("failed to decode SCION packet")) .ok()?; - let udp_datagram = UdpDatagram::decode(&mut scion_packet.payload) + let udp_datagram = UdpMessage::decode(&mut scion_packet.payload) .map_err(log_err!("failed to decode UDP datagram")) .ok()?; @@ -433,27 +347,18 @@ impl UdpSocketInner { pub fn set_remote_address(&self, remote_address: Option) { Arc::make_mut(&mut *self.state.write().unwrap()).remote_address = remote_address; } - - pub fn path(&self) -> Option { - self.state.read().unwrap().path.clone() - } - - pub fn set_path(&self, path: Option) { - Arc::make_mut(&mut *self.state.write().unwrap()).path = path; - } } #[derive(Debug, Clone)] struct State { local_address: SocketAddr, remote_address: Option, - path: Option, } #[cfg(test)] mod tests { use scion_proto::path::DataplanePath; - use tokio::{net::UnixStream, sync::Notify}; + use tokio::net::UnixStream; use super::*; @@ -510,6 +415,22 @@ mod tests { Ok(()) } + + pub async fn recv_from_helper<'p>( + socket: &UdpSocket, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + use_from: bool, + ) -> Result<(usize, Option, Path<&'p mut [u8]>), ReceiveError> { + if use_from { + let (len, remote_addr, path) = + socket.recv_from_with_path(buffer, path_buffer).await?; + Ok((len, Some(remote_addr), path)) + } else { + let (len, path) = socket.recv_with_path(buffer, path_buffer).await?; + Ok((len, None, path)) + } + } } macro_rules! async_test_case { @@ -588,12 +509,11 @@ mod tests { .await .expect_err("should fail on unconnected socket"); - assert!( - matches!(err, SendError::NotConnected), - "expected {:?}, got {:?}", - SendError::NotConnected, - err - ); + if let SendError::Io(io_err) = err { + assert_eq!(io_err.kind(), io::ErrorKind::NotConnected); + } else { + panic!("expected Io(ErrorKind::NotConnected), got {}", err); + } Ok(()) } @@ -629,23 +549,25 @@ mod tests { }; assert_eq!(endpoints.source.isd_asn(), endpoints.destination.isd_asn()); - let mut buffer = vec![0u8; 1500]; + let mut buffer = vec![0u8; 64]; + let mut path_buffer = vec![0u8; 1024]; let (socket, mut dispatcher) = utils::socket_from(endpoints.source)?; utils::local_send_raw(&mut dispatcher, endpoints, MESSAGE).await?; - let (length, incoming_remote_addr, incoming_path) = if use_from { - socket.recv_from_with_path(&mut buffer).await? - } else { - let res = socket.recv_with_path(&mut buffer).await?; - (res.0, endpoints.source, res.1) - }; + let (length, incoming_remote_addr, incoming_path) = + utils::recv_from_helper(&socket, &mut buffer, &mut path_buffer, use_from).await?; assert_eq!(&buffer[..length], MESSAGE); - assert_eq!(incoming_remote_addr, endpoints.source); - assert_eq!(incoming_path.dataplane_path, DataplanePath::EmptyPath); + assert_eq!( + incoming_path.dataplane_path, + DataplanePath::::EmptyPath + ); assert_eq!(incoming_path.isd_asn, endpoints.map(SocketAddr::isd_asn)); assert_eq!(incoming_path.metadata, None); assert_ne!(incoming_path.underlay_next_hop, None); + if let Some(incoming_remote_addr) = incoming_remote_addr { + assert_eq!(incoming_remote_addr, endpoints.source); + } Ok(()) } @@ -673,7 +595,8 @@ mod tests { }; assert_eq!(other_endpoints.source.isd_asn(), endpoints.source.isd_asn()); - let mut buffer = vec![0u8; 1500]; + let mut buffer = vec![0u8; 64]; + let mut path_buffer = vec![0u8; 1024]; let (socket, mut dispatcher) = utils::socket_from(endpoints.source)?; // Write packets to be received @@ -688,30 +611,26 @@ mod tests { // Connect to the remote source socket.connect(endpoints.source); - let length = if use_from { - let (length, remote_addr, _) = socket.recv_from_with_path(&mut buffer).await?; - assert_eq!(remote_addr, endpoints.source); - length - } else { - socket.recv_with_path(&mut buffer).await?.0 - }; + let (length, remote_addr, _) = + utils::recv_from_helper(&socket, &mut buffer, &mut path_buffer, use_from).await?; // The first packet received is the second packet written. assert_eq!(&buffer[..length], messages[1]); + if let Some(remote_addr) = remote_addr { + assert_eq!(remote_addr, endpoints.source); + } // Disconnect the association to receive packets with other addresses socket.disconnect(); - let length = if use_from { - let (length, remote_addr, _) = socket.recv_from_with_path(&mut buffer).await?; - assert_eq!(remote_addr, other_endpoints.source); - length - } else { - socket.recv_with_path(&mut buffer).await?.0 - }; + let (length, remote_addr, _) = + utils::recv_from_helper(&socket, &mut buffer, &mut path_buffer, use_from).await?; // The second packet packet received is the third packet written. assert_eq!(&buffer[..length], messages[2]); + if let Some(remote_addr) = remote_addr { + assert_eq!(remote_addr, other_endpoints.source); + } Ok(()) } @@ -722,13 +641,16 @@ mod tests { pub const USE_FROM: bool = true; async_test_case! { - connected: - test_connected_recv( - "[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", "[1-f:0:3,10.20.30.40]:981", USE_FROM - ) + connected: test_connected_recv( + "[1-f:0:3,4.4.0.1]:80", + "[1-f:0:3,11.10.13.7]:443", + "[1-f:0:3,10.20.30.40]:981", + USE_FROM + ) } async_test_case! { - unconnected: test_unconnected_recv("[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", USE_FROM) + unconnected: + test_unconnected_recv("[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", USE_FROM) } #[tokio::test] @@ -739,18 +661,22 @@ mod tests { }; assert_eq!(endpoints.source.isd_asn(), endpoints.destination.isd_asn()); - let mut buffer = vec![0u8; 1500]; + let mut buffer = vec![0u8; 64]; + let mut path_buffer = vec![0u8; 1024]; + let (socket, mut dispatcher) = utils::socket_from(endpoints.source)?; utils::local_send_raw(&mut dispatcher, endpoints, MESSAGE).await?; let err = socket - .recv_from_with_path(&mut []) + .recv_from_with_path(&mut [], &mut path_buffer) .await .expect_err("should fail due to zero-length buffer"); assert_eq!(err, ReceiveError::ZeroLengthBuffer); // The data should still be available to read - let (length, incoming_remote_addr, _) = socket.recv_from_with_path(&mut buffer).await?; + let (length, incoming_remote_addr, _) = socket + .recv_from_with_path(&mut buffer, &mut path_buffer) + .await?; assert_eq!(&buffer[..length], MESSAGE); assert_eq!(incoming_remote_addr, endpoints.source); @@ -767,47 +693,15 @@ mod tests { async_test_case! { connected: test_connected_recv( - "[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", "[1-f:0:3,10.20.30.40]:981", !USE_FROM + "[1-f:0:3,4.4.0.1]:80", + "[1-f:0:3,11.10.13.7]:443", + "[1-f:0:3,10.20.30.40]:981", + !USE_FROM ) } async_test_case! { - unconnected: test_unconnected_recv("[1-f:0:3,3.3.3.3]:80", "[1-f:0:3,9.9.9.81]:443", !USE_FROM) + unconnected: + test_unconnected_recv("[1-f:0:3,3.3.3.3]:80", "[1-f:0:3,9.9.9.81]:443", !USE_FROM) } } - - #[tokio::test] - async fn set_path() -> TestResult { - let local_addr: SocketAddr = "[1-f:0:1,9.8.7.6]:80".parse()?; - let (socket, _) = utils::socket_from(local_addr)?; - let path = Path::local(local_addr.isd_asn()); - - let notify = Arc::new(Notify::new()); - let notify2 = Arc::new(Notify::new()); - - let (result1, result2) = tokio::join!( - async { - let initial = socket.path(); - socket.set_path(Some(path.clone())); - notify.notify_one(); - - notify2.notified().await; - let last_set = socket.path(); - - (initial, last_set) - }, - async { - notify.notified().await; - let first_set = socket.path(); - socket.set_path(None); - notify2.notify_one(); - - first_set - } - ); - - assert_eq!(result1, (None, None)); - assert_eq!(result2, Some(path)); - - Ok(()) - } } diff --git a/crates/scion/tests/test_pan_socket.rs b/crates/scion/tests/test_pan_socket.rs new file mode 100644 index 0000000..5dd5ec4 --- /dev/null +++ b/crates/scion/tests/test_pan_socket.rs @@ -0,0 +1,119 @@ +use std::{sync::OnceLock, time::Duration}; + +use bytes::Bytes; +use scion::{ + daemon::{get_daemon_address, DaemonClient}, + pan::{AsyncScionDatagram, PathAwareDatagram}, + udp_socket::UdpSocket, +}; +use scion_proto::{address::SocketAddr, packet::ByEndpoint, path::Path}; +use tokio::sync::Mutex; + +type TestResult = Result>; + +static MESSAGE: Bytes = Bytes::from_static(b"Hello SCION!"); +const TIMEOUT: Duration = std::time::Duration::from_secs(1); + +macro_rules! test_send_receive_reply { + ($name:ident, $source:expr, $destination:expr) => { + mod $name { + use super::*; + + // Prevent tests running simultaneously to avoid registration errors from the dispatcher + fn lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::default()) + } + + async fn get_sockets( + ) -> TestResult<(PathAwareDatagram, UdpSocket, Path)> { + let endpoints: ByEndpoint = ByEndpoint { + source: $source.parse().unwrap(), + destination: $destination.parse().unwrap(), + }; + let daemon_client_source = DaemonClient::connect(&get_daemon_address()) + .await + .expect("should be able to connect"); + let socket_source = UdpSocket::bind(endpoints.source).await?; + let socket_destination = UdpSocket::bind(endpoints.destination).await?; + + socket_source.connect(endpoints.destination); + let path_forward = daemon_client_source + .paths_to(endpoints.destination.isd_asn()) + .await? + .next() + .unwrap(); + println!("Forward path: {:?}", path_forward.dataplane_path); + + let socket_source = + PathAwareDatagram::new(socket_source, path_forward.clone().into()); + + Ok((socket_source, socket_destination, path_forward)) + } + + #[tokio::test] + #[ignore = "requires daemon and dispatcher"] + async fn message() -> TestResult { + let _lock = lock().lock().await; + + let (socket_source, socket_destination, _) = get_sockets().await?; + socket_source.send(MESSAGE.clone()).await?; + + let mut buffer = vec![0_u8; 1500]; + let (length, sender) = + tokio::time::timeout(TIMEOUT, socket_destination.recv_from(&mut buffer)) + .await??; + assert_eq!(sender, socket_source.as_ref().local_addr()); + assert_eq!(buffer[..length], MESSAGE[..]); + Ok(()) + } + + #[tokio::test] + #[ignore = "requires daemon and dispatcher"] + async fn message_and_response() -> TestResult { + let _lock = lock().lock().await; + + let (socket_source, socket_destination, path_forward) = get_sockets().await?; + socket_source.send(MESSAGE.clone()).await?; + + let mut buffer = vec![0_u8; 128]; + let mut path_buffer = vec![0_u8; 1024]; + let (length, sender, path) = tokio::time::timeout( + TIMEOUT, + socket_destination.recv_from_with_path(&mut buffer, &mut path_buffer), + ) + .await??; + assert_eq!(sender, socket_source.as_ref().local_addr()); + assert_eq!(buffer[..length], MESSAGE[..]); + + println!("Reply path: {:?}", path.dataplane_path); + let path: Path = path.into(); + let socket_destination = PathAwareDatagram::new(socket_destination, path.into()); + + socket_destination.send_to(MESSAGE.clone(), sender).await?; + + let (_, path_return) = tokio::time::timeout( + TIMEOUT, + socket_source.recv_with_path(&mut buffer, &mut path_buffer), + ) + .await??; + assert_eq!(path_return.isd_asn, path_forward.isd_asn); + assert_eq!(path_return.dataplane_path, path_forward.dataplane_path); + + Ok(()) + } + } + }; +} + +test_send_receive_reply!( + send_and_receive_up_and_down_segment, + "[1-ff00:0:111,127.0.0.17]:12345", + "[1-ff00:0:112,fd00:f00d:cafe::7f00:a]:443" +); + +test_send_receive_reply!( + send_and_receive_same_as, + "[1-ff00:0:111,127.0.0.17]:12346", + "[1-ff00:0:111,127.0.0.17]:8080" +); diff --git a/crates/scion/tests/test_udp_socket.rs b/crates/scion/tests/test_udp_socket.rs index 038d399..6017755 100644 --- a/crates/scion/tests/test_udp_socket.rs +++ b/crates/scion/tests/test_udp_socket.rs @@ -3,6 +3,7 @@ use std::{sync::OnceLock, time::Duration}; use bytes::Bytes; use scion::{ daemon::{get_daemon_address, DaemonClient}, + pan::AsyncScionDatagram, udp_socket::UdpSocket, }; use scion_proto::{address::SocketAddr, packet::ByEndpoint, path::Path}; @@ -42,7 +43,6 @@ macro_rules! test_send_receive_reply { .next() .unwrap(); println!("Forward path: {:?}", path_forward.dataplane_path); - socket_source.set_path(Some(path_forward.clone())); Ok((socket_source, socket_destination, path_forward)) } @@ -52,8 +52,10 @@ macro_rules! test_send_receive_reply { async fn message() -> TestResult { let _lock = lock().lock().await; - let (socket_source, socket_destination, ..) = get_sockets().await?; - socket_source.send(MESSAGE.clone()).await?; + let (socket_source, socket_destination, path_forward) = get_sockets().await?; + socket_source + .send_via(MESSAGE.clone(), &path_forward) + .await?; let mut buffer = vec![0_u8; 1500]; let (length, sender) = @@ -70,12 +72,15 @@ macro_rules! test_send_receive_reply { let _lock = lock().lock().await; let (socket_source, socket_destination, path_forward) = get_sockets().await?; - socket_source.send(MESSAGE.clone()).await?; + socket_source + .send_via(MESSAGE.clone(), &path_forward) + .await?; - let mut buffer = vec![0_u8; 1500]; + let mut buffer = vec![0_u8; 128]; + let mut path_buffer = vec![0_u8; 1024]; let (length, sender, path) = tokio::time::timeout( TIMEOUT, - socket_destination.recv_from_with_path(&mut buffer), + socket_destination.recv_from_with_path(&mut buffer, &mut path_buffer), ) .await??; assert_eq!(sender, socket_source.local_addr()); @@ -83,12 +88,14 @@ macro_rules! test_send_receive_reply { println!("Reply path: {:?}", path.dataplane_path); socket_destination - .send_to_via(MESSAGE.clone(), sender, &path) + .send_to_via(MESSAGE.clone(), sender, &path.into()) .await?; - let (_, path_return) = - tokio::time::timeout(TIMEOUT, socket_source.recv_with_path(&mut buffer)) - .await??; + let (_, path_return) = tokio::time::timeout( + TIMEOUT, + socket_source.recv_with_path(&mut buffer, &mut path_buffer), + ) + .await??; assert_eq!(path_return.isd_asn, path_forward.isd_asn); assert_eq!(path_return.dataplane_path, path_forward.dataplane_path);