Skip to content

Commit

Permalink
feat: add initial draft of reliable protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jpcsmith committed Oct 4, 2023
1 parent b70a0b6 commit f5dd12d
Show file tree
Hide file tree
Showing 5 changed files with 535 additions and 1 deletion.
2 changes: 1 addition & 1 deletion crates/scion/src/address/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl ServiceAddress {

#[allow(unused)]
/// Special none service address value.
const NONE: Self = Self(0xffff);
pub(crate) const NONE: Self = Self(0xffff);
/// Flag bit indicating whether the address includes multicast
const MULTICAST_FLAG: u16 = 0x8000;

Expand Down
7 changes: 7 additions & 0 deletions crates/scion/src/reliable.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
mod common_header;
pub use common_header::DecodeError;

mod error;
pub use error::ReliableRelayError;

mod relay_protocol;
pub use relay_protocol::ReliableRelayProtocol;

mod parser;
mod registration;
mod wire_utils;

Expand Down
11 changes: 11 additions & 0 deletions crates/scion/src/reliable/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use thiserror::Error;

#[derive(Error, Debug, Eq, PartialEq)]
pub enum ReliableRelayError {
#[error("provided destination address must be specified, not 0.0.0.0 or ::0")]
DestinationUnspecified,
#[error("provided destination port mmust be specified")]
DestinationPortUnspecified,
#[error("port mismatch, requested port {requested}, received port {assigned}")]
PortMismatch { requested: u16, assigned: u16 },
}
133 changes: 133 additions & 0 deletions crates/scion/src/reliable/parser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use std::{collections::VecDeque, ops::Deref};

use bytes::{Buf, Bytes};

use super::common_header::{CommonHeader, DecodeError, DecodedHeader};

pub(super) struct StreamParser {
// INV: byte objects are always non-empty
byte_queue: VecDeque<Bytes>,
bytes_remaining: usize,
next_header: Option<DecodedHeader>,
}

impl StreamParser {
pub fn new() -> Self {
Self {
byte_queue: VecDeque::new(),
bytes_remaining: 0,
next_header: None,
}
}

pub fn append_data(&mut self, data: Bytes) {
if !data.is_empty() {
self.bytes_remaining += data.len();
self.byte_queue.push_back(data);
}
}

pub fn next_packet(&mut self) -> Result<Option<(CommonHeader, Vec<Bytes>)>, DecodeError> {
match &self.next_header {
None if self.remaining() >= CommonHeader::MIN_LENGTH => {
self.next_header = Some(CommonHeader::partial_decode(self)?);

// Recursively try to get the payload if we parsed a full common header
if self.next_header.as_ref().unwrap().is_fully_decoded() {
self.next_packet()
} else {
Ok(None)
}
}
Some(DecodedHeader::Partial(header)) if self.remaining() >= header.required_bytes() => {
self.next_header = Some(DecodedHeader::Full(header.finish_decoding(self)));
self.next_packet()
}
Some(DecodedHeader::Full(header)) if self.remaining() >= header.payload_size() => {
let header = *header;
let payload = self.get_payload(header.payload_size());

self.next_header = None;

Ok(Some((header, payload)))
}
_ => Ok(None),
}
}

fn get_payload(&mut self, payload_size: usize) -> Vec<Bytes> {
let mut result = vec![];

let mut payload_bytes_needed = payload_size;

while payload_bytes_needed > 0 {
let mut data = self.byte_queue.pop_front().expect("there must be data");

if data.len() > payload_bytes_needed {
self.byte_queue
.push_front(data.split_off(payload_bytes_needed));
}

assert!(data.len() <= payload_bytes_needed);

payload_bytes_needed -= data.len();
result.push(data);
}

result
}
}

impl Buf for StreamParser {
fn remaining(&self) -> usize {
self.bytes_remaining
}

fn chunk(&self) -> &[u8] {
self.byte_queue.front().map_or(&[], |data| data.deref())
}

fn advance(&mut self, cnt: usize) {
if cnt == 0 {
return;
}
if cnt > self.bytes_remaining {
panic!(
"cnt > self.remaining() ({} > {})",
cnt, self.bytes_remaining
);
}

let mut advance_by = cnt;
while advance_by > 0 {
let mut data = self.byte_queue.pop_front().expect("there must be data");

if data.len() > advance_by {
self.byte_queue.push_front(data.split_off(advance_by));
}
assert!(data.len() <= advance_by);

advance_by -= data.len();
}

self.bytes_remaining -= cnt;
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn has_available_multiple() {
let mut parser = StreamParser::new();

parser.append_data(Bytes::from_static(&[0, 1, 2]));
parser.append_data(Bytes::from_static(&[4, 5, 6]));

let mut buffer = [0u8; 6];
parser.copy_to_slice(&mut buffer);

assert_eq!(buffer, [0, 1, 2, 4, 5, 6]);
}
}
Loading

0 comments on commit f5dd12d

Please sign in to comment.