Skip to content

Commit

Permalink
feat: implement receive for the UDP datagram socket
Browse files Browse the repository at this point in the history
  • Loading branch information
jpcsmith authored and mlegner committed Dec 4, 2023
1 parent a41e672 commit ef6dc88
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 61 deletions.
11 changes: 11 additions & 0 deletions crates/scion-proto/src/address/socket_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ pub enum SocketAddr {
}

impl SocketAddr {
/// Creates a new SCION socket address from an ISD-AS number, SCION host, and port.
pub const fn new(isd_asn: IsdAsn, host: Host, port: u16) -> Self {
match host {
Host::Ip(ip_host) => match ip_host {
IpAddr::V4(ip) => SocketAddr::V4(SocketAddrV4::new(isd_asn, ip, port)),
IpAddr::V6(ip) => SocketAddr::V6(SocketAddrV6::new(isd_asn, ip, port)),
},
Host::Svc(service) => SocketAddr::Svc(SocketAddrSvc::new(isd_asn, service, port)),
}
}

/// Construct a new SCION socket address from an ISD-AS number and standard rust socket address.
pub const fn from_std(isd_asn: IsdAsn, address: std::net::SocketAddr) -> Self {
match address {
Expand Down
13 changes: 13 additions & 0 deletions crates/scion-proto/src/packet/path_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ pub enum DataplanePath {
Unsupported { path_type: PathType, bytes: Bytes },
}

impl DataplanePath {
pub fn deep_copy(&self) -> Self {
match self {
Self::EmptyPath => Self::EmptyPath,
Self::Standard(path) => Self::Standard(path.deep_copy()),
Self::Unsupported { path_type, bytes } => Self::Unsupported {
path_type: *path_type,
bytes: Bytes::copy_from_slice(bytes),
},
}
}
}

impl From<StandardPath> for DataplanePath {
fn from(value: StandardPath) -> Self {
DataplanePath::Standard(value)
Expand Down
20 changes: 18 additions & 2 deletions crates/scion-proto/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,27 @@ pub struct Path {
/// The raw bytes to be added as the path header to SCION dataplane packets
dataplane_path: DataplanePath,
/// The underlay address (IP + port) of the next hop; i.e., the local border router
underlay_next_hop: SocketAddr,
underlay_next_hop: Option<SocketAddr>,
/// The ISD-ASN where the path starts and ends
pub isd_asn: ByEndpoint<IsdAsn>,
/// Path metadata
metadata: Option<PathMetadata>,
}

impl Path {
pub fn new(
dataplane_path: DataplanePath,
isd_asn: ByEndpoint<IsdAsn>,
underlay_next_hop: Option<SocketAddr>,
) -> Self {
Self {
dataplane_path,
underlay_next_hop,
isd_asn,
metadata: None,
}
}

#[tracing::instrument]
pub fn try_from_grpc_with_endpoints(
mut value: daemon_grpc::Path,
Expand All @@ -56,8 +69,11 @@ impl Path {
}) => address
.parse()
.map_err(|_| PathParseError::from(PathParseErrorKind::InvalidInterface))?,
// TODO: Determine if the daemon returns paths that are strictly on the host.
// If so, this is only an error if the path is non-empty
_ => return Err(PathParseErrorKind::NoInterface.into()),
};
let underlay_next_hop = Some(underlay_next_hop);

let metadata = PathMetadata::try_from(value)
.map_err(|e| {
Expand Down Expand Up @@ -93,7 +109,7 @@ mod tests {
)
.expect("conversion should succeed");
assert_eq!(
path.underlay_next_hop,
path.underlay_next_hop.unwrap(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 42)
);
assert_eq!(
Expand Down
7 changes: 7 additions & 0 deletions crates/scion-proto/src/path/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ impl StandardPath {
pub fn meta_header(&self) -> &PathMetaHeader {
&self.meta_header
}

pub fn deep_copy(&self) -> Self {
Self {
meta_header: self.meta_header.clone(),
encoded_path: Bytes::copy_from_slice(&self.encoded_path),
}
}
}

impl WireDecode<Bytes> for StandardPath {
Expand Down
156 changes: 97 additions & 59 deletions crates/scion/src/udp/socket.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
#![allow(dead_code)]
use std::{io, sync::Arc};

use scion_proto::address::SocketAddr;
use std::{cmp, io, sync::Arc};

use scion_proto::{
address::SocketAddr,
packet::ScionPacket,
path::Path,
reliable::Packet,
wire_encoding::{MaybeEncoded, WireDecode},
};
use tokio::sync::Mutex;

use crate::{
dispatcher::{DispatcherStream, RegistrationError},
udp::datagram::UdpDatagram,
DEFAULT_DISPATCHER_PATH,
};

Expand All @@ -17,12 +24,6 @@ pub enum ConnectError {
RegistrationFailed(#[from] RegistrationError),
}

// #[derive(Debug, thiserror::Error)]
// pub enum SendError {
// #[error(transparent)]
// Io(#[from] std::io::Error),
// }

pub struct UdpSocket {
inner: Arc<UdpSocketInner>,
local_address: SocketAddr,
Expand Down Expand Up @@ -51,28 +52,36 @@ impl UdpSocket {
self.local_address
}

// pub async fn recv_from(
// &self,
// buffer: &mut [u8],
// ) -> Result<(usize, SocketAddr, Path), ReceiveError> {
// self.inner.recv_from(buffer).await
// }
/// Receive a SCION UDP packet from a remote endpoint.
///
/// The UDP payload is written into the provided buffer. If there is insufficient space, excess
/// data is dropped. The returned number of bytes always refers to the amount of data in the UDP
/// payload.
///
/// Additionally returns the remote SCION socket address, and the path over which the packet was
/// received.
pub async fn recv_from(
&self,
buffer: &mut [u8],
) -> Result<(usize, SocketAddr, Path), ReceiveError> {
self.inner.recv_from(buffer).await
}
}

// pub struct ReceiveError;
pub type ReceiveError = std::convert::Infallible;

struct UdpSocketInner {
state: Mutex<State>,
}

// macro_rules! log_err {
// ($message:expr) => {
// |err| {
// tracing::debug!(?err, $message);
// err
// }
// };
// }
macro_rules! log_err {
($message:expr) => {
|err| {
tracing::debug!(?err, $message);
err
}
};
}

impl UdpSocketInner {
fn new(stream: DispatcherStream) -> Self {
Expand All @@ -81,41 +90,70 @@ impl UdpSocketInner {
}
}

// async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr, Path), ReceiveError> {
// loop {
// let receive_result = {
// let state = &mut *self.state.lock().await;
// state.stream.receive_packet().await
// };
//
// match receive_result {
// Ok(packet) => {
// if let Some(result) = self.decode_and_copy_into(packet, buf) {
// return Ok(result);
// } else {
// continue;
// }
// }
// Err(_) => todo!("attempt reconnections to dispatcher"),
// }
// }
// }
//
// fn decode_and_copy_into(
// &self,
// mut packet: Packet,
// buf: &mut [u8],
// ) -> Option<(usize, SocketAddr, Path)> {
// let mut scion_packet = ScionPacket::decode(&mut packet.content)
// .map_err(log_err!("failed to decode SCION packet"))
// .ok()?;
//
// let udp_datagram = UdpDatagram::decode(&mut scion_packet.payload)
// .map_err(log_err!("failed to decode UDP datagram"))
// .ok()?;
//
// todo!()
// }
async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr, Path), ReceiveError> {
loop {
let receive_result = {
let state = &mut *self.state.lock().await;
state.stream.receive_packet().await
};

match receive_result {
Ok(packet) => {
if let Some(result) = self.parse_incoming(packet, buf) {
return Ok(result);
} else {
continue;
}
}
Err(_) => todo!("attempt reconnections to dispatcher"),
}
}
}

fn parse_incoming(
&self,
mut packet: Packet,
buf: &mut [u8],
) -> Option<(usize, SocketAddr, Path)> {
// TODO(jsmith): Need a representation of the packets for logging purposes
let mut scion_packet = ScionPacket::decode(&mut packet.content)
.map_err(log_err!("failed to decode SCION packet"))
.ok()?;

let udp_datagram = UdpDatagram::decode(&mut scion_packet.payload)
.map_err(log_err!("failed to decode UDP datagram"))
.ok()?;

if !udp_datagram.verify_checksum(&scion_packet.address_header) {
tracing::debug!("failed to verify packet checksum");
return None;
}

let MaybeEncoded::Decoded(source_host) = scion_packet.address_header.host.source else {
tracing::debug!("dropping packet with unsupported source address type");
return None;
};
let source = SocketAddr::new(
scion_packet.address_header.ia.source,
source_host,
udp_datagram.port.source,
);

let path = {
let dataplane_path = scion_packet.path_header.deep_copy();
Path::new(
dataplane_path,
scion_packet.address_header.ia,
packet.last_host,
)
};

let payload_len = udp_datagram.payload.len();
let copy_length = cmp::min(payload_len, buf.len());
buf.copy_from_slice(&udp_datagram.payload[..copy_length]);

Some((payload_len, source, path))
}
}

#[derive(Debug)]
Expand Down

0 comments on commit ef6dc88

Please sign in to comment.