From de12848f1f4a8bc888931b70ab3d5afc3b0678ad Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 7 Feb 2024 16:07:45 -0700 Subject: [PATCH 1/6] Initial support for draft-02 --- Cargo.lock | 1 + moq-clock/src/clock.rs | 3 - moq-pub/Cargo.toml | 1 + moq-pub/src/media.rs | 21 +-- moq-transport/src/cache/fragment.rs | 5 +- moq-transport/src/cache/segment.rs | 18 +- moq-transport/src/cache/track.rs | 170 ++----------------- moq-transport/src/coding/mod.rs | 1 - moq-transport/src/lib.rs | 1 + moq-transport/src/message/announce.rs | 5 +- moq-transport/src/message/announce_ok.rs | 9 +- moq-transport/src/message/announce_reset.rs | 5 +- moq-transport/src/message/go_away.rs | 5 +- moq-transport/src/message/mod.rs | 16 +- moq-transport/src/message/object.rs | 108 ------------ moq-transport/src/message/subscribe.rs | 31 +--- moq-transport/src/message/subscribe_error.rs | 21 ++- moq-transport/src/message/subscribe_fin.rs | 24 +-- moq-transport/src/message/subscribe_ok.rs | 6 +- moq-transport/src/message/subscribe_reset.rs | 22 ++- moq-transport/src/message/unannounce.rs | 5 +- moq-transport/src/message/unsubscribe.rs | 10 +- moq-transport/src/object/group.rs | 62 +++++++ moq-transport/src/object/mod.rs | 101 +++++++++++ moq-transport/src/object/stream.rs | 49 ++++++ moq-transport/src/object/track.rs | 60 +++++++ moq-transport/src/session/client.rs | 30 +--- moq-transport/src/session/control.rs | 10 +- moq-transport/src/session/publisher.rs | 40 +++-- moq-transport/src/session/server.rs | 29 +--- moq-transport/src/session/subscriber.rs | 125 ++++++++------ moq-transport/src/setup/client.rs | 15 +- moq-transport/src/setup/extension.rs | 84 --------- moq-transport/src/setup/mod.rs | 2 - moq-transport/src/setup/server.rs | 15 +- moq-transport/src/setup/version.rs | 3 + 36 files changed, 474 insertions(+), 639 deletions(-) delete mode 100644 moq-transport/src/message/object.rs create mode 100644 moq-transport/src/object/group.rs create mode 100644 moq-transport/src/object/mod.rs create mode 100644 moq-transport/src/object/stream.rs create mode 100644 moq-transport/src/object/track.rs delete mode 100644 moq-transport/src/setup/extension.rs diff --git a/Cargo.lock b/Cargo.lock index bf6a8069..31cb67a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,6 +1130,7 @@ name = "moq-pub" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "clap", "clap_mangen", "env_logger", diff --git a/moq-clock/src/clock.rs b/moq-clock/src/clock.rs index a72a0cff..b4ec136a 100644 --- a/moq-clock/src/clock.rs +++ b/moq-clock/src/clock.rs @@ -1,5 +1,3 @@ -use std::time; - use anyhow::Context; use moq_transport::{ cache::{fragment, segment, track}, @@ -30,7 +28,6 @@ impl Publisher { .create_segment(segment::Info { sequence: VarInt::from_u32(sequence), priority: 0, - expires: Some(time::Duration::from_secs(60)), }) .context("failed to create minute segment")?; diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index 0d70ca82..0618c498 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -21,6 +21,7 @@ quinn = "0.10" webtransport-quinn = "0.6.1" #webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } url = "2" +bytes = "1.5" # Crypto rustls = { version = "0.21", features = ["dangerous_configuration"] } diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index a651f33a..77278c05 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -42,15 +42,13 @@ impl Media { // Create the catalog track with a single segment. let mut init_track = broadcast.create_track("0.mp4")?; - let init_segment = init_track.create_segment(segment::Info { + let mut init_segment = init_track.create_segment(segment::Info { sequence: VarInt::ZERO, priority: 0, - expires: None, })?; // Create a single fragment, optionally setting the size - let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?; - + let mut init_fragment = init_segment.fragment(VarInt::ZERO, init.len())?; init_fragment.chunk(init.into())?; let mut tracks = HashMap::new(); @@ -129,10 +127,9 @@ impl Media { init_track_name: &str, moov: &mp4::MoovBox, ) -> Result<(), anyhow::Error> { - let segment = track.create_segment(segment::Info { + let mut segment = track.create_segment(segment::Info { sequence: VarInt::ZERO, priority: 0, - expires: None, })?; let mut tracks = Vec::new(); @@ -215,10 +212,11 @@ impl Media { log::info!("catalog: {}", catalog_str); // Create a single fragment for the segment. - let mut fragment = segment.final_fragment(VarInt::ZERO)?; + let data: bytes::Bytes = catalog_str.into(); + let mut fragment = segment.fragment(VarInt::ZERO, data.len())?; // Add the segment and add the fragment. - fragment.chunk(catalog_str.into())?; + fragment.chunk(data)?; Ok(()) } @@ -305,18 +303,15 @@ impl Track { .context("timestamp too large")?; // Create a new segment. - let segment = self.track.create_segment(segment::Info { + let mut segment = self.track.create_segment(segment::Info { sequence: VarInt::try_from(self.sequence).context("sequence too large")?, // Newer segments are higher priority priority: u32::MAX.checked_sub(timestamp).context("priority too large")?, - - // Delete segments after 10s. - expires: Some(time::Duration::from_secs(10)), })?; // Create a single fragment for the segment that we will keep appending. - let mut fragment = segment.final_fragment(VarInt::ZERO)?; + let mut fragment = segment.fragment(VarInt::ZERO, raw.len())?; self.sequence += 1; diff --git a/moq-transport/src/cache/fragment.rs b/moq-transport/src/cache/fragment.rs index 4e08333f..9fbcbae7 100644 --- a/moq-transport/src/cache/fragment.rs +++ b/moq-transport/src/cache/fragment.rs @@ -34,9 +34,8 @@ pub struct Info { // NOTE: These may be received out of order or with gaps. pub sequence: VarInt, - // The size of the fragment, optionally None if this is the last fragment in a segment. - // TODO enforce this size. - pub size: Option, + // The size of the fragment. + pub size: usize, } struct State { diff --git a/moq-transport/src/cache/segment.rs b/moq-transport/src/cache/segment.rs index ecd27dea..e08b1d03 100644 --- a/moq-transport/src/cache/segment.rs +++ b/moq-transport/src/cache/segment.rs @@ -8,7 +8,7 @@ //! //! The segment is closed with [CacheError::Closed] when all publishers or subscribers are dropped. use core::fmt; -use std::{ops::Deref, sync::Arc, time}; +use std::{ops::Deref, sync::Arc}; use crate::VarInt; @@ -34,9 +34,6 @@ pub struct Info { // The priority of the segment within the BROADCAST. pub priority: u32, - - // Cache the segment for at most this long. - pub expires: Option, } struct State { @@ -92,11 +89,7 @@ impl Publisher { } // Not public because it's a footgun. - pub(crate) fn push_fragment( - &mut self, - sequence: VarInt, - size: Option, - ) -> Result { + pub(crate) fn push_fragment(&mut self, sequence: VarInt, size: usize) -> Result { let (publisher, subscriber) = fragment::new(fragment::Info { sequence, size }); let mut state = self.state.lock_mut(); @@ -107,12 +100,7 @@ impl Publisher { /// Write a fragment pub fn fragment(&mut self, sequence: VarInt, size: usize) -> Result { - self.push_fragment(sequence, Some(size)) - } - - /// Write the last fragment, which means size can be unknown. - pub fn final_fragment(mut self, sequence: VarInt) -> Result { - self.push_fragment(sequence, None) + self.push_fragment(sequence, size) } /// Close the segment with an error. diff --git a/moq-transport/src/cache/track.rs b/moq-transport/src/cache/track.rs index 6d2d405d..f9448d97 100644 --- a/moq-transport/src/cache/track.rs +++ b/moq-transport/src/cache/track.rs @@ -12,12 +12,8 @@ //! //! The track is closed with [CacheError::Closed] when all publishers or subscribers are dropped. -use std::{collections::BinaryHeap, fmt, ops::Deref, sync::Arc, time}; - -use indexmap::IndexMap; - use super::{segment, CacheError, Watch}; -use crate::VarInt; +use std::{fmt, ops::Deref, sync::Arc}; /// Create a track with the given name. pub fn new(name: &str) -> (Publisher, Subscriber) { @@ -36,17 +32,10 @@ pub struct Info { pub name: String, } +#[derive(Debug)] struct State { - // Store segments in received order so subscribers can detect changes. - // The key is the segment sequence, which could have gaps. - // A None value means the segment has expired. - lookup: IndexMap>, - - // Store when segments will expire in a priority queue. - expires: BinaryHeap, - - // The number of None entries removed from the start of the lookup. - pruned: usize, + current: Option, + epoch: usize, // Set when the publisher is closed/dropped, or all subscribers are dropped. closed: Result<(), CacheError>, @@ -61,75 +50,22 @@ impl State { pub fn insert(&mut self, segment: segment::Subscriber) -> Result<(), CacheError> { self.closed.clone()?; - - let entry = match self.lookup.entry(segment.sequence) { - indexmap::map::Entry::Occupied(_entry) => return Err(CacheError::Duplicate), - indexmap::map::Entry::Vacant(entry) => entry, - }; - - if let Some(expires) = segment.expires { - self.expires.push(SegmentExpiration { - sequence: segment.sequence, - expires: time::Instant::now() + expires, - }); - } - - entry.insert(Some(segment)); - - // Expire any existing segments on insert. - // This means if you don't insert then you won't expire... but it's probably fine since the cache won't grow. - // TODO Use a timer to expire segments at the correct time instead - self.expire(); - + self.current = Some(segment); + self.epoch += 1; Ok(()) } - - // Try expiring any segments - pub fn expire(&mut self) { - let now = time::Instant::now(); - while let Some(segment) = self.expires.peek() { - if segment.expires > now { - break; - } - - // Update the entry to None while preserving the index. - match self.lookup.entry(segment.sequence) { - indexmap::map::Entry::Occupied(mut entry) => entry.insert(None), - indexmap::map::Entry::Vacant(_) => panic!("expired segment not found"), - }; - - self.expires.pop(); - } - - // Remove None entries from the start of the lookup. - while let Some((_, None)) = self.lookup.get_index(0) { - self.lookup.shift_remove_index(0); - self.pruned += 1; - } - } } impl Default for State { fn default() -> Self { Self { - lookup: Default::default(), - expires: Default::default(), - pruned: 0, + current: None, + epoch: 0, closed: Ok(()), } } } -impl fmt::Debug for State { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("State") - .field("lookup", &self.lookup) - .field("pruned", &self.pruned) - .field("closed", &self.closed) - .finish() - } -} - /// Creates new segments for a track. pub struct Publisher { state: Watch, @@ -183,12 +119,7 @@ impl fmt::Debug for Publisher { pub struct Subscriber { state: Watch, info: Arc, - - // The index of the next segment to return. - index: usize, - - // If there are multiple segments to return, we put them in here to return them in priority order. - pending: BinaryHeap, + epoch: usize, // Dropped when all subscribers are dropped. _dropped: Arc, @@ -200,8 +131,7 @@ impl Subscriber { Self { state, info, - index: 0, - pending: Default::default(), + epoch: 0, _dropped, } } @@ -212,27 +142,10 @@ impl Subscriber { let notify = { let state = self.state.lock(); - // Get our adjusted index, which could be negative if we've removed more broadcasts than read. - let mut index = self.index.saturating_sub(state.pruned); - - // Push all new segments into a priority queue. - while index < state.lookup.len() { - let (_, segment) = state.lookup.get_index(index).unwrap(); - - // Skip None values (expired segments). - // TODO These might actually be expired, so we should check the expiration time. - if let Some(segment) = segment { - self.pending.push(SegmentPriority(segment.clone())); - } - - index += 1; - } - - self.index = state.pruned + index; - - // Return the higher priority segment. - if let Some(segment) = self.pending.pop() { - return Ok(Some(segment.0)); + if self.epoch != state.epoch { + let segment = state.current.as_ref().unwrap().clone(); + self.epoch = state.epoch; + return Ok(Some(segment)); } // Otherwise check if we need to return an error. @@ -261,7 +174,7 @@ impl fmt::Debug for Subscriber { f.debug_struct("Subscriber") .field("state", &self.state) .field("info", &self.info) - .field("index", &self.index) + .field("epoch", &self.epoch) .finish() } } @@ -282,56 +195,3 @@ impl Drop for Dropped { self.state.lock_mut().close(CacheError::Closed).ok(); } } - -// Used to order segments by expiration time. -struct SegmentExpiration { - sequence: VarInt, - expires: time::Instant, -} - -impl Ord for SegmentExpiration { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // Reverse order so the earliest expiration is at the top of the heap. - other.expires.cmp(&self.expires) - } -} - -impl PartialOrd for SegmentExpiration { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for SegmentExpiration { - fn eq(&self, other: &Self) -> bool { - self.expires == other.expires - } -} - -impl Eq for SegmentExpiration {} - -// Used to order segments by priority -#[derive(Clone)] -struct SegmentPriority(pub segment::Subscriber); - -impl Ord for SegmentPriority { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // Reverse order so the highest priority is at the top of the heap. - // TODO I let CodePilot generate this code so yolo - other.0.priority.cmp(&self.0.priority) - } -} - -impl PartialOrd for SegmentPriority { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for SegmentPriority { - fn eq(&self, other: &Self) -> bool { - self.0.priority == other.0.priority - } -} - -impl Eq for SegmentPriority {} diff --git a/moq-transport/src/coding/mod.rs b/moq-transport/src/coding/mod.rs index ff57b4c2..a3ff6f78 100644 --- a/moq-transport/src/coding/mod.rs +++ b/moq-transport/src/coding/mod.rs @@ -7,5 +7,4 @@ mod varint; pub use decode::*; pub use encode::*; pub use params::*; -pub use string::*; pub use varint::*; diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 08f4485a..0ff004ee 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -11,6 +11,7 @@ mod error; pub mod cache; pub mod message; +pub mod object; pub mod session; pub mod setup; diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/message/announce.rs index 281fffa8..709339d9 100644 --- a/moq-transport/src/message/announce.rs +++ b/moq-transport/src/message/announce.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the publisher to announce the availability of a group of tracks. #[derive(Clone, Debug)] @@ -14,14 +13,14 @@ pub struct Announce { } impl Announce { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; let params = Params::decode(r).await?; Ok(Self { namespace, params }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await?; self.params.encode(w).await?; diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/message/announce_ok.rs index 300279e1..a5c47928 100644 --- a/moq-transport/src/message/announce_ok.rs +++ b/moq-transport/src/message/announce_ok.rs @@ -1,7 +1,4 @@ -use crate::{ - coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError}, - setup::Extensions, -}; +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError}; /// Sent by the subscriber to accept an Announce. #[derive(Clone, Debug)] @@ -12,12 +9,12 @@ pub struct AnnounceOk { } impl AnnounceOk { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; Ok(Self { namespace }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await } } diff --git a/moq-transport/src/message/announce_reset.rs b/moq-transport/src/message/announce_reset.rs index 24d3f817..e21886b6 100644 --- a/moq-transport/src/message/announce_reset.rs +++ b/moq-transport/src/message/announce_reset.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the subscriber to reject an Announce. #[derive(Clone, Debug)] @@ -17,7 +16,7 @@ pub struct AnnounceError { } impl AnnounceError { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; let reason = String::decode(r).await?; @@ -29,7 +28,7 @@ impl AnnounceError { }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; self.reason.encode(w).await?; diff --git a/moq-transport/src/message/go_away.rs b/moq-transport/src/message/go_away.rs index 7999c9a9..c86152ae 100644 --- a/moq-transport/src/message/go_away.rs +++ b/moq-transport/src/message/go_away.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the server to indicate that the client should connect to a different server. #[derive(Clone, Debug)] @@ -10,12 +9,12 @@ pub struct GoAway { } impl GoAway { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let url = String::decode(r).await?; Ok(Self { url }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.url.encode(w).await } } diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index d32a936f..3865dbe6 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -34,7 +34,6 @@ mod announce; mod announce_ok; mod announce_reset; mod go_away; -mod object; mod subscribe; mod subscribe_error; mod subscribe_fin; @@ -47,7 +46,6 @@ pub use announce::*; pub use announce_ok::*; pub use announce_reset::*; pub use go_away::*; -pub use object::*; pub use subscribe::*; pub use subscribe_error::*; pub use subscribe_fin::*; @@ -56,13 +54,9 @@ pub use subscribe_reset::*; pub use unannounce::*; pub use unsubscribe::*; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; use std::fmt; -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; - // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. macro_rules! message_types { @@ -74,23 +68,23 @@ macro_rules! message_types { } impl Message { - pub async fn decode(r: &mut R, ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let t = VarInt::decode(r).await?; match t.into_inner() { $($val => { - let msg = $name::decode(r, ext).await?; + let msg = $name::decode(r).await?; Ok(Self::$name(msg)) })* _ => Err(DecodeError::InvalidMessage(t)), } } - pub async fn encode(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { match self { $(Self::$name(ref m) => { VarInt::from_u32($val).encode(w).await?; - m.encode(w, ext).await + m.encode(w).await },)* } } diff --git a/moq-transport/src/message/object.rs b/moq-transport/src/message/object.rs deleted file mode 100644 index 90efa232..00000000 --- a/moq-transport/src/message/object.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::{io, time}; - -use tokio::io::AsyncReadExt; - -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup; - -/// Sent by the publisher as the header of each data stream. -#[derive(Clone, Debug)] -pub struct Object { - // An ID for this track. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 - pub track: VarInt, - - // The sequence number within the track. - pub group: VarInt, - - // The sequence number within the group. - pub sequence: VarInt, - - // The priority, where **smaller** values are sent first. - pub priority: u32, - - // Cache the object for at most this many seconds. - // Zero means never expire. - pub expires: Option, - - /// An optional size, allowing multiple OBJECTs on the same stream. - pub size: Option, -} - -impl Object { - pub async fn decode(r: &mut R, extensions: &setup::Extensions) -> Result { - // Try reading the first byte, returning a special error if the stream naturally ended. - let typ = match r.read_u8().await { - Ok(b) => VarInt::decode_byte(b, r).await?, - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Err(DecodeError::Final), - Err(e) => return Err(e.into()), - }; - - let size_present = match typ.into_inner() { - 0 => false, - 2 => true, - _ => return Err(DecodeError::InvalidMessage(typ)), - }; - - let track = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let sequence = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - - let expires = match extensions.object_expires { - true => match VarInt::decode(r).await?.into_inner() { - 0 => None, - secs => Some(time::Duration::from_secs(secs)), - }, - false => None, - }; - - // The presence of the size field depends on the type. - let size = match size_present { - true => Some(VarInt::decode(r).await?), - false => None, - }; - - Ok(Self { - track, - group, - sequence, - priority, - expires, - size, - }) - } - - pub async fn encode(&self, w: &mut W, extensions: &setup::Extensions) -> Result<(), EncodeError> { - // The kind changes based on the presence of the size. - let kind = match self.size { - Some(_) => VarInt::from_u32(2), - None => VarInt::ZERO, - }; - - kind.encode(w).await?; - self.track.encode(w).await?; - self.group.encode(w).await?; - self.sequence.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; - - // Round up if there's any decimal points. - let expires = match self.expires { - None => 0, - Some(time::Duration::ZERO) => return Err(EncodeError::InvalidValue), // there's no way of expressing zero currently. - Some(expires) if expires.subsec_nanos() > 0 => expires.as_secs() + 1, - Some(expires) => expires.as_secs(), - }; - - if extensions.object_expires { - VarInt::try_from(expires)?.encode(w).await?; - } - - if let Some(size) = self.size { - size.encode(w).await?; - } - - Ok(()) - } -} diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index e64d5a13..05385d8b 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,21 +1,15 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt}; - use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt}; /// Sent by the subscriber to request all future objects for the given track. /// /// Objects will use the provided ID instead of the full track name, to save bytes. #[derive(Clone, Debug)] pub struct Subscribe { - /// An ID we choose so we can map to the track_name. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 pub id: VarInt, /// The track namespace. - /// - /// Must be None if `extensions.subscribe_split` is false. - pub namespace: Option, + pub namespace: String, /// The track name. pub name: String, @@ -31,14 +25,9 @@ pub struct Subscribe { } impl Subscribe { - pub async fn decode(r: &mut R, ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - - let namespace = match ext.subscribe_split { - true => Some(String::decode(r).await?), - false => None, - }; - + let namespace = String::decode(r).await?; let name = String::decode(r).await?; let start_group = SubscribeLocation::decode(r).await?; @@ -72,17 +61,9 @@ impl Subscribe { }) } - pub async fn encode(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - - if self.namespace.is_some() != ext.subscribe_split { - panic!("namespace must be None if subscribe_split is false"); - } - - if ext.subscribe_split { - self.namespace.as_ref().unwrap().encode(w).await?; - } - + self.namespace.encode(w).await?; self.name.encode(w).await?; self.start_group.encode(w).await?; diff --git a/moq-transport/src/message/subscribe_error.rs b/moq-transport/src/message/subscribe_error.rs index 9ef4c917..3577ae5c 100644 --- a/moq-transport/src/message/subscribe_error.rs +++ b/moq-transport/src/message/subscribe_error.rs @@ -1,12 +1,9 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup::Extensions; /// Sent by the publisher to reject a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeError { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - // The ID for this subscription. pub id: VarInt, @@ -15,21 +12,31 @@ pub struct SubscribeError { // An optional, human-readable reason. pub reason: String, + + /// An optional track alias, only used when error == Retry Track Alias + pub alias: VarInt, } impl SubscribeError { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; let reason = String::decode(r).await?; - - Ok(Self { id, code, reason }) + let alias = VarInt::decode(r).await?; + + Ok(Self { + id, + code, + reason, + alias, + }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; self.reason.encode(w).await?; + self.alias.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/message/subscribe_fin.rs b/moq-transport/src/message/subscribe_fin.rs index b0709714..eb02f7d1 100644 --- a/moq-transport/src/message/subscribe_fin.rs +++ b/moq-transport/src/message/subscribe_fin.rs @@ -1,36 +1,30 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup::Extensions; /// Sent by the publisher to cleanly terminate a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeFin { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 /// The ID for this subscription. pub id: VarInt, /// The final group/object sent on this subscription. - pub final_group: VarInt, - pub final_object: VarInt, + pub group: VarInt, + pub object: VarInt, } impl SubscribeFin { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let final_group = VarInt::decode(r).await?; - let final_object = VarInt::decode(r).await?; + let group = VarInt::decode(r).await?; + let object = VarInt::decode(r).await?; - Ok(Self { - id, - final_group, - final_object, - }) + Ok(Self { id, group, object }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - self.final_group.encode(w).await?; - self.final_object.encode(w).await?; + self.group.encode(w).await?; + self.object.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs index 11864e61..ea9efe76 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/message/subscribe_ok.rs @@ -1,12 +1,10 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the publisher to accept a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeOk { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 /// The ID for this track. pub id: VarInt, @@ -15,7 +13,7 @@ pub struct SubscribeOk { } impl SubscribeOk { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; let expires = VarInt::decode(r).await?; Ok(Self { id, expires }) @@ -23,7 +21,7 @@ impl SubscribeOk { } impl SubscribeOk { - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; self.expires.encode(w).await?; Ok(()) diff --git a/moq-transport/src/message/subscribe_reset.rs b/moq-transport/src/message/subscribe_reset.rs index e488b28e..dfedf876 100644 --- a/moq-transport/src/message/subscribe_reset.rs +++ b/moq-transport/src/message/subscribe_reset.rs @@ -1,11 +1,9 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup::Extensions; /// Sent by the publisher to terminate a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeReset { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 /// The ID for this subscription. pub id: VarInt, @@ -16,34 +14,34 @@ pub struct SubscribeReset { pub reason: String, /// The final group/object sent on this subscription. - pub final_group: VarInt, - pub final_object: VarInt, + pub group: VarInt, + pub object: VarInt, } impl SubscribeReset { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; let reason = String::decode(r).await?; - let final_group = VarInt::decode(r).await?; - let final_object = VarInt::decode(r).await?; + let group = VarInt::decode(r).await?; + let object = VarInt::decode(r).await?; Ok(Self { id, code, reason, - final_group, - final_object, + group, + object, }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; self.reason.encode(w).await?; - self.final_group.encode(w).await?; - self.final_object.encode(w).await?; + self.group.encode(w).await?; + self.object.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/message/unannounce.rs b/moq-transport/src/message/unannounce.rs index a2c2e390..e93188c2 100644 --- a/moq-transport/src/message/unannounce.rs +++ b/moq-transport/src/message/unannounce.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the publisher to terminate an Announce. #[derive(Clone, Debug)] @@ -11,13 +10,13 @@ pub struct Unannounce { } impl Unannounce { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; Ok(Self { namespace }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await?; Ok(()) diff --git a/moq-transport/src/message/unsubscribe.rs b/moq-transport/src/message/unsubscribe.rs index 5361f594..39abc373 100644 --- a/moq-transport/src/message/unsubscribe.rs +++ b/moq-transport/src/message/unsubscribe.rs @@ -1,26 +1,22 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; /// Sent by the subscriber to terminate a Subscribe. #[derive(Clone, Debug)] pub struct Unsubscribe { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - // The ID for this subscription. pub id: VarInt, } impl Unsubscribe { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; Ok(Self { id }) } } impl Unsubscribe { - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/object/group.rs b/moq-transport/src/object/group.rs new file mode 100644 index 00000000..5b7bd43a --- /dev/null +++ b/moq-transport/src/object/group.rs @@ -0,0 +1,62 @@ +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct GroupHeader { + // The subscribe ID for this track. + pub subscribe: VarInt, + + // Identifies the name of the track + pub track: VarInt, + + // The group sequence number + pub group: VarInt, + + // The priority, where **smaller** values are sent first. + pub priority: u32, +} + +impl GroupHeader { + pub async fn decode(r: &mut R) -> Result { + let subscribe = VarInt::decode(r).await?; + let track = VarInt::decode(r).await?; + let group = VarInt::decode(r).await?; + let priority = VarInt::decode(r).await?.try_into()?; + + Ok(Self { + subscribe, + track, + group, + priority, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe.encode(w).await?; + self.track.encode(w).await?; + self.group.encode(w).await?; + VarInt::from_u32(self.priority).encode(w).await?; + + Ok(()) + } +} + +pub struct GroupChunk { + pub object: VarInt, + pub size: VarInt, +} + +impl GroupChunk { + pub async fn decode(r: &mut R) -> Result { + let object = VarInt::decode(r).await?; + let size = VarInt::decode(r).await?; + + Ok(Self { object, size }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.object.encode(w).await?; + self.size.encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/object/mod.rs b/moq-transport/src/object/mod.rs new file mode 100644 index 00000000..36e5202d --- /dev/null +++ b/moq-transport/src/object/mod.rs @@ -0,0 +1,101 @@ +mod group; +mod stream; +mod track; + +pub use group::*; +pub use stream::*; +pub use track::*; + +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; +use std::fmt; + +// Use a macro to generate the message types rather than copy-paste. +// This implements a decode/encode method that uses the specified type. +macro_rules! object_types { + {$($name:ident = $val:expr,)*} => { + /// All supported message types. + #[derive(Clone)] + pub enum Object { + $($name($name)),* + } + + impl Object { + pub async fn decode(r: &mut R) -> Result { + let t = VarInt::decode(r).await?; + + match t.into_inner() { + $($val => { + let msg = $name::decode(r).await?; + Ok(Self::$name(msg)) + })* + _ => Err(DecodeError::InvalidMessage(t)), + } + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + $(Self::$name(ref m) => { + VarInt::from_u32($val).encode(w).await?; + m.encode(w).await + },)* + } + } + + pub fn id(&self) -> VarInt { + match self { + $(Self::$name(_) => { + VarInt::from_u32($val) + },)* + } + } + + pub fn name(&self) -> &'static str { + match self { + $(Self::$name(_) => { + stringify!($name) + },)* + } + } + + pub fn subscribe(&self) -> VarInt { + match self { + $(Self::$name(o) => o.subscribe,)* + } + } + + pub fn track(&self) -> VarInt { + match self { + $(Self::$name(o) => o.track,)* + } + } + + pub fn priority(&self) -> u32 { + match self { + $(Self::$name(o) => o.priority,)* + } + } + } + + $(impl From<$name> for Object { + fn from(m: $name) -> Self { + Object::$name(m) + } + })* + + impl fmt::Debug for Object { + // Delegate to the message formatter + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + $(Self::$name(ref m) => m.fmt(f),)* + } + } + } + } +} + +// Each object type is prefixed with the given VarInt type. +object_types! { + Stream = 0x0, + GroupHeader = 0x50, + TrackHeader = 0x51, +} diff --git a/moq-transport/src/object/stream.rs b/moq-transport/src/object/stream.rs new file mode 100644 index 00000000..d1bee578 --- /dev/null +++ b/moq-transport/src/object/stream.rs @@ -0,0 +1,49 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +/// Sent by the publisher as the header of each data stream. +#[derive(Clone, Debug)] +pub struct Stream { + // The subscribe ID for this track. + pub subscribe: VarInt, + + // Identifies the name of the track + pub track: VarInt, + + // The sequence number within the track. + pub group: VarInt, + + // The sequence number within the group. + pub sequence: VarInt, + + // The priority, where **smaller** values are sent first. + pub priority: u32, +} + +impl Stream { + pub async fn decode(r: &mut R) -> Result { + let subscribe = VarInt::decode(r).await?; + let track = VarInt::decode(r).await?; + let group = VarInt::decode(r).await?; + let sequence = VarInt::decode(r).await?; + let priority = VarInt::decode(r).await?.try_into()?; + + Ok(Self { + subscribe, + track, + group, + sequence, + priority, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe.encode(w).await?; + self.track.encode(w).await?; + self.group.encode(w).await?; + self.sequence.encode(w).await?; + VarInt::from_u32(self.priority).encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/object/track.rs b/moq-transport/src/object/track.rs new file mode 100644 index 00000000..66874249 --- /dev/null +++ b/moq-transport/src/object/track.rs @@ -0,0 +1,60 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct TrackHeader { + // The subscribe ID for this track. + pub subscribe: VarInt, + + // Identifies the name of the track + pub track: VarInt, + + // The priority, where **smaller** values are sent first. + pub priority: u32, +} + +impl TrackHeader { + pub async fn decode(r: &mut R) -> Result { + let subscribe = VarInt::decode(r).await?; + let track = VarInt::decode(r).await?; + let priority = VarInt::decode(r).await?.try_into()?; + + Ok(Self { + subscribe, + track, + priority, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe.encode(w).await?; + self.track.encode(w).await?; + VarInt::from_u32(self.priority).encode(w).await?; + + Ok(()) + } +} + +pub struct TrackChunk { + pub group: VarInt, + pub object: VarInt, + pub size: VarInt, +} + +impl TrackChunk { + pub async fn decode(r: &mut R) -> Result { + let group = VarInt::decode(r).await?; + let object = VarInt::decode(r).await?; + let size = VarInt::decode(r).await?; + + Ok(Self { group, object, size }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.group.encode(w).await?; + self.object.encode(w).await?; + self.size.encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/session/client.rs b/moq-transport/src/session/client.rs index 243fe4ef..fb99d0af 100644 --- a/moq-transport/src/session/client.rs +++ b/moq-transport/src/session/client.rs @@ -30,46 +30,22 @@ impl Client { async fn send_setup(session: &Session, role: setup::Role) -> Result { let mut control = session.open_bi().await?; - let versions: setup::Versions = [setup::Version::DRAFT_01, setup::Version::KIXEL_01].into(); + let versions: setup::Versions = [setup::Version::DRAFT_02].into(); let client = setup::Client { role, versions: versions.clone(), params: Default::default(), - - // Offer all extensions - extensions: setup::Extensions { - object_expires: true, - subscriber_id: true, - subscribe_split: true, - }, }; log::debug!("sending client SETUP: {:?}", client); client.encode(&mut control.0).await?; - let mut server = setup::Server::decode(&mut control.1).await?; + let server = setup::Server::decode(&mut control.1).await?; log::debug!("received server SETUP: {:?}", server); - match server.version { - setup::Version::DRAFT_01 => { - // We always require this extension - server.extensions.require_subscriber_id()?; - - if server.role.is_publisher() { - // We only require object expires if we're a subscriber, so we don't cache objects indefinitely. - server.extensions.require_object_expires()?; - } - } - setup::Version::KIXEL_01 => { - // KIXEL_01 didn't support extensions; all were enabled. - server.extensions = client.extensions.clone() - } - _ => return Err(SessionError::Version(versions, [server.version].into())), - } - - let control = Control::new(control.0, control.1, server.extensions); + let control = Control::new(control.0, control.1); Ok(control) } diff --git a/moq-transport/src/session/control.rs b/moq-transport/src/session/control.rs index 06866509..7c84e805 100644 --- a/moq-transport/src/session/control.rs +++ b/moq-transport/src/session/control.rs @@ -6,21 +6,19 @@ use tokio::sync::Mutex; use webtransport_quinn::{RecvStream, SendStream}; use super::SessionError; -use crate::{message::Message, setup::Extensions}; +use crate::message::Message; #[derive(Debug, Clone)] pub(crate) struct Control { send: Arc>, recv: Arc>, - pub ext: Extensions, } impl Control { - pub fn new(send: SendStream, recv: RecvStream, ext: Extensions) -> Self { + pub fn new(send: SendStream, recv: RecvStream) -> Self { Self { send: Arc::new(Mutex::new(send)), recv: Arc::new(Mutex::new(recv)), - ext, } } @@ -28,7 +26,7 @@ impl Control { let mut stream = self.send.lock().await; log::info!("sending message: {:?}", msg); msg.into() - .encode(&mut *stream, &self.ext) + .encode(&mut *stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(()) @@ -37,7 +35,7 @@ impl Control { // It's likely a mistake to call this from two different tasks, but it's easier to just support it. pub async fn recv(&self) -> Result { let mut stream = self.recv.lock().await; - let msg = Message::decode(&mut *stream, &self.ext) + let msg = Message::decode(&mut *stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(msg) diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index cf19bd36..09bce98a 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -10,7 +10,7 @@ use crate::{ cache::{broadcast, segment, track, CacheError}, message, message::Message, - MoqError, VarInt, + object, MoqError, VarInt, }; use super::{Control, SessionError}; @@ -129,20 +129,14 @@ impl Publisher { // TODO properly populate these // But first: https://github.com/moq-wg/moq-transport/issues/313 - final_group: VarInt::ZERO, - final_object: VarInt::ZERO, + group: VarInt::ZERO, + object: VarInt::ZERO, }; self.control.send(msg).await } fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { - // We currently don't use the namespace field in SUBSCRIBE - // Make sure the namespace is empty if it's provided. - if msg.namespace.as_ref().map_or(false, |namespace| !namespace.is_empty()) { - return Err(CacheError::NotFound.into()); - } - let mut track = self.source.get_track(&msg.name)?; // TODO only clone the fields we need @@ -193,24 +187,28 @@ impl Publisher { let priority = (segment.priority as i64 - i32::MAX as i64) as i32; stream.set_priority(priority).ok(); - while let Some(mut fragment) = segment.fragment().await? { - log::trace!("serving fragment: {:?}", fragment); + let object = object::GroupHeader { + subscribe: id, + track: id, - let object = message::Object { - track: id, + // Properties of the segment + group: segment.sequence, + priority: segment.priority, + }; - // Properties of the segment - group: segment.sequence, - priority: segment.priority, - expires: segment.expires, + object + .encode(&mut stream) + .await + .map_err(|e| SessionError::Unknown(e.to_string()))?; - // Properties of the fragment - sequence: fragment.sequence, - size: fragment.size.map(VarInt::try_from).transpose()?, + while let Some(mut fragment) = segment.fragment().await? { + let object = object::GroupChunk { + object: fragment.sequence, + size: VarInt::try_from(fragment.size)?, }; object - .encode(&mut stream, &self.control.ext) + .encode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; diff --git a/moq-transport/src/session/server.rs b/moq-transport/src/session/server.rs index 215fe948..08b030ef 100644 --- a/moq-transport/src/session/server.rs +++ b/moq-transport/src/session/server.rs @@ -13,32 +13,14 @@ impl Server { pub async fn accept(session: Session) -> Result { let mut control = session.accept_bi().await?; - let mut client = setup::Client::decode(&mut control.1).await?; + let client = setup::Client::decode(&mut control.1).await?; log::debug!("received client SETUP: {:?}", client); - if client.versions.contains(&setup::Version::DRAFT_01) { - // We always require subscriber ID. - client.extensions.require_subscriber_id()?; - - // We require OBJECT_EXPIRES for publishers only. - if client.role.is_publisher() { - client.extensions.require_object_expires()?; - } - - // We don't require SUBSCRIBE_SPLIT since it's easy enough to support, but it's clearly an oversight. - // client.extensions.require(&Extension::SUBSCRIBE_SPLIT)?; - } else if client.versions.contains(&setup::Version::KIXEL_01) { - // Extensions didn't exist in KIXEL_01, so we set them manually. - client.extensions = setup::Extensions { - object_expires: true, - subscriber_id: true, - subscribe_split: true, - }; - } else { + if !client.versions.contains(&setup::Version::DRAFT_02) { return Err(SessionError::Version( client.versions, - [setup::Version::DRAFT_01, setup::Version::KIXEL_01].into(), + [setup::Version::DRAFT_02].into(), )); } @@ -63,7 +45,7 @@ impl Request { let setup = self.setup(setup::Role::Publisher)?; setup.encode(&mut self.control.0).await?; - let control = Control::new(self.control.0, self.control.1, setup.extensions); + let control = Control::new(self.control.0, self.control.1); let publisher = Publisher::new(self.session, control, source); Ok(publisher) } @@ -73,7 +55,7 @@ impl Request { let setup = self.setup(setup::Role::Subscriber)?; setup.encode(&mut self.control.0).await?; - let control = Control::new(self.control.0, self.control.1, setup.extensions); + let control = Control::new(self.control.0, self.control.1); let subscriber = Subscriber::new(self.session, control, source); Ok(subscriber) } @@ -89,7 +71,6 @@ impl Request { let server = setup::Server { role, version: setup::Version::DRAFT_01, - extensions: self.client.extensions.clone(), params: Default::default(), }; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 02b5fbd4..21afe772 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -10,6 +10,7 @@ use crate::{ coding::DecodeError, message, message::Message, + object::{self, Object}, session::{Control, SessionError}, VarInt, }; @@ -106,82 +107,102 @@ impl Subscriber { async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> { // Decode the object on the data stream. - let mut object = message::Object::decode(&mut stream, &self.control.ext) + let header = Object::decode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; - log::trace!("first object: {:?}", object); + match header { + Object::TrackHeader(header) => self.run_track(header, stream).await, + Object::GroupHeader(header) => self.run_group(header, stream).await, + Object::Stream(header) => self.run_object(header, stream).await, + } + } - // A new scope is needed because the async compiler is dumb - let mut segment = { - let mut subscribes = self.subscribes.lock().unwrap(); - let track = subscribes.get_mut(&object.track).ok_or(CacheError::NotFound)?; + async fn run_track(self, header: object::TrackHeader, mut stream: RecvStream) -> Result<(), SessionError> { + loop { + let chunk = match object::TrackChunk::decode(&mut stream).await { + Ok(next) => next, - track.create_segment(segment::Info { - sequence: object.group, - priority: object.priority, - expires: object.expires, - })? - }; + // No more objects + Err(DecodeError::Final) => break, - log::trace!("received segment: {:?}", segment); + // Unknown error + Err(err) => return Err(err.into()), + }; - // Create the first fragment - let mut fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?; - let mut remain = object.size.map(usize::from); + // TODO error if we get a duplicate group + let mut segment = { + let mut subscribes = self.subscribes.lock().unwrap(); + let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; - loop { - if let Some(0) = remain { - // Decode the next object from the stream. - let next = match message::Object::decode(&mut stream, &self.control.ext).await { - Ok(next) => next, + track.create_segment(segment::Info { + sequence: chunk.group, + priority: header.priority, + })? + }; - // No more objects - Err(DecodeError::Final) => break, + let mut remain = chunk.size.into(); - // Unknown error - Err(err) => return Err(err.into()), - }; + // Create a new obvject. + let mut fragment = segment.push_fragment(chunk.object, remain)?; - log::trace!("next object: {:?}", object); + while remain > 0 { + let data = stream + .read_chunk(remain, true) + .await? + .ok_or(DecodeError::UnexpectedEnd)?; + remain -= data.bytes.len(); + fragment.chunk(data.bytes)?; + } + } - // NOTE: This is a custom restriction; not part of the moq-transport draft. - // We require every OBJECT to contain the same priority since prioritization is done per-stream. - // We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps. - if next.priority != object.priority && next.group != object.group { - return Err(SessionError::StreamMapping); - } + Ok(()) + } - object = next; + async fn run_group(self, header: object::GroupHeader, mut stream: RecvStream) -> Result<(), SessionError> { + let mut segment = { + let mut subscribes = self.subscribes.lock().unwrap(); + let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; + + track.create_segment(segment::Info { + sequence: header.group, + priority: header.priority, + })? + }; - // Create a new object. - fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?; - remain = object.size.map(usize::from); + loop { + let chunk = match object::GroupChunk::decode(&mut stream).await { + Ok(chunk) => chunk, - log::trace!("next fragment: {:?}", fragment); - } + // No more objects + Err(DecodeError::Final) => break, - match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? { - // Unbounded object has ended - None if remain.is_none() => break, + // Unknown error + Err(err) => return Err(err.into()), + }; - // Bounded object ended early, oops. - None => return Err(DecodeError::UnexpectedEnd.into()), + let mut remain = chunk.size.into(); - // NOTE: This does not make a copy! - // Bytes are immutable and ref counted. - Some(data) => { - remain = remain.map(|r| r - data.bytes.len()); + // Create a new obvject. + let mut fragment = segment.push_fragment(chunk.object, remain)?; - log::trace!("next chunk: {:?}", data); - fragment.chunk(data.bytes)?; - } + while remain > 0 { + let data = stream + .read_chunk(remain, true) + .await? + .ok_or(DecodeError::UnexpectedEnd)?; + remain -= data.bytes.len(); + fragment.chunk(data.bytes)?; } } Ok(()) } + async fn run_object(self, _header: object::Stream, _stream: RecvStream) -> Result<(), SessionError> { + unimplemented!("TODO"); + } + async fn run_source(mut self) -> Result<(), SessionError> { loop { // NOTE: This returns Closed when the source is closed. @@ -193,7 +214,7 @@ impl Subscriber { let msg = message::Subscribe { id, - namespace: self.control.ext.subscribe_split.then(|| "".to_string()), + namespace: "".to_string(), name, // TODO correctly support these diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs index a18eb7de..47782913 100644 --- a/moq-transport/src/setup/client.rs +++ b/moq-transport/src/setup/client.rs @@ -1,4 +1,4 @@ -use super::{Extensions, Role, Versions}; +use super::{Role, Versions}; use crate::{ coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, @@ -17,9 +17,6 @@ pub struct Client { /// Indicate if the client is a publisher, a subscriber, or both. pub role: Role, - /// A list of known/offered extensions. - pub extensions: Extensions, - /// Unknown parameters. pub params: Params, } @@ -46,14 +43,7 @@ impl Client { return Err(DecodeError::InvalidParameter); } - let extensions = Extensions::load(&mut params).await?; - - Ok(Self { - versions, - role, - extensions, - params, - }) + Ok(Self { versions, role, params }) } /// Encode a server setup message. @@ -63,7 +53,6 @@ impl Client { let mut params = self.params.clone(); params.set(VarInt::from_u32(0), self.role).await?; - self.extensions.store(&mut params).await?; params.encode(w).await?; diff --git a/moq-transport/src/setup/extension.rs b/moq-transport/src/setup/extension.rs deleted file mode 100644 index 9e8c8ccd..00000000 --- a/moq-transport/src/setup/extension.rs +++ /dev/null @@ -1,84 +0,0 @@ -use tokio::io::{AsyncRead, AsyncWrite}; - -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; -use crate::session::SessionError; -use crate::VarInt; -use paste::paste; - -/// This is a custom extension scheme to allow/require draft PRs. -/// -/// By convention, the extension number is the PR number + 0xe0000. - -macro_rules! extensions { - {$($name:ident = $val:expr,)*} => { - #[derive(Clone, Default, Debug)] - pub struct Extensions { - $( - pub $name: bool, - )* - } - - impl Extensions { - pub async fn load(params: &mut Params) -> Result { - let mut extensions = Self::default(); - - $( - if let Some(_) = params.get::(VarInt::from_u32($val)).await? { - extensions.$name = true - } - )* - - Ok(extensions) - } - - pub async fn store(&self, params: &mut Params) -> Result<(), EncodeError> { - $( - if self.$name { - params.set(VarInt::from_u32($val), ExtensionExists{}).await?; - } - )* - - Ok(()) - } - - paste! { - $( - pub fn [](&self) -> Result<(), SessionError> { - match self.$name { - true => Ok(()), - false => Err(SessionError::RequiredExtension(VarInt::from_u32($val))), - } - } - )* - } - } - } -} - -struct ExtensionExists; - -#[async_trait::async_trait] -impl Decode for ExtensionExists { - async fn decode(_r: &mut R) -> Result { - Ok(ExtensionExists {}) - } -} - -#[async_trait::async_trait] -impl Encode for ExtensionExists { - async fn encode(&self, _w: &mut W) -> Result<(), EncodeError> { - Ok(()) - } -} - -extensions! { - // required for publishers: OBJECT contains expires VarInt in seconds: https://github.com/moq-wg/moq-transport/issues/249 - // TODO write up a PR - object_expires = 0xe00f9, - - // required: SUBSCRIBE chooses track ID: https://github.com/moq-wg/moq-transport/pull/258 - subscriber_id = 0xe0102, - - // optional: SUBSCRIBE contains namespace/name tuple: https://github.com/moq-wg/moq-transport/pull/277 - subscribe_split = 0xe0115, -} diff --git a/moq-transport/src/setup/mod.rs b/moq-transport/src/setup/mod.rs index e7662e71..e5c59c84 100644 --- a/moq-transport/src/setup/mod.rs +++ b/moq-transport/src/setup/mod.rs @@ -5,13 +5,11 @@ //! Both sides negotate the [Version] and [Role]. mod client; -mod extension; mod role; mod server; mod version; pub use client::*; -pub use extension::*; pub use role::*; pub use server::*; pub use version::*; diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs index 7f731195..aca99759 100644 --- a/moq-transport/src/setup/server.rs +++ b/moq-transport/src/setup/server.rs @@ -1,4 +1,4 @@ -use super::{Extensions, Role, Version}; +use super::{Role, Version}; use crate::{ coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, @@ -18,9 +18,6 @@ pub struct Server { // Proposal: moq-wg/moq-transport#151 pub role: Role, - /// Custom extensions. - pub extensions: Extensions, - /// Unknown parameters. pub params: Params, } @@ -46,14 +43,7 @@ impl Server { return Err(DecodeError::InvalidParameter); } - let extensions = Extensions::load(&mut params).await?; - - Ok(Self { - version, - role, - extensions, - params, - }) + Ok(Self { version, role, params }) } /// Encode the server setup. @@ -63,7 +53,6 @@ impl Server { let mut params = self.params.clone(); params.set(VarInt::from_u32(0), self.role).await?; - self.extensions.store(&mut params).await?; params.encode(w).await?; Ok(()) diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 89330391..3a3fccf6 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -15,6 +15,9 @@ impl Version { /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff000001)); + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html + pub const DRAFT_02: Version = Version(VarInt::from_u32(0xff000002)); + /// Fork of draft-ietf-moq-transport-00. /// /// Rough list of differences: From 0a244279685bb8aba5ac957b72f21a173e422fc6 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 7 Mar 2024 14:20:04 -0800 Subject: [PATCH 2/6] WIP --- dev/pub | 4 +-- moq-clock/src/clock.rs | 14 ++------ moq-pub/src/media.rs | 30 ++++++---------- moq-relay/src/error.rs | 12 ------- moq-relay/src/session.rs | 4 +-- moq-transport/src/cache/error.rs | 16 +++------ moq-transport/src/cache/fragment.rs | 25 ++++++++++++-- moq-transport/src/cache/segment.rs | 32 ++++++++++++----- moq-transport/src/coding/params.rs | 2 +- moq-transport/src/coding/string.rs | 6 ++-- moq-transport/src/coding/varint.rs | 11 +++--- moq-transport/src/error.rs | 5 +-- moq-transport/src/message/subscribe.rs | 25 ++++++++------ moq-transport/src/message/subscribe_fin.rs | 18 ++++++---- moq-transport/src/message/subscribe_ok.rs | 11 +++--- moq-transport/src/object/group.rs | 5 +-- moq-transport/src/object/stream.rs | 4 +-- moq-transport/src/object/track.rs | 5 +-- moq-transport/src/session/error.rs | 31 +++-------------- moq-transport/src/session/publisher.rs | 40 ++++++++++++---------- moq-transport/src/session/server.rs | 2 +- moq-transport/src/session/subscriber.rs | 39 ++++++++++++++++----- 22 files changed, 177 insertions(+), 164 deletions(-) diff --git a/dev/pub b/dev/pub index 2a5cb211..f6b36950 100755 --- a/dev/pub +++ b/dev/pub @@ -15,9 +15,9 @@ ADDR="${ADDR:-$HOST:$PORT}" # Generate a random 16 character name by default. #NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}" -# JK use the name "dev" instead +# JK use the name "bbb" instead, matching the Big Buck Bunny demo. # TODO use that random name if the host is not localhost -NAME="${NAME:-dev}" +NAME="${NAME:-bbb}" # Combine the host and name into a URL. URL="${URL:-"https://$ADDR/$NAME"}" diff --git a/moq-clock/src/clock.rs b/moq-clock/src/clock.rs index b4ec136a..bbf16219 100644 --- a/moq-clock/src/clock.rs +++ b/moq-clock/src/clock.rs @@ -53,19 +53,11 @@ impl Publisher { // Everything but the second. let base = now.format("%Y-%m-%d %H:%M:").to_string(); - segment - .fragment(VarInt::ZERO, base.len())? - .chunk(base.clone().into()) - .context("failed to write base")?; + segment.write(base.clone().into()).context("failed to write base")?; loop { let delta = now.format("%S").to_string(); - let sequence = VarInt::from_u32(now.second() + 1); - - segment - .fragment(sequence, delta.len())? - .chunk(delta.clone().into()) - .context("failed to write delta")?; + segment.write(delta.clone().into()).context("failed to write delta")?; println!("{}{}", base, delta); @@ -116,7 +108,7 @@ impl Subscriber { log::debug!("got first: {:?}", first); - if first.sequence.into_inner() != 0 { + if first.sequence != VarInt::ZERO { anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer"); } diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 77278c05..749f298f 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -1,6 +1,6 @@ use crate::cli::Config; use anyhow::{self, Context}; -use moq_transport::cache::{broadcast, fragment, segment, track}; +use moq_transport::cache::{broadcast, segment, track}; use moq_transport::VarInt; use mp4::{self, ReadBox}; use serde_json::json; @@ -47,9 +47,8 @@ impl Media { priority: 0, })?; - // Create a single fragment, optionally setting the size - let mut init_fragment = init_segment.fragment(VarInt::ZERO, init.len())?; - init_fragment.chunk(init.into())?; + // Write the init segment to the track. + init_segment.write(init.into())?; let mut tracks = HashMap::new(); @@ -212,11 +211,7 @@ impl Media { log::info!("catalog: {}", catalog_str); // Create a single fragment for the segment. - let data: bytes::Bytes = catalog_str.into(); - let mut fragment = segment.fragment(VarInt::ZERO, data.len())?; - - // Add the segment and add the fragment. - fragment.chunk(data)?; + segment.write(catalog_str.into())?; Ok(()) } @@ -264,7 +259,7 @@ struct Track { track: track::Publisher, // The current segment - current: Option, + current: Option, // The number of units per second. timescale: u64, @@ -287,7 +282,7 @@ impl Track { if let Some(current) = self.current.as_mut() { if !fragment.keyframe { // Use the existing segment - current.chunk(raw.into())?; + current.write(raw.into())?; return Ok(()); } } @@ -310,23 +305,20 @@ impl Track { priority: u32::MAX.checked_sub(timestamp).context("priority too large")?, })?; - // Create a single fragment for the segment that we will keep appending. - let mut fragment = segment.fragment(VarInt::ZERO, raw.len())?; - self.sequence += 1; - // Insert the raw atom into the segment. - fragment.chunk(raw.into())?; + // Create a single fragment for the segment that we will keep appending. + segment.write(raw.into())?; // Save for the next iteration - self.current = Some(fragment); + self.current = Some(segment); Ok(()) } pub fn data(&mut self, raw: Vec) -> anyhow::Result<()> { - let fragment = self.current.as_mut().context("missing current fragment")?; - fragment.chunk(raw.into())?; + let segment = self.current.as_mut().context("missing current fragment")?; + segment.write(raw.into())?; Ok(()) } diff --git a/moq-relay/src/error.rs b/moq-relay/src/error.rs index b943a938..ab64aac2 100644 --- a/moq-relay/src/error.rs +++ b/moq-relay/src/error.rs @@ -36,16 +36,4 @@ impl moq_transport::MoqError for RelayError { Self::WebTransportServer(_) => 500, } } - - fn reason(&self) -> String { - match self { - Self::Transport(err) => format!("transport error: {}", err.reason()), - Self::Cache(err) => format!("cache error: {}", err.reason()), - Self::MoqApi(err) => format!("api error: {}", err), - Self::Url(err) => format!("url error: {}", err), - Self::MissingNode => "missing node".to_owned(), - Self::WebTransportServer(err) => format!("upstream server error: {}", err), - Self::WebTransportClient(err) => format!("upstream client error: {}", err), - } - } } diff --git a/moq-relay/src/session.rs b/moq-relay/src/session.rs index a7570f7d..35234516 100644 --- a/moq-relay/src/session.rs +++ b/moq-relay/src/session.rs @@ -55,12 +55,12 @@ impl Session { match role { Role::Publisher => { if let Err(err) = self.serve_publisher(id, request, &path).await { - log::warn!("error serving publisher: id={} path={} err={:#?}", id, path, err); + log::warn!("error serving publisher: id={} path={} err={}", id, path, err); } } Role::Subscriber => { if let Err(err) = self.serve_subscriber(id, request, &path).await { - log::warn!("error serving subscriber: id={} path={} err={:#?}", id, path, err); + log::warn!("error serving subscriber: id={} path={} err={}", id, path, err); } } Role::Both => { diff --git a/moq-transport/src/cache/error.rs b/moq-transport/src/cache/error.rs index d3f907b6..7cb4c38b 100644 --- a/moq-transport/src/cache/error.rs +++ b/moq-transport/src/cache/error.rs @@ -24,6 +24,10 @@ pub enum CacheError { /// A resource already exists with that ID. #[error("duplicate")] Duplicate, + + /// We reported the wrong size for a fragment. + #[error("wrong size")] + WrongSize, } impl MoqError for CacheError { @@ -35,17 +39,7 @@ impl MoqError for CacheError { Self::Stop => 206, Self::NotFound => 404, Self::Duplicate => 409, - } - } - - /// A reason that is sent over the wire. - fn reason(&self) -> String { - match self { - Self::Closed => "closed".to_owned(), - Self::Reset(code) => format!("reset code: {}", code), - Self::Stop => "stop".to_owned(), - Self::NotFound => "not found".to_owned(), - Self::Duplicate => "duplicate".to_owned(), + Self::WrongSize => 500, } } } diff --git a/moq-transport/src/cache/fragment.rs b/moq-transport/src/cache/fragment.rs index 9fbcbae7..ccdc9049 100644 --- a/moq-transport/src/cache/fragment.rs +++ b/moq-transport/src/cache/fragment.rs @@ -78,6 +78,9 @@ pub struct Publisher { // Immutable segment state. info: Arc, + // The amount of promised data that has yet to be written. + remain: usize, + // Closes the segment when all Publishers are dropped. _dropped: Arc, } @@ -85,20 +88,38 @@ pub struct Publisher { impl Publisher { fn new(state: Watch, info: Arc) -> Self { let _dropped = Arc::new(Dropped::new(state.clone())); - Self { state, info, _dropped } + let remain = info.size; + + Self { + state, + info, + remain, + _dropped, + } } /// Write a new chunk of bytes. pub fn chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> { + if chunk.len() > self.remain { + return Err(CacheError::WrongSize); + } + self.remain -= chunk.len(); + let mut state = self.state.lock_mut(); state.closed.clone()?; state.chunks.push(chunk); + Ok(()) } /// Close the segment with an error. pub fn close(self, err: CacheError) -> Result<(), CacheError> { - self.state.lock_mut().close(err) + self.state.lock_mut().close(err)?; + if self.remain != 0 { + Err(CacheError::WrongSize) + } else { + Ok(()) + } } } diff --git a/moq-transport/src/cache/segment.rs b/moq-transport/src/cache/segment.rs index e08b1d03..cb396dbb 100644 --- a/moq-transport/src/cache/segment.rs +++ b/moq-transport/src/cache/segment.rs @@ -78,6 +78,9 @@ pub struct Publisher { // Immutable segment state. info: Arc, + // The next fragment sequence number to use. + next: u64, + // Closes the segment when all Publishers are dropped. _dropped: Arc, } @@ -85,12 +88,28 @@ pub struct Publisher { impl Publisher { fn new(state: Watch, info: Arc) -> Self { let _dropped = Arc::new(Dropped::new(state.clone())); - Self { state, info, _dropped } + Self { + state, + info, + next: 0, + _dropped, + } + } + + /// Write an object with the given payload. + pub fn write(&mut self, data: bytes::Bytes) -> Result<(), CacheError> { + self.fragment(data.len())?.chunk(data) } - // Not public because it's a footgun. - pub(crate) fn push_fragment(&mut self, sequence: VarInt, size: usize) -> Result { - let (publisher, subscriber) = fragment::new(fragment::Info { sequence, size }); + /// Write an object over multiple fragments. + /// + /// BAD STUFF will happen if the size is wrong. + pub fn fragment(&mut self, size: usize) -> Result { + let (publisher, subscriber) = fragment::new(fragment::Info { + sequence: self.next.try_into().unwrap(), + size, + }); + self.next += 1; let mut state = self.state.lock_mut(); state.closed.clone()?; @@ -98,11 +117,6 @@ impl Publisher { Ok(publisher) } - /// Write a fragment - pub fn fragment(&mut self, sequence: VarInt, size: usize) -> Result { - self.push_fragment(sequence, size) - } - /// Close the segment with an error. pub fn close(self, err: CacheError) -> Result<(), CacheError> { self.state.lock_mut().close(err) diff --git a/moq-transport/src/coding/params.rs b/moq-transport/src/coding/params.rs index 9cfd6f38..1cc85abd 100644 --- a/moq-transport/src/coding/params.rs +++ b/moq-transport/src/coding/params.rs @@ -18,7 +18,7 @@ impl Decode for Params { async fn decode(mut r: &mut R) -> Result { let mut params = HashMap::new(); - // I hate this shit so much; let me encode my role and get on with my life. + // I hate this encoding so much; let me encode my role and get on with my life. let count = VarInt::decode(r).await?; for _ in 0..count.into_inner() { let kind = VarInt::decode(r).await?; diff --git a/moq-transport/src/coding/string.rs b/moq-transport/src/coding/string.rs index 2cdff4a9..1f660d3f 100644 --- a/moq-transport/src/coding/string.rs +++ b/moq-transport/src/coding/string.rs @@ -21,9 +21,9 @@ impl Encode for String { impl Decode for String { /// Decode a string with a varint length prefix. async fn decode(r: &mut R) -> Result { - let size = VarInt::decode(r).await?.into_inner(); - let mut str = String::with_capacity(min(1024, size) as usize); - r.take(size).read_to_string(&mut str).await?; + let size = VarInt::decode(r).await?.into(); + let mut str = String::with_capacity(min(1024, size)); + r.take(size as u64).read_to_string(&mut str).await?; Ok(str) } } diff --git a/moq-transport/src/coding/varint.rs b/moq-transport/src/coding/varint.rs index 8557de8a..95bc8074 100644 --- a/moq-transport/src/coding/varint.rs +++ b/moq-transport/src/coding/varint.rs @@ -18,8 +18,8 @@ pub struct BoundsExceeded; /// An integer less than 2^62 /// /// Values of this type are suitable for encoding as QUIC variable-length integer. -// It would be neat if we could express to Rust that the top two bits are available for use as enum -// discriminants +/// It would be neat if we could express to Rust that the top two bits are available for use as enum +/// discriminants #[derive(Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct VarInt(u64); @@ -83,8 +83,9 @@ impl TryFrom for VarInt { /// Succeeds iff `x` < 2^62 fn try_from(x: u64) -> Result { - if x <= Self::MAX.into_inner() { - Ok(Self(x)) + let x = Self(x); + if x <= Self::MAX { + Ok(x) } else { Err(BoundsExceeded) } @@ -217,7 +218,7 @@ impl Encode for VarInt { } else if x < 2u64.pow(62) { w.write_u64(0b11 << 62 | x).await?; } else { - unreachable!("malformed VarInt"); + return Err(BoundsExceeded.into()); } Ok(()) diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index d070251a..66579eed 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -1,7 +1,4 @@ -pub trait MoqError { +pub trait MoqError: std::error::Error { /// An integer code that is sent over the wire. fn code(&self) -> u32; - - /// An optional reason sometimes sent over the wire. - fn reason(&self) -> String; } diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 05385d8b..0a137312 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -6,13 +6,13 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt}; /// Objects will use the provided ID instead of the full track name, to save bytes. #[derive(Clone, Debug)] pub struct Subscribe { + /// The subscription ID pub id: VarInt, - /// The track namespace. - pub namespace: String, - - /// The track name. - pub name: String, + /// Track properties + pub track_alias: VarInt, // This alias is useless but part of the spec + pub track_namespace: String, + pub track_name: String, /// The start/end group/object. pub start_group: SubscribeLocation, @@ -27,8 +27,9 @@ pub struct Subscribe { impl Subscribe { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let namespace = String::decode(r).await?; - let name = String::decode(r).await?; + let track_alias = VarInt::decode(r).await?; + let track_namespace = String::decode(r).await?; + let track_name = String::decode(r).await?; let start_group = SubscribeLocation::decode(r).await?; let start_object = SubscribeLocation::decode(r).await?; @@ -51,8 +52,9 @@ impl Subscribe { Ok(Self { id, - namespace, - name, + track_alias, + track_namespace, + track_name, start_group, start_object, end_group, @@ -63,8 +65,9 @@ impl Subscribe { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - self.namespace.encode(w).await?; - self.name.encode(w).await?; + self.track_alias.encode(w).await?; + self.track_namespace.encode(w).await?; + self.track_name.encode(w).await?; self.start_group.encode(w).await?; self.start_object.encode(w).await?; diff --git a/moq-transport/src/message/subscribe_fin.rs b/moq-transport/src/message/subscribe_fin.rs index eb02f7d1..38a4efda 100644 --- a/moq-transport/src/message/subscribe_fin.rs +++ b/moq-transport/src/message/subscribe_fin.rs @@ -8,23 +8,27 @@ pub struct SubscribeFin { pub id: VarInt, /// The final group/object sent on this subscription. - pub group: VarInt, - pub object: VarInt, + pub final_group: VarInt, + pub final_object: VarInt, } impl SubscribeFin { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let object = VarInt::decode(r).await?; + let final_group = VarInt::decode(r).await?; + let final_object = VarInt::decode(r).await?; - Ok(Self { id, group, object }) + Ok(Self { + id, + final_group, + final_object, + }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - self.group.encode(w).await?; - self.object.encode(w).await?; + self.final_group.encode(w).await?; + self.final_object.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs index ea9efe76..aac07a70 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/message/subscribe_ok.rs @@ -5,17 +5,20 @@ use crate::coding::{AsyncRead, AsyncWrite}; /// Sent by the publisher to accept a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeOk { - /// The ID for this track. + /// The ID for this subscription. pub id: VarInt, /// The subscription will expire in this many milliseconds. - pub expires: VarInt, + pub expires: Option, } impl SubscribeOk { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let expires = VarInt::decode(r).await?; + let expires = match VarInt::decode(r).await? { + VarInt::ZERO => None, + expires => Some(expires), + }; Ok(Self { id, expires }) } } @@ -23,7 +26,7 @@ impl SubscribeOk { impl SubscribeOk { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - self.expires.encode(w).await?; + self.expires.unwrap_or(VarInt::ZERO).encode(w).await?; Ok(()) } } diff --git a/moq-transport/src/object/group.rs b/moq-transport/src/object/group.rs index 5b7bd43a..b2f370cf 100644 --- a/moq-transport/src/object/group.rs +++ b/moq-transport/src/object/group.rs @@ -2,10 +2,10 @@ use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeEr #[derive(Clone, Debug)] pub struct GroupHeader { - // The subscribe ID for this track. + // The subscribe ID. pub subscribe: VarInt, - // Identifies the name of the track + // The track alias. pub track: VarInt, // The group sequence number @@ -40,6 +40,7 @@ impl GroupHeader { } } +#[derive(Clone, Debug)] pub struct GroupChunk { pub object: VarInt, pub size: VarInt, diff --git a/moq-transport/src/object/stream.rs b/moq-transport/src/object/stream.rs index d1bee578..6d7a7652 100644 --- a/moq-transport/src/object/stream.rs +++ b/moq-transport/src/object/stream.rs @@ -4,10 +4,10 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; /// Sent by the publisher as the header of each data stream. #[derive(Clone, Debug)] pub struct Stream { - // The subscribe ID for this track. + // The subscribe ID. pub subscribe: VarInt, - // Identifies the name of the track + // The track alias. pub track: VarInt, // The sequence number within the track. diff --git a/moq-transport/src/object/track.rs b/moq-transport/src/object/track.rs index 66874249..02fd6275 100644 --- a/moq-transport/src/object/track.rs +++ b/moq-transport/src/object/track.rs @@ -3,10 +3,10 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Clone, Debug)] pub struct TrackHeader { - // The subscribe ID for this track. + // The subscribe ID. pub subscribe: VarInt, - // Identifies the name of the track + // The track ID. pub track: VarInt, // The priority, where **smaller** values are sent first. @@ -35,6 +35,7 @@ impl TrackHeader { } } +#[derive(Clone, Debug)] pub struct TrackChunk { pub group: VarInt, pub object: VarInt, diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index 228a4c87..3c49f76d 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -52,6 +52,10 @@ pub enum SessionError { #[error("varint bounds exceeded")] BoundsExceeded(#[from] coding::BoundsExceeded), + /// Sequence numbers were received out of order + #[error("sequence numbers out of order: expected={0} actual={1}")] + OutOfOrder(VarInt, VarInt), + /// An unclassified error because I'm lazy. TODO classify these errors #[error("unknown error: {0}")] Unknown(String), @@ -76,32 +80,7 @@ impl MoqError for SessionError { Self::InvalidSize(_) => 400, Self::RequiredExtension(_) => 426, Self::BoundsExceeded(_) => 500, - } - } - - /// A reason that is sent over the wire. - fn reason(&self) -> String { - match self { - Self::Cache(err) => err.reason(), - Self::RoleViolation(kind) => format!("role violation for message type {:?}", kind), - Self::RoleIncompatible(client, server) => { - format!( - "role incompatible: client wanted {:?} but server wanted {:?}", - client, server - ) - } - Self::Read(err) => format!("read error: {}", err), - Self::Write(err) => format!("write error: {}", err), - Self::Session(err) => format!("session error: {}", err), - Self::Unknown(err) => format!("unknown error: {}", err), - Self::Version(client, server) => format!("unsupported versions: client={:?} server={:?}", client, server), - Self::Encode(err) => format!("encode error: {}", err), - Self::Decode(err) => format!("decode error: {}", err), - Self::StreamMapping => "streaming mapping conflict".to_owned(), - Self::InvalidPriority(priority) => format!("invalid priority: {}", priority), - Self::InvalidSize(size) => format!("invalid size: {}", size), - Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id), - Self::BoundsExceeded(_) => "varint bounds exceeded".to_string(), + Self::OutOfOrder(_, _) => 400, } } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 09bce98a..e857b45a 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -73,7 +73,7 @@ impl Publisher { }, // No more broadcasts are available. err = self.source.closed() => { - self.webtransport.close(err.code(), err.reason().as_bytes()); + self.webtransport.close(err.code(), err.to_string().as_bytes()); return Ok(()); }, } @@ -116,7 +116,7 @@ impl Publisher { self.control .send(message::SubscribeOk { id: msg.id, - expires: VarInt::ZERO, + expires: None, }) .await } @@ -125,7 +125,7 @@ impl Publisher { let msg = message::SubscribeReset { id, code: err.code(), - reason: err.reason(), + reason: err.to_string(), // TODO properly populate these // But first: https://github.com/moq-wg/moq-transport/issues/313 @@ -137,7 +137,7 @@ impl Publisher { } fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { - let mut track = self.source.get_track(&msg.name)?; + let mut track = self.source.get_track(&msg.track_name)?; // TODO only clone the fields we need let mut this = self.clone(); @@ -147,7 +147,7 @@ impl Publisher { let res = this.run_subscribe(msg.id, &mut track).await; if let Err(err) = &res { - log::warn!("failed to serve track: name={} err={:#?}", track.name, err); + log::warn!("failed to serve track: name={} err={}", track.name, err); } // Make sure we send a reset at the end. @@ -179,24 +179,24 @@ impl Publisher { } async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { - log::trace!("serving group: {:?}", segment); - - let mut stream = self.webtransport.open_uni().await?; - - // Convert the u32 to a i32, since the Quinn set_priority is signed. - let priority = (segment.priority as i64 - i32::MAX as i64) as i32; - stream.set_priority(priority).ok(); - - let object = object::GroupHeader { + let header: object::Object = object::GroupHeader { subscribe: id, track: id, // Properties of the segment group: segment.sequence, priority: segment.priority, - }; + } + .into(); + + log::trace!("sending stream: {:?}", header); + let mut stream = self.webtransport.open_uni().await?; + + // Convert the u32 to a i32, since the Quinn set_priority is signed. + let priority = (segment.priority as i64 - i32::MAX as i64) as i32; + stream.set_priority(priority).ok(); - object + header .encode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; @@ -207,14 +207,16 @@ impl Publisher { size: VarInt::try_from(fragment.size)?, }; + log::trace!("sending chunk: {:?}", object); + object .encode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; - while let Some(chunk) = fragment.chunk().await? { - //log::trace!("writing chunk: {:?}", chunk); - stream.write_all(&chunk).await?; + while let Some(data) = fragment.chunk().await? { + stream.write_all(&data).await?; + log::trace!("wrote data: len={}", data.len()); } } diff --git a/moq-transport/src/session/server.rs b/moq-transport/src/session/server.rs index 08b030ef..3908f34a 100644 --- a/moq-transport/src/session/server.rs +++ b/moq-transport/src/session/server.rs @@ -70,7 +70,7 @@ impl Request { fn setup(&mut self, role: setup::Role) -> Result { let server = setup::Server { role, - version: setup::Version::DRAFT_01, + version: setup::Version::DRAFT_02, params: Default::default(), }; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 21afe772..64922d01 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -99,7 +99,7 @@ impl Subscriber { tokio::spawn(async move { if let Err(err) = this.run_stream(stream).await { - log::warn!("failed to receive stream: err={:#?}", err); + log::warn!("failed to receive stream: err={}", err); } }); } @@ -111,6 +111,8 @@ impl Subscriber { .await .map_err(|e| SessionError::Unknown(e.to_string()))?; + log::trace!("receiving stream: {:?}", header); + match header { Object::TrackHeader(header) => self.run_track(header, stream).await, Object::GroupHeader(header) => self.run_group(header, stream).await, @@ -130,6 +132,8 @@ impl Subscriber { Err(err) => return Err(err.into()), }; + log::trace!("receiving chunk: {:?}", chunk); + // TODO error if we get a duplicate group let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); @@ -144,15 +148,18 @@ impl Subscriber { let mut remain = chunk.size.into(); // Create a new obvject. - let mut fragment = segment.push_fragment(chunk.object, remain)?; + let mut fragment = segment.fragment(remain)?; while remain > 0 { let data = stream .read_chunk(remain, true) .await? - .ok_or(DecodeError::UnexpectedEnd)?; - remain -= data.bytes.len(); - fragment.chunk(data.bytes)?; + .ok_or(DecodeError::UnexpectedEnd)? + .bytes; + + log::trace!("read data: len={}", data.len()); + remain -= data.len(); + fragment.chunk(data)?; } } @@ -170,6 +177,10 @@ impl Subscriber { })? }; + // Sanity check to make sure we receive in order + // The draft shouldn't even include sequence numbers but whatever + let mut expected = 0; + loop { let chunk = match object::GroupChunk::decode(&mut stream).await { Ok(chunk) => chunk, @@ -181,10 +192,18 @@ impl Subscriber { Err(err) => return Err(err.into()), }; + log::trace!("receiving chunk: {:?}", chunk); + + if chunk.object.into_inner() != expected { + return Err(SessionError::OutOfOrder(expected.try_into()?, chunk.object)); + } + + expected = chunk.object.into_inner(); + let mut remain = chunk.size.into(); // Create a new obvject. - let mut fragment = segment.push_fragment(chunk.object, remain)?; + let mut fragment = segment.fragment(remain)?; while remain > 0 { let data = stream @@ -207,15 +226,17 @@ impl Subscriber { loop { // NOTE: This returns Closed when the source is closed. let track = self.source.next_track().await?; - let name = track.name.clone(); + let track_name = track.name.clone(); let id = VarInt::from_u32(self.next.fetch_add(1, atomic::Ordering::SeqCst)); self.subscribes.lock().unwrap().insert(id, track); let msg = message::Subscribe { id, - namespace: "".to_string(), - name, + + track_alias: id, // This alias is useless but part of the spec. + track_namespace: "".to_string(), + track_name, // TODO correctly support these start_group: message::SubscribeLocation::Latest(VarInt::ZERO), From 0d70e663d4cc1d98a5e3fd4fb422f68c011d01b4 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 8 Mar 2024 13:23:13 -0800 Subject: [PATCH 3/6] Rename object -> data --- moq-transport/src/coding/decode.rs | 4 --- moq-transport/src/{object => data}/group.rs | 0 moq-transport/src/{object => data}/mod.rs | 0 moq-transport/src/{object => data}/stream.rs | 0 moq-transport/src/{object => data}/track.rs | 0 moq-transport/src/lib.rs | 2 +- moq-transport/src/session/publisher.rs | 8 +++--- moq-transport/src/session/subscriber.rs | 28 ++++++++++---------- 8 files changed, 19 insertions(+), 23 deletions(-) rename moq-transport/src/{object => data}/group.rs (100%) rename moq-transport/src/{object => data}/mod.rs (100%) rename moq-transport/src/{object => data}/stream.rs (100%) rename moq-transport/src/{object => data}/track.rs (100%) diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index a6fe94e4..fe92d0d9 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -48,8 +48,4 @@ pub enum DecodeError { #[error("io error: {0}")] IoError(#[from] std::io::Error), - - // Used to signal that the stream has ended. - #[error("no more messages")] - Final, } diff --git a/moq-transport/src/object/group.rs b/moq-transport/src/data/group.rs similarity index 100% rename from moq-transport/src/object/group.rs rename to moq-transport/src/data/group.rs diff --git a/moq-transport/src/object/mod.rs b/moq-transport/src/data/mod.rs similarity index 100% rename from moq-transport/src/object/mod.rs rename to moq-transport/src/data/mod.rs diff --git a/moq-transport/src/object/stream.rs b/moq-transport/src/data/stream.rs similarity index 100% rename from moq-transport/src/object/stream.rs rename to moq-transport/src/data/stream.rs diff --git a/moq-transport/src/object/track.rs b/moq-transport/src/data/track.rs similarity index 100% rename from moq-transport/src/object/track.rs rename to moq-transport/src/data/track.rs diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 0ff004ee..734202bd 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -10,8 +10,8 @@ mod coding; mod error; pub mod cache; +pub mod data; pub mod message; -pub mod object; pub mod session; pub mod setup; diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index e857b45a..2e05ddc0 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -8,9 +8,9 @@ use webtransport_quinn::Session; use crate::{ cache::{broadcast, segment, track, CacheError}, - message, + data, message, message::Message, - object, MoqError, VarInt, + MoqError, VarInt, }; use super::{Control, SessionError}; @@ -179,7 +179,7 @@ impl Publisher { } async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { - let header: object::Object = object::GroupHeader { + let header: data::Object = data::GroupHeader { subscribe: id, track: id, @@ -202,7 +202,7 @@ impl Publisher { .map_err(|e| SessionError::Unknown(e.to_string()))?; while let Some(mut fragment) = segment.fragment().await? { - let object = object::GroupChunk { + let object = data::GroupChunk { object: fragment.sequence, size: VarInt::try_from(fragment.size)?, }; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 64922d01..8adc5b39 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -8,9 +8,9 @@ use std::{ use crate::{ cache::{broadcast, segment, track, CacheError}, coding::DecodeError, + data::{self, Object}, message, message::Message, - object::{self, Object}, session::{Control, SessionError}, VarInt, }; @@ -120,13 +120,13 @@ impl Subscriber { } } - async fn run_track(self, header: object::TrackHeader, mut stream: RecvStream) -> Result<(), SessionError> { + async fn run_track(self, header: data::TrackHeader, mut stream: RecvStream) -> Result<(), SessionError> { loop { - let chunk = match object::TrackChunk::decode(&mut stream).await { + let chunk = match data::TrackChunk::decode(&mut stream).await { Ok(next) => next, - // No more objects - Err(DecodeError::Final) => break, + // TODO Figure out a way to check for stream FIN instead + Err(DecodeError::UnexpectedEnd) => break, // Unknown error Err(err) => return Err(err.into()), @@ -166,7 +166,7 @@ impl Subscriber { Ok(()) } - async fn run_group(self, header: object::GroupHeader, mut stream: RecvStream) -> Result<(), SessionError> { + async fn run_group(self, header: data::GroupHeader, mut stream: RecvStream) -> Result<(), SessionError> { let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; @@ -177,16 +177,15 @@ impl Subscriber { })? }; - // Sanity check to make sure we receive in order - // The draft shouldn't even include sequence numbers but whatever + // Sanity check to make sure we receive the group in order let mut expected = 0; loop { - let chunk = match object::GroupChunk::decode(&mut stream).await { + let chunk = match data::GroupChunk::decode(&mut stream).await { Ok(chunk) => chunk, - // No more objects - Err(DecodeError::Final) => break, + // TODO Figure out a way to check for stream FIN instead + Err(DecodeError::UnexpectedEnd) => break, // Unknown error Err(err) => return Err(err.into()), @@ -194,11 +193,12 @@ impl Subscriber { log::trace!("receiving chunk: {:?}", chunk); - if chunk.object.into_inner() != expected { + // NOTE: We allow gaps, but not going backwards. + if chunk.object.into_inner() < expected { return Err(SessionError::OutOfOrder(expected.try_into()?, chunk.object)); } - expected = chunk.object.into_inner(); + expected = chunk.object.into_inner() + 1; let mut remain = chunk.size.into(); @@ -218,7 +218,7 @@ impl Subscriber { Ok(()) } - async fn run_object(self, _header: object::Stream, _stream: RecvStream) -> Result<(), SessionError> { + async fn run_object(self, _header: data::Stream, _stream: RecvStream) -> Result<(), SessionError> { unimplemented!("TODO"); } From 6e438b1e9c10fc174eafe2e6ec321acfb1541b88 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 8 Mar 2024 13:33:04 -0800 Subject: [PATCH 4/6] Rename object->data --- moq-transport/src/data/group.rs | 4 +- moq-transport/src/data/header.rs | 95 +++++++++++++++++ moq-transport/src/data/mod.rs | 100 +----------------- .../src/data/{stream.rs => object.rs} | 5 +- moq-transport/src/data/track.rs | 4 +- moq-transport/src/session/publisher.rs | 7 +- moq-transport/src/session/subscriber.rs | 17 ++- 7 files changed, 116 insertions(+), 116 deletions(-) create mode 100644 moq-transport/src/data/header.rs rename moq-transport/src/data/{stream.rs => object.rs} (92%) diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/group.rs index b2f370cf..5a7ceacd 100644 --- a/moq-transport/src/data/group.rs +++ b/moq-transport/src/data/group.rs @@ -1,7 +1,7 @@ use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Clone, Debug)] -pub struct GroupHeader { +pub struct Group { // The subscribe ID. pub subscribe: VarInt, @@ -15,7 +15,7 @@ pub struct GroupHeader { pub priority: u32, } -impl GroupHeader { +impl Group { pub async fn decode(r: &mut R) -> Result { let subscribe = VarInt::decode(r).await?; let track = VarInt::decode(r).await?; diff --git a/moq-transport/src/data/header.rs b/moq-transport/src/data/header.rs new file mode 100644 index 00000000..aedfc91d --- /dev/null +++ b/moq-transport/src/data/header.rs @@ -0,0 +1,95 @@ +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; +use std::fmt; + +use super::{Group, Object, Track}; + +// Use a macro to generate the message types rather than copy-paste. +// This implements a decode/encode method that uses the specified type. +macro_rules! header_types { + {$($name:ident = $val:expr,)*} => { + /// All supported message types. + #[derive(Clone)] + pub enum Header { + $($name($name)),* + } + + impl Header { + pub async fn decode(r: &mut R) -> Result { + let t = VarInt::decode(r).await?; + + match t.into_inner() { + $($val => { + let msg = $name::decode(r).await?; + Ok(Self::$name(msg)) + })* + _ => Err(DecodeError::InvalidMessage(t)), + } + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + $(Self::$name(ref m) => { + VarInt::from_u32($val).encode(w).await?; + m.encode(w).await + },)* + } + } + + pub fn id(&self) -> VarInt { + match self { + $(Self::$name(_) => { + VarInt::from_u32($val) + },)* + } + } + + pub fn name(&self) -> &'static str { + match self { + $(Self::$name(_) => { + stringify!($name) + },)* + } + } + + pub fn subscribe(&self) -> VarInt { + match self { + $(Self::$name(o) => o.subscribe,)* + } + } + + pub fn track(&self) -> VarInt { + match self { + $(Self::$name(o) => o.track,)* + } + } + + pub fn priority(&self) -> u32 { + match self { + $(Self::$name(o) => o.priority,)* + } + } + } + + $(impl From<$name> for Header { + fn from(m: $name) -> Self { + Self::$name(m) + } + })* + + impl fmt::Debug for Header { + // Delegate to the message formatter + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + $(Self::$name(ref m) => m.fmt(f),)* + } + } + } + } +} + +// Each object type is prefixed with the given VarInt type. +header_types! { + Object = 0x0, + Group = 0x50, + Track = 0x51, +} diff --git a/moq-transport/src/data/mod.rs b/moq-transport/src/data/mod.rs index 36e5202d..b820bdcf 100644 --- a/moq-transport/src/data/mod.rs +++ b/moq-transport/src/data/mod.rs @@ -1,101 +1,9 @@ mod group; -mod stream; +mod header; +mod object; mod track; pub use group::*; -pub use stream::*; +pub use header::*; +pub use object::*; pub use track::*; - -use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; -use std::fmt; - -// Use a macro to generate the message types rather than copy-paste. -// This implements a decode/encode method that uses the specified type. -macro_rules! object_types { - {$($name:ident = $val:expr,)*} => { - /// All supported message types. - #[derive(Clone)] - pub enum Object { - $($name($name)),* - } - - impl Object { - pub async fn decode(r: &mut R) -> Result { - let t = VarInt::decode(r).await?; - - match t.into_inner() { - $($val => { - let msg = $name::decode(r).await?; - Ok(Self::$name(msg)) - })* - _ => Err(DecodeError::InvalidMessage(t)), - } - } - - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - match self { - $(Self::$name(ref m) => { - VarInt::from_u32($val).encode(w).await?; - m.encode(w).await - },)* - } - } - - pub fn id(&self) -> VarInt { - match self { - $(Self::$name(_) => { - VarInt::from_u32($val) - },)* - } - } - - pub fn name(&self) -> &'static str { - match self { - $(Self::$name(_) => { - stringify!($name) - },)* - } - } - - pub fn subscribe(&self) -> VarInt { - match self { - $(Self::$name(o) => o.subscribe,)* - } - } - - pub fn track(&self) -> VarInt { - match self { - $(Self::$name(o) => o.track,)* - } - } - - pub fn priority(&self) -> u32 { - match self { - $(Self::$name(o) => o.priority,)* - } - } - } - - $(impl From<$name> for Object { - fn from(m: $name) -> Self { - Object::$name(m) - } - })* - - impl fmt::Debug for Object { - // Delegate to the message formatter - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - $(Self::$name(ref m) => m.fmt(f),)* - } - } - } - } -} - -// Each object type is prefixed with the given VarInt type. -object_types! { - Stream = 0x0, - GroupHeader = 0x50, - TrackHeader = 0x51, -} diff --git a/moq-transport/src/data/stream.rs b/moq-transport/src/data/object.rs similarity index 92% rename from moq-transport/src/data/stream.rs rename to moq-transport/src/data/object.rs index 6d7a7652..f4196a34 100644 --- a/moq-transport/src/data/stream.rs +++ b/moq-transport/src/data/object.rs @@ -1,9 +1,8 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -/// Sent by the publisher as the header of each data stream. #[derive(Clone, Debug)] -pub struct Stream { +pub struct Object { // The subscribe ID. pub subscribe: VarInt, @@ -20,7 +19,7 @@ pub struct Stream { pub priority: u32, } -impl Stream { +impl Object { pub async fn decode(r: &mut R) -> Result { let subscribe = VarInt::decode(r).await?; let track = VarInt::decode(r).await?; diff --git a/moq-transport/src/data/track.rs b/moq-transport/src/data/track.rs index 02fd6275..0547a833 100644 --- a/moq-transport/src/data/track.rs +++ b/moq-transport/src/data/track.rs @@ -2,7 +2,7 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Clone, Debug)] -pub struct TrackHeader { +pub struct Track { // The subscribe ID. pub subscribe: VarInt, @@ -13,7 +13,7 @@ pub struct TrackHeader { pub priority: u32, } -impl TrackHeader { +impl Track { pub async fn decode(r: &mut R) -> Result { let subscribe = VarInt::decode(r).await?; let track = VarInt::decode(r).await?; diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 2e05ddc0..fe836117 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -179,15 +179,14 @@ impl Publisher { } async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { - let header: data::Object = data::GroupHeader { + let header = data::Group { subscribe: id, track: id, // Properties of the segment group: segment.sequence, priority: segment.priority, - } - .into(); + }; log::trace!("sending stream: {:?}", header); let mut stream = self.webtransport.open_uni().await?; @@ -196,7 +195,7 @@ impl Publisher { let priority = (segment.priority as i64 - i32::MAX as i64) as i32; stream.set_priority(priority).ok(); - header + Into::::into(header) .encode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 8adc5b39..ac1c7b91 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -8,8 +8,7 @@ use std::{ use crate::{ cache::{broadcast, segment, track, CacheError}, coding::DecodeError, - data::{self, Object}, - message, + data, message, message::Message, session::{Control, SessionError}, VarInt, @@ -107,20 +106,20 @@ impl Subscriber { async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> { // Decode the object on the data stream. - let header = Object::decode(&mut stream) + let header = data::Header::decode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; log::trace!("receiving stream: {:?}", header); match header { - Object::TrackHeader(header) => self.run_track(header, stream).await, - Object::GroupHeader(header) => self.run_group(header, stream).await, - Object::Stream(header) => self.run_object(header, stream).await, + data::Header::Track(header) => self.run_track(header, stream).await, + data::Header::Group(header) => self.run_group(header, stream).await, + data::Header::Object(header) => self.run_object(header, stream).await, } } - async fn run_track(self, header: data::TrackHeader, mut stream: RecvStream) -> Result<(), SessionError> { + async fn run_track(self, header: data::Track, mut stream: RecvStream) -> Result<(), SessionError> { loop { let chunk = match data::TrackChunk::decode(&mut stream).await { Ok(next) => next, @@ -166,7 +165,7 @@ impl Subscriber { Ok(()) } - async fn run_group(self, header: data::GroupHeader, mut stream: RecvStream) -> Result<(), SessionError> { + async fn run_group(self, header: data::Group, mut stream: RecvStream) -> Result<(), SessionError> { let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; @@ -218,7 +217,7 @@ impl Subscriber { Ok(()) } - async fn run_object(self, _header: data::Stream, _stream: RecvStream) -> Result<(), SessionError> { + async fn run_object(self, _header: data::Object, _stream: RecvStream) -> Result<(), SessionError> { unimplemented!("TODO"); } From adecfc93e889531a8e17e8dd1ee922a2c00e0887 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 8 Mar 2024 14:25:16 -0800 Subject: [PATCH 5/6] draft-03 support? --- moq-transport/src/cache/error.rs | 2 +- moq-transport/src/coding/decode.rs | 3 + .../src/{message => control}/announce.rs | 0 .../announce_error.rs} | 0 .../src/{message => control}/announce_ok.rs | 0 .../src/{message => control}/go_away.rs | 0 .../{message/mod.rs => control/message.rs} | 64 ++----------------- moq-transport/src/control/mod.rs | 56 ++++++++++++++++ .../src/{message => control}/subscribe.rs | 0 moq-transport/src/control/subscribe_done.rs | 51 +++++++++++++++ .../{message => control}/subscribe_error.rs | 6 +- .../src/{message => control}/subscribe_ok.rs | 26 +++++++- .../src/{message => control}/unannounce.rs | 0 .../src/{message => control}/unsubscribe.rs | 0 moq-transport/src/data/datagram.rs | 41 ++++++++++++ moq-transport/src/data/group.rs | 41 ++++++------ moq-transport/src/data/header.rs | 15 +++-- moq-transport/src/data/mod.rs | 2 + moq-transport/src/data/object.rs | 38 +++++------ moq-transport/src/data/track.rs | 40 ++++++------ moq-transport/src/lib.rs | 2 +- moq-transport/src/message/subscribe_fin.rs | 35 ---------- moq-transport/src/message/subscribe_reset.rs | 48 -------------- moq-transport/src/session/control.rs | 8 +-- moq-transport/src/session/error.rs | 4 ++ moq-transport/src/session/publisher.rs | 41 ++++++------ moq-transport/src/session/subscriber.rs | 39 +++++------ moq-transport/src/setup/version.rs | 60 +---------------- 28 files changed, 303 insertions(+), 319 deletions(-) rename moq-transport/src/{message => control}/announce.rs (100%) rename moq-transport/src/{message/announce_reset.rs => control/announce_error.rs} (100%) rename moq-transport/src/{message => control}/announce_ok.rs (100%) rename moq-transport/src/{message => control}/go_away.rs (100%) rename moq-transport/src/{message/mod.rs => control/message.rs} (57%) create mode 100644 moq-transport/src/control/mod.rs rename moq-transport/src/{message => control}/subscribe.rs (100%) create mode 100644 moq-transport/src/control/subscribe_done.rs rename moq-transport/src/{message => control}/subscribe_error.rs (88%) rename moq-transport/src/{message => control}/subscribe_ok.rs (60%) rename moq-transport/src/{message => control}/unannounce.rs (100%) rename moq-transport/src/{message => control}/unsubscribe.rs (100%) create mode 100644 moq-transport/src/data/datagram.rs delete mode 100644 moq-transport/src/message/subscribe_fin.rs delete mode 100644 moq-transport/src/message/subscribe_reset.rs diff --git a/moq-transport/src/cache/error.rs b/moq-transport/src/cache/error.rs index 7cb4c38b..dfe84f6e 100644 --- a/moq-transport/src/cache/error.rs +++ b/moq-transport/src/cache/error.rs @@ -9,7 +9,7 @@ pub enum CacheError { #[error("closed")] Closed, - /// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher. + /// A SUBSCRIBE_DONE or ANNOUNCE_CANCEL was received. #[error("reset code={0:?}")] Reset(u32), diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index fe92d0d9..957c4547 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -33,6 +33,9 @@ pub enum DecodeError { #[error("invalid subscribe location")] InvalidSubscribeLocation, + #[error("invalid value")] + InvalidValue, + #[error("varint bounds exceeded")] BoundsExceeded(#[from] BoundsExceeded), diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/control/announce.rs similarity index 100% rename from moq-transport/src/message/announce.rs rename to moq-transport/src/control/announce.rs diff --git a/moq-transport/src/message/announce_reset.rs b/moq-transport/src/control/announce_error.rs similarity index 100% rename from moq-transport/src/message/announce_reset.rs rename to moq-transport/src/control/announce_error.rs diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/control/announce_ok.rs similarity index 100% rename from moq-transport/src/message/announce_ok.rs rename to moq-transport/src/control/announce_ok.rs diff --git a/moq-transport/src/message/go_away.rs b/moq-transport/src/control/go_away.rs similarity index 100% rename from moq-transport/src/message/go_away.rs rename to moq-transport/src/control/go_away.rs diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/control/message.rs similarity index 57% rename from moq-transport/src/message/mod.rs rename to moq-transport/src/control/message.rs index 3865dbe6..3b3546f3 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/control/message.rs @@ -1,62 +1,11 @@ -//! Low-level message sent over the wire, as defined in the specification. -//! -//! All of these messages are sent over a bidirectional QUIC stream. -//! This introduces some head-of-line blocking but preserves ordering. -//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams. -//! -//! Messages sent by the publisher: -//! - [Announce] -//! - [Unannounce] -//! - [SubscribeOk] -//! - [SubscribeError] -//! - [SubscribeReset] -//! - [Object] -//! -//! Messages sent by the subscriber: -//! - [Subscribe] -//! - [Unsubscribe] -//! - [AnnounceOk] -//! - [AnnounceError] -//! -//! Example flow: -//! ```test -//! -> ANNOUNCE namespace="foo" -//! <- ANNOUNCE_OK namespace="foo" -//! <- SUBSCRIBE id=0 namespace="foo" name="bar" -//! -> SUBSCRIBE_OK id=0 -//! -> OBJECT id=0 sequence=69 priority=4 expires=30 -//! -> OBJECT id=0 sequence=70 priority=4 expires=30 -//! -> OBJECT id=0 sequence=70 priority=4 expires=30 -//! <- SUBSCRIBE_STOP id=0 -//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer" -//! ``` -mod announce; -mod announce_ok; -mod announce_reset; -mod go_away; -mod subscribe; -mod subscribe_error; -mod subscribe_fin; -mod subscribe_ok; -mod subscribe_reset; -mod unannounce; -mod unsubscribe; - -pub use announce::*; -pub use announce_ok::*; -pub use announce_reset::*; -pub use go_away::*; -pub use subscribe::*; -pub use subscribe_error::*; -pub use subscribe_fin::*; -pub use subscribe_ok::*; -pub use subscribe_reset::*; -pub use unannounce::*; -pub use unsubscribe::*; - use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; use std::fmt; +use super::{ + Announce, AnnounceError, AnnounceOk, GoAway, Subscribe, SubscribeDone, SubscribeError, SubscribeOk, Unannounce, + Unsubscribe, +}; + // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. macro_rules! message_types { @@ -138,8 +87,7 @@ message_types! { // SUBSCRIBE family, sent by publisher SubscribeOk = 0x4, SubscribeError = 0x5, - SubscribeFin = 0xb, - SubscribeReset = 0xc, + SubscribeDone = 0xb, // ANNOUNCE family, sent by publisher Announce = 0x6, diff --git a/moq-transport/src/control/mod.rs b/moq-transport/src/control/mod.rs new file mode 100644 index 00000000..200589ea --- /dev/null +++ b/moq-transport/src/control/mod.rs @@ -0,0 +1,56 @@ +//! Low-level message sent over the wire, as defined in the specification. +//! +//! TODO Update this +//! All of these messages are sent over a bidirectional QUIC stream. +//! This introduces some head-of-line blocking but preserves ordering. +//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams. +//! +//! Messages sent by the publisher: +//! - [Announce] +//! - [Unannounce] +//! - [SubscribeOk] +//! - [SubscribeError] +//! - [SubscribeReset] +//! - [Object] +//! +//! Messages sent by the subscriber: +//! - [Subscribe] +//! - [Unsubscribe] +//! - [AnnounceOk] +//! - [AnnounceError] +//! +//! Example flow: +//! ```test +//! -> ANNOUNCE namespace="foo" +//! <- ANNOUNCE_OK namespace="foo" +//! <- SUBSCRIBE id=0 namespace="foo" name="bar" +//! -> SUBSCRIBE_OK id=0 +//! -> OBJECT id=0 sequence=69 priority=4 expires=30 +//! -> OBJECT id=0 sequence=70 priority=4 expires=30 +//! -> OBJECT id=0 sequence=70 priority=4 expires=30 +//! <- SUBSCRIBE_STOP id=0 +//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer" +//! ``` +mod announce; +mod announce_error; +mod announce_ok; +mod go_away; +mod message; +mod subscribe; +mod subscribe_done; +mod subscribe_error; +mod subscribe_ok; +mod unannounce; +mod unsubscribe; + +pub use announce::*; +pub use announce_error::*; +pub use announce_ok::*; +pub use go_away::*; +pub use message::*; +pub use subscribe::*; +pub use subscribe_done::*; +pub use subscribe_error::*; +pub use subscribe_ok::*; +pub use unannounce::*; +pub use unsubscribe::*; diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/control/subscribe.rs similarity index 100% rename from moq-transport/src/message/subscribe.rs rename to moq-transport/src/control/subscribe.rs diff --git a/moq-transport/src/control/subscribe_done.rs b/moq-transport/src/control/subscribe_done.rs new file mode 100644 index 00000000..7529536a --- /dev/null +++ b/moq-transport/src/control/subscribe_done.rs @@ -0,0 +1,51 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +/// Sent by the publisher to cleanly terminate a Subscribe. +#[derive(Clone, Debug)] +pub struct SubscribeDone { + /// The ID for this subscription. + pub id: VarInt, + + /// The error code + pub code: VarInt, + + /// An optional error reason + pub reason: String, + + /// The final group/object sent on this subscription. + pub last: Option<(VarInt, VarInt)>, +} + +impl SubscribeDone { + pub async fn decode(r: &mut R) -> Result { + let id = VarInt::decode(r).await?; + let code = VarInt::decode(r).await?; + let reason = String::decode(r).await?; + let last = match r.read_u8().await? { + 0 => None, + 1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)), + _ => return Err(DecodeError::InvalidValue), + }; + + Ok(Self { id, code, reason, last }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w).await?; + self.code.encode(w).await?; + self.reason.encode(w).await?; + + if let Some((group, object)) = self.last { + w.write_u8(1).await?; + group.encode(w).await?; + object.encode(w).await?; + } else { + w.write_u8(0).await?; + } + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_error.rs b/moq-transport/src/control/subscribe_error.rs similarity index 88% rename from moq-transport/src/message/subscribe_error.rs rename to moq-transport/src/control/subscribe_error.rs index 3577ae5c..fa2f98c1 100644 --- a/moq-transport/src/message/subscribe_error.rs +++ b/moq-transport/src/control/subscribe_error.rs @@ -8,7 +8,7 @@ pub struct SubscribeError { pub id: VarInt, // An error code. - pub code: u32, + pub code: VarInt, // An optional, human-readable reason. pub reason: String, @@ -20,7 +20,7 @@ pub struct SubscribeError { impl SubscribeError { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let code = VarInt::decode(r).await?.try_into()?; + let code = VarInt::decode(r).await?; let reason = String::decode(r).await?; let alias = VarInt::decode(r).await?; @@ -34,7 +34,7 @@ impl SubscribeError { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - VarInt::from_u32(self.code).encode(w).await?; + self.code.encode(w).await?; self.reason.encode(w).await?; self.alias.encode(w).await?; diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/control/subscribe_ok.rs similarity index 60% rename from moq-transport/src/message/subscribe_ok.rs rename to moq-transport/src/control/subscribe_ok.rs index aac07a70..2a140c3c 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/control/subscribe_ok.rs @@ -1,3 +1,5 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; @@ -10,6 +12,9 @@ pub struct SubscribeOk { /// The subscription will expire in this many milliseconds. pub expires: Option, + + /// The latest group and object for the track. + pub latest: Option<(VarInt, VarInt)>, } impl SubscribeOk { @@ -19,7 +24,14 @@ impl SubscribeOk { VarInt::ZERO => None, expires => Some(expires), }; - Ok(Self { id, expires }) + + let latest = match r.read_u8().await? { + 0 => None, + 1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)), + _ => return Err(DecodeError::InvalidValue), + }; + + Ok(Self { id, expires, latest }) } } @@ -27,6 +39,18 @@ impl SubscribeOk { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; self.expires.unwrap_or(VarInt::ZERO).encode(w).await?; + + match self.latest { + Some((group, object)) => { + w.write_u8(1).await?; + group.encode(w).await?; + object.encode(w).await?; + } + None => { + w.write_u8(0).await?; + } + } + Ok(()) } } diff --git a/moq-transport/src/message/unannounce.rs b/moq-transport/src/control/unannounce.rs similarity index 100% rename from moq-transport/src/message/unannounce.rs rename to moq-transport/src/control/unannounce.rs diff --git a/moq-transport/src/message/unsubscribe.rs b/moq-transport/src/control/unsubscribe.rs similarity index 100% rename from moq-transport/src/message/unsubscribe.rs rename to moq-transport/src/control/unsubscribe.rs diff --git a/moq-transport/src/data/datagram.rs b/moq-transport/src/data/datagram.rs new file mode 100644 index 00000000..9d849ff5 --- /dev/null +++ b/moq-transport/src/data/datagram.rs @@ -0,0 +1,41 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct Datagram { + // The subscribe ID. + pub subscribe_id: VarInt, + + // The track alias. + pub track_alias: VarInt, + + // The sequence number within the track. + pub group_id: VarInt, + + // The object ID within the group. + pub object_id: VarInt, + + // The priority, where **smaller** values are sent first. + pub send_order: VarInt, +} + +impl Datagram { + pub async fn decode(r: &mut R) -> Result { + Ok(Self { + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + object_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; + self.send_order.encode(w).await?; + Ok(()) + } +} diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/group.rs index 5a7ceacd..3d2447d4 100644 --- a/moq-transport/src/data/group.rs +++ b/moq-transport/src/data/group.rs @@ -3,38 +3,33 @@ use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeEr #[derive(Clone, Debug)] pub struct Group { // The subscribe ID. - pub subscribe: VarInt, + pub subscribe_id: VarInt, // The track alias. - pub track: VarInt, + pub track_alias: VarInt, // The group sequence number - pub group: VarInt, + pub group_id: VarInt, // The priority, where **smaller** values are sent first. - pub priority: u32, + pub send_order: VarInt, } impl Group { pub async fn decode(r: &mut R) -> Result { - let subscribe = VarInt::decode(r).await?; - let track = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - Ok(Self { - subscribe, - track, - group, - priority, + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.subscribe.encode(w).await?; - self.track.encode(w).await?; - self.group.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.send_order.encode(w).await?; Ok(()) } @@ -42,20 +37,20 @@ impl Group { #[derive(Clone, Debug)] pub struct GroupChunk { - pub object: VarInt, + pub object_id: VarInt, pub size: VarInt, } impl GroupChunk { pub async fn decode(r: &mut R) -> Result { - let object = VarInt::decode(r).await?; - let size = VarInt::decode(r).await?; - - Ok(Self { object, size }) + Ok(Self { + object_id: VarInt::decode(r).await?, + size: VarInt::decode(r).await?, + }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.object.encode(w).await?; + self.object_id.encode(w).await?; self.size.encode(w).await?; Ok(()) diff --git a/moq-transport/src/data/header.rs b/moq-transport/src/data/header.rs index aedfc91d..e3f64d1c 100644 --- a/moq-transport/src/data/header.rs +++ b/moq-transport/src/data/header.rs @@ -1,7 +1,7 @@ use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; use std::fmt; -use super::{Group, Object, Track}; +use super::{Datagram, Group, Object, Track}; // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. @@ -51,21 +51,21 @@ macro_rules! header_types { } } - pub fn subscribe(&self) -> VarInt { + pub fn subscribe_id(&self) -> VarInt { match self { - $(Self::$name(o) => o.subscribe,)* + $(Self::$name(o) => o.subscribe_id,)* } } - pub fn track(&self) -> VarInt { + pub fn track_alias(&self) -> VarInt { match self { - $(Self::$name(o) => o.track,)* + $(Self::$name(o) => o.track_alias,)* } } - pub fn priority(&self) -> u32 { + pub fn send_order(&self) -> VarInt { match self { - $(Self::$name(o) => o.priority,)* + $(Self::$name(o) => o.send_order,)* } } } @@ -90,6 +90,7 @@ macro_rules! header_types { // Each object type is prefixed with the given VarInt type. header_types! { Object = 0x0, + Datagram = 0x1, Group = 0x50, Track = 0x51, } diff --git a/moq-transport/src/data/mod.rs b/moq-transport/src/data/mod.rs index b820bdcf..2090093c 100644 --- a/moq-transport/src/data/mod.rs +++ b/moq-transport/src/data/mod.rs @@ -1,8 +1,10 @@ +mod datagram; mod group; mod header; mod object; mod track; +pub use datagram::*; pub use group::*; pub use header::*; pub use object::*; diff --git a/moq-transport/src/data/object.rs b/moq-transport/src/data/object.rs index f4196a34..bd0f8540 100644 --- a/moq-transport/src/data/object.rs +++ b/moq-transport/src/data/object.rs @@ -4,44 +4,38 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Clone, Debug)] pub struct Object { // The subscribe ID. - pub subscribe: VarInt, + pub subscribe_id: VarInt, // The track alias. - pub track: VarInt, + pub track_alias: VarInt, // The sequence number within the track. - pub group: VarInt, + pub group_id: VarInt, // The sequence number within the group. - pub sequence: VarInt, + pub object_id: VarInt, - // The priority, where **smaller** values are sent first. - pub priority: u32, + // The send order, where **smaller** values are sent first. + pub send_order: VarInt, } impl Object { pub async fn decode(r: &mut R) -> Result { - let subscribe = VarInt::decode(r).await?; - let track = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let sequence = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - Ok(Self { - subscribe, - track, - group, - sequence, - priority, + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + object_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.subscribe.encode(w).await?; - self.track.encode(w).await?; - self.group.encode(w).await?; - self.sequence.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; + self.send_order.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/data/track.rs b/moq-transport/src/data/track.rs index 0547a833..95dd9ddd 100644 --- a/moq-transport/src/data/track.rs +++ b/moq-transport/src/data/track.rs @@ -4,32 +4,28 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Clone, Debug)] pub struct Track { // The subscribe ID. - pub subscribe: VarInt, + pub subscribe_id: VarInt, // The track ID. - pub track: VarInt, + pub track_alias: VarInt, // The priority, where **smaller** values are sent first. - pub priority: u32, + pub send_order: VarInt, } impl Track { pub async fn decode(r: &mut R) -> Result { - let subscribe = VarInt::decode(r).await?; - let track = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - Ok(Self { - subscribe, - track, - priority, + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.subscribe.encode(w).await?; - self.track.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.send_order.encode(w).await?; Ok(()) } @@ -37,23 +33,27 @@ impl Track { #[derive(Clone, Debug)] pub struct TrackChunk { - pub group: VarInt, - pub object: VarInt, + pub group_id: VarInt, + pub object_id: VarInt, pub size: VarInt, } impl TrackChunk { pub async fn decode(r: &mut R) -> Result { - let group = VarInt::decode(r).await?; - let object = VarInt::decode(r).await?; + let group_id = VarInt::decode(r).await?; + let object_id = VarInt::decode(r).await?; let size = VarInt::decode(r).await?; - Ok(Self { group, object, size }) + Ok(Self { + group_id, + object_id, + size, + }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.group.encode(w).await?; - self.object.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; self.size.encode(w).await?; Ok(()) diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 734202bd..e80deda4 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -10,8 +10,8 @@ mod coding; mod error; pub mod cache; +pub mod control; pub mod data; -pub mod message; pub mod session; pub mod setup; diff --git a/moq-transport/src/message/subscribe_fin.rs b/moq-transport/src/message/subscribe_fin.rs deleted file mode 100644 index 38a4efda..00000000 --- a/moq-transport/src/message/subscribe_fin.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -/// Sent by the publisher to cleanly terminate a Subscribe. -#[derive(Clone, Debug)] -pub struct SubscribeFin { - /// The ID for this subscription. - pub id: VarInt, - - /// The final group/object sent on this subscription. - pub final_group: VarInt, - pub final_object: VarInt, -} - -impl SubscribeFin { - pub async fn decode(r: &mut R) -> Result { - let id = VarInt::decode(r).await?; - let final_group = VarInt::decode(r).await?; - let final_object = VarInt::decode(r).await?; - - Ok(Self { - id, - final_group, - final_object, - }) - } - - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.id.encode(w).await?; - self.final_group.encode(w).await?; - self.final_object.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/message/subscribe_reset.rs b/moq-transport/src/message/subscribe_reset.rs deleted file mode 100644 index dfedf876..00000000 --- a/moq-transport/src/message/subscribe_reset.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -/// Sent by the publisher to terminate a Subscribe. -#[derive(Clone, Debug)] -pub struct SubscribeReset { - /// The ID for this subscription. - pub id: VarInt, - - /// An error code. - pub code: u32, - - /// An optional, human-readable reason. - pub reason: String, - - /// The final group/object sent on this subscription. - pub group: VarInt, - pub object: VarInt, -} - -impl SubscribeReset { - pub async fn decode(r: &mut R) -> Result { - let id = VarInt::decode(r).await?; - let code = VarInt::decode(r).await?.try_into()?; - let reason = String::decode(r).await?; - let group = VarInt::decode(r).await?; - let object = VarInt::decode(r).await?; - - Ok(Self { - id, - code, - reason, - group, - object, - }) - } - - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.id.encode(w).await?; - VarInt::from_u32(self.code).encode(w).await?; - self.reason.encode(w).await?; - - self.group.encode(w).await?; - self.object.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/session/control.rs b/moq-transport/src/session/control.rs index 7c84e805..7cfa78c5 100644 --- a/moq-transport/src/session/control.rs +++ b/moq-transport/src/session/control.rs @@ -6,7 +6,7 @@ use tokio::sync::Mutex; use webtransport_quinn::{RecvStream, SendStream}; use super::SessionError; -use crate::message::Message; +use crate::control; #[derive(Debug, Clone)] pub(crate) struct Control { @@ -22,7 +22,7 @@ impl Control { } } - pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), SessionError> { + pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), SessionError> { let mut stream = self.send.lock().await; log::info!("sending message: {:?}", msg); msg.into() @@ -33,9 +33,9 @@ impl Control { } // It's likely a mistake to call this from two different tasks, but it's easier to just support it. - pub async fn recv(&self) -> Result { + pub async fn recv(&self) -> Result { let mut stream = self.recv.lock().await; - let msg = Message::decode(&mut *stream) + let msg = control::Message::decode(&mut *stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(msg) diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index 3c49f76d..9f648076 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -56,6 +56,9 @@ pub enum SessionError { #[error("sequence numbers out of order: expected={0} actual={1}")] OutOfOrder(VarInt, VarInt), + #[error("message was used in an invalid context")] + InvalidMessage, + /// An unclassified error because I'm lazy. TODO classify these errors #[error("unknown error: {0}")] Unknown(String), @@ -80,6 +83,7 @@ impl MoqError for SessionError { Self::InvalidSize(_) => 400, Self::RequiredExtension(_) => 426, Self::BoundsExceeded(_) => 500, + Self::InvalidMessage => 400, Self::OutOfOrder(_, _) => 400, } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index fe836117..d495d7d7 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -8,9 +8,9 @@ use webtransport_quinn::Session; use crate::{ cache::{broadcast, segment, track, CacheError}, - data, message, - message::Message, - MoqError, VarInt, + control, + control::Message, + data, MoqError, VarInt, }; use super::{Control, SessionError}; @@ -90,17 +90,17 @@ impl Publisher { } } - async fn recv_announce_ok(&mut self, _msg: &message::AnnounceOk) -> Result<(), SessionError> { + async fn recv_announce_ok(&mut self, _msg: &control::AnnounceOk) -> Result<(), SessionError> { // We didn't send an announce. Err(CacheError::NotFound.into()) } - async fn recv_announce_error(&mut self, _msg: &message::AnnounceError) -> Result<(), SessionError> { + async fn recv_announce_error(&mut self, _msg: &control::AnnounceError) -> Result<(), SessionError> { // We didn't send an announce. Err(CacheError::NotFound.into()) } - async fn recv_subscribe(&mut self, msg: &message::Subscribe) -> Result<(), SessionError> { + async fn recv_subscribe(&mut self, msg: &control::Subscribe) -> Result<(), SessionError> { // Assume that the subscribe ID is unique for now. let abort = match self.start_subscribe(msg.clone()) { Ok(abort) => abort, @@ -114,29 +114,30 @@ impl Publisher { }; self.control - .send(message::SubscribeOk { + .send(control::SubscribeOk { id: msg.id, expires: None, + + // TODO implement this + latest: None, }) .await } async fn reset_subscribe(&mut self, id: VarInt, err: E) -> Result<(), SessionError> { - let msg = message::SubscribeReset { + let msg = control::SubscribeDone { id, - code: err.code(), + code: err.code().into(), reason: err.to_string(), - // TODO properly populate these - // But first: https://github.com/moq-wg/moq-transport/issues/313 - group: VarInt::ZERO, - object: VarInt::ZERO, + // TODO properly populate this + last: None, }; self.control.send(msg).await } - fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { + fn start_subscribe(&mut self, msg: control::Subscribe) -> Result { let mut track = self.source.get_track(&msg.track_name)?; // TODO only clone the fields we need @@ -180,12 +181,12 @@ impl Publisher { async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { let header = data::Group { - subscribe: id, - track: id, + subscribe_id: id, + track_alias: id, // Properties of the segment - group: segment.sequence, - priority: segment.priority, + group_id: segment.sequence, + send_order: VarInt::from_u32(segment.priority), }; log::trace!("sending stream: {:?}", header); @@ -202,7 +203,7 @@ impl Publisher { while let Some(mut fragment) = segment.fragment().await? { let object = data::GroupChunk { - object: fragment.sequence, + object_id: fragment.sequence, size: VarInt::try_from(fragment.size)?, }; @@ -222,7 +223,7 @@ impl Publisher { Ok(()) } - async fn recv_unsubscribe(&mut self, msg: &message::Unsubscribe) -> Result<(), SessionError> { + async fn recv_unsubscribe(&mut self, msg: &control::Unsubscribe) -> Result<(), SessionError> { let abort = self .subscribes .lock() diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index ac1c7b91..00b9f799 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -8,8 +8,9 @@ use std::{ use crate::{ cache::{broadcast, segment, track, CacheError}, coding::DecodeError, - data, message, - message::Message, + control, + control::Message, + data, session::{Control, SessionError}, VarInt, }; @@ -74,9 +75,8 @@ impl Subscriber { Message::Announce(_) => Ok(()), // don't care Message::Unannounce(_) => Ok(()), // also don't care Message::SubscribeOk(_msg) => Ok(()), // don't care - Message::SubscribeReset(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), - Message::SubscribeFin(msg) => self.recv_subscribe_error(msg.id, CacheError::Closed), - Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), + Message::SubscribeDone(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code.try_into()?)), + Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code.try_into()?)), Message::GoAway(_msg) => unimplemented!("GOAWAY"), _ => Err(SessionError::RoleViolation(msg.id())), } @@ -116,6 +116,7 @@ impl Subscriber { data::Header::Track(header) => self.run_track(header, stream).await, data::Header::Group(header) => self.run_group(header, stream).await, data::Header::Object(header) => self.run_object(header, stream).await, + data::Header::Datagram(_) => Err(SessionError::InvalidMessage), } } @@ -136,11 +137,11 @@ impl Subscriber { // TODO error if we get a duplicate group let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); - let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; + let track = subscribes.get_mut(&header.subscribe_id).ok_or(CacheError::NotFound)?; track.create_segment(segment::Info { - sequence: chunk.group, - priority: header.priority, + sequence: chunk.group_id, + priority: header.send_order.try_into()?, // TODO support u64 priorities })? }; @@ -168,11 +169,11 @@ impl Subscriber { async fn run_group(self, header: data::Group, mut stream: RecvStream) -> Result<(), SessionError> { let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); - let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; + let track = subscribes.get_mut(&header.subscribe_id).ok_or(CacheError::NotFound)?; track.create_segment(segment::Info { - sequence: header.group, - priority: header.priority, + sequence: header.group_id, + priority: header.send_order.try_into()?, // TODO support u64 priorities })? }; @@ -193,11 +194,11 @@ impl Subscriber { log::trace!("receiving chunk: {:?}", chunk); // NOTE: We allow gaps, but not going backwards. - if chunk.object.into_inner() < expected { - return Err(SessionError::OutOfOrder(expected.try_into()?, chunk.object)); + if chunk.object_id.into_inner() < expected { + return Err(SessionError::OutOfOrder(expected.try_into()?, chunk.object_id)); } - expected = chunk.object.into_inner() + 1; + expected = chunk.object_id.into_inner() + 1; let mut remain = chunk.size.into(); @@ -230,7 +231,7 @@ impl Subscriber { let id = VarInt::from_u32(self.next.fetch_add(1, atomic::Ordering::SeqCst)); self.subscribes.lock().unwrap().insert(id, track); - let msg = message::Subscribe { + let msg = control::Subscribe { id, track_alias: id, // This alias is useless but part of the spec. @@ -238,10 +239,10 @@ impl Subscriber { track_name, // TODO correctly support these - start_group: message::SubscribeLocation::Latest(VarInt::ZERO), - start_object: message::SubscribeLocation::Absolute(VarInt::ZERO), - end_group: message::SubscribeLocation::None, - end_object: message::SubscribeLocation::None, + start_group: control::SubscribeLocation::Latest(VarInt::ZERO), + start_object: control::SubscribeLocation::Absolute(VarInt::ZERO), + end_group: control::SubscribeLocation::None, + end_object: control::SubscribeLocation::None, params: Default::default(), }; diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 3a3fccf6..03460d3f 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -15,65 +15,11 @@ impl Version { /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff000001)); - /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-02.html pub const DRAFT_02: Version = Version(VarInt::from_u32(0xff000002)); - /// Fork of draft-ietf-moq-transport-00. - /// - /// Rough list of differences: - /// - /// # Messages - /// - Messages are sent over a control stream or a data stream. - /// - Data streams: each unidirectional stream contains a single OBJECT message. - /// - Control stream: a (client-initiated) bidirectional stream containing SETUP and then all other messages. - /// - Messages do not contain a length; unknown messages are fatal. - /// - /// # SETUP - /// - SETUP is split into SETUP_CLIENT and SETUP_SERVER with separate IDs. - /// - SETUP uses version `0xff00` for draft-00. - /// - SETUP no longer contains optional parameters; all are encoded in order and possibly zero. - /// - SETUP `role` indicates the role of the sender, not the role of the server. - /// - SETUP `path` field removed; use WebTransport for path. - /// - /// # SUBSCRIBE - /// - SUBSCRIBE `full_name` is split into separate `namespace` and `name` fields. - /// - SUBSCRIBE no longer contains optional parameters; all are encoded in order and possibly zero. - /// - SUBSCRIBE no longer contains the `auth` parameter; use WebTransport for auth. - /// - SUBSCRIBE no longer contains the `group` parameter; concept no longer exists. - /// - SUBSCRIBE contains the `id` instead of SUBSCRIBE_OK. - /// - SUBSCRIBE_OK and SUBSCRIBE_ERROR reference the subscription `id` the instead of the track `full_name`. - /// - SUBSCRIBE_ERROR was renamed to SUBSCRIBE_RESET, sent by publisher to terminate a SUBSCRIBE. - /// - SUBSCRIBE_STOP was added, sent by the subscriber to terminate a SUBSCRIBE. - /// - SUBSCRIBE_OK no longer has `expires`. - /// - /// # ANNOUNCE - /// - ANNOUNCE no longer contains optional parameters; all are encoded in order and possibly zero. - /// - ANNOUNCE no longer contains the `auth` field; use WebTransport for auth. - /// - ANNOUNCE_ERROR was renamed to ANNOUNCE_RESET, sent by publisher to terminate an ANNOUNCE. - /// - ANNOUNCE_STOP was added, sent by the subscriber to terminate an ANNOUNCE. - /// - /// # OBJECT - /// - OBJECT uses a dedicated QUIC stream. - /// - OBJECT has no size and continues until stream FIN. - /// - OBJECT `priority` is a i32 instead of a varint. (for practical reasons) - /// - OBJECT `expires` was added, a varint in seconds. - /// - OBJECT `group` was removed. - /// - /// # GROUP - /// - GROUP concept was removed, replaced with OBJECT as a QUIC stream. - pub const KIXEL_00: Version = Version(VarInt::from_u32(0xbad00)); - - /// Fork of draft-ietf-moq-transport-01. - /// - /// Most of the KIXEL_00 changes made it into the draft, or were reverted. - /// This was only used for a short time until extensions were created. - /// - /// - SUBSCRIBE contains a separate track namespace and track name field (accidental revert). [#277](https://github.com/moq-wg/moq-transport/pull/277) - /// - SUBSCRIBE contains the `track_id` instead of SUBSCRIBE_OK. [#145](https://github.com/moq-wg/moq-transport/issues/145) - /// - SUBSCRIBE_* reference `track_id` the instead of the `track_full_name`. [#145](https://github.com/moq-wg/moq-transport/issues/145) - /// - OBJECT `priority` is still a VarInt, but the max value is a u32 (implementation reasons) - /// - OBJECT messages within the same `group` MUST be on the same QUIC stream. - pub const KIXEL_01: Version = Version(VarInt::from_u32(0xbad01)); + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-03.html + pub const DRAFT_03: Version = Version(VarInt::from_u32(0xff000003)); } impl From for Version { From 54fe1fc8f9092a4102104d2308da7ea5fdf4ee26 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 8 Mar 2024 14:56:14 -0800 Subject: [PATCH 6/6] Remove unused imports. --- Cargo.lock | 35 ++--------------------------------- moq-pub/Cargo.toml | 1 - moq-transport/Cargo.toml | 2 -- 3 files changed, 2 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 31cb67a9..5a1ed8dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -529,12 +529,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "equivalent" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" - [[package]] name = "errno" version = "0.3.3" @@ -758,7 +752,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 1.9.3", + "indexmap", "slab", "tokio", "tokio-util", @@ -771,12 +765,6 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -[[package]] -name = "hashbrown" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" - [[package]] name = "heck" version = "0.4.1" @@ -941,17 +929,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown 0.12.3", -] - -[[package]] -name = "indexmap" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" -dependencies = [ - "equivalent", - "hashbrown 0.14.0", + "hashbrown", ] [[package]] @@ -1130,7 +1108,6 @@ name = "moq-pub" version = "0.1.0" dependencies = [ "anyhow", - "bytes", "clap", "clap_mangen", "env_logger", @@ -1187,10 +1164,8 @@ dependencies = [ "bytes", "clap", "env_logger", - "indexmap 2.0.0", "log", "mp4", - "paste", "quinn", "rfc6381-codec", "rustls", @@ -1409,12 +1384,6 @@ dependencies = [ "windows-targets", ] -[[package]] -name = "paste" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" - [[package]] name = "percent-encoding" version = "2.3.0" diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index 0618c498..0d70ca82 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -21,7 +21,6 @@ quinn = "0.10" webtransport-quinn = "0.6.1" #webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } url = "2" -bytes = "1.5" # Crypto rustls = { version = "0.21", features = ["dangerous_configuration"] } diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 6f5f050c..c0f122f7 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -19,14 +19,12 @@ bytes = "1" thiserror = "1" tokio = { version = "1", features = ["macros", "io-util", "sync"] } log = "0.4" -indexmap = "2" quinn = "0.10" webtransport-quinn = "0.6.1" #webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } async-trait = "0.1" -paste = "1" [dev-dependencies] # QUIC