diff --git a/Cargo.lock b/Cargo.lock index ecf21c37..5386664c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1170,10 +1170,8 @@ dependencies = [ "bytes", "clap", "env_logger", - "indexmap", "log", "mp4", - "paste", "quinn", "rfc6381-codec", "rustls", @@ -1392,12 +1390,6 @@ dependencies = [ "windows-targets", ] -[[package]] -name = "paste" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" - [[package]] name = "percent-encoding" version = "2.3.0" diff --git a/dev/pub b/dev/pub index 2a5cb211..f6b36950 100755 --- a/dev/pub +++ b/dev/pub @@ -15,9 +15,9 @@ ADDR="${ADDR:-$HOST:$PORT}" # Generate a random 16 character name by default. #NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}" -# JK use the name "dev" instead +# JK use the name "bbb" instead, matching the Big Buck Bunny demo. # TODO use that random name if the host is not localhost -NAME="${NAME:-dev}" +NAME="${NAME:-bbb}" # Combine the host and name into a URL. URL="${URL:-"https://$ADDR/$NAME"}" diff --git a/moq-clock/src/clock.rs b/moq-clock/src/clock.rs index a72a0cff..bbf16219 100644 --- a/moq-clock/src/clock.rs +++ b/moq-clock/src/clock.rs @@ -1,5 +1,3 @@ -use std::time; - use anyhow::Context; use moq_transport::{ cache::{fragment, segment, track}, @@ -30,7 +28,6 @@ impl Publisher { .create_segment(segment::Info { sequence: VarInt::from_u32(sequence), priority: 0, - expires: Some(time::Duration::from_secs(60)), }) .context("failed to create minute segment")?; @@ -56,19 +53,11 @@ impl Publisher { // Everything but the second. let base = now.format("%Y-%m-%d %H:%M:").to_string(); - segment - .fragment(VarInt::ZERO, base.len())? - .chunk(base.clone().into()) - .context("failed to write base")?; + segment.write(base.clone().into()).context("failed to write base")?; loop { let delta = now.format("%S").to_string(); - let sequence = VarInt::from_u32(now.second() + 1); - - segment - .fragment(sequence, delta.len())? - .chunk(delta.clone().into()) - .context("failed to write delta")?; + segment.write(delta.clone().into()).context("failed to write delta")?; println!("{}{}", base, delta); @@ -119,7 +108,7 @@ impl Subscriber { log::debug!("got first: {:?}", first); - if first.sequence.into_inner() != 0 { + if first.sequence != VarInt::ZERO { anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer"); } diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 38335849..bfbe84ce 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -1,5 +1,5 @@ use anyhow::{self, Context}; -use moq_transport::cache::{broadcast, fragment, segment, track}; +use moq_transport::cache::{broadcast, segment, track}; use moq_transport::VarInt; use mp4::{self, ReadBox}; use serde_json::json; @@ -41,16 +41,13 @@ impl Media { // Create the catalog track with a single segment. let mut init_track = broadcast.create_track("0.mp4")?; - let init_segment = init_track.create_segment(segment::Info { + let mut init_segment = init_track.create_segment(segment::Info { sequence: VarInt::ZERO, priority: 0, - expires: None, })?; - // Create a single fragment, optionally setting the size - let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?; - - init_fragment.chunk(init.into())?; + // Write the init segment to the track. + init_segment.write(init.into())?; let mut tracks = HashMap::new(); @@ -128,10 +125,9 @@ impl Media { init_track_name: &str, moov: &mp4::MoovBox, ) -> Result<(), anyhow::Error> { - let segment = track.create_segment(segment::Info { + let mut segment = track.create_segment(segment::Info { sequence: VarInt::ZERO, priority: 0, - expires: None, })?; let mut tracks = Vec::new(); @@ -214,10 +210,7 @@ impl Media { log::info!("catalog: {}", catalog_str); // Create a single fragment for the segment. - let mut fragment = segment.final_fragment(VarInt::ZERO)?; - - // Add the segment and add the fragment. - fragment.chunk(catalog_str.into())?; + segment.write(catalog_str.into())?; Ok(()) } @@ -265,7 +258,7 @@ struct Track { track: track::Publisher, // The current segment - current: Option, + current: Option, // The number of units per second. timescale: u64, @@ -288,7 +281,7 @@ impl Track { if let Some(current) = self.current.as_mut() { if !fragment.keyframe { // Use the existing segment - current.chunk(raw.into())?; + current.write(raw.into())?; return Ok(()); } } @@ -304,33 +297,27 @@ impl Track { .context("timestamp too large")?; // Create a new segment. - let segment = self.track.create_segment(segment::Info { + let mut segment = self.track.create_segment(segment::Info { sequence: VarInt::try_from(self.sequence).context("sequence too large")?, // Newer segments are higher priority priority: u32::MAX.checked_sub(timestamp).context("priority too large")?, - - // Delete segments after 10s. - expires: Some(time::Duration::from_secs(10)), })?; - // Create a single fragment for the segment that we will keep appending. - let mut fragment = segment.final_fragment(VarInt::ZERO)?; - self.sequence += 1; - // Insert the raw atom into the segment. - fragment.chunk(raw.into())?; + // Create a single fragment for the segment that we will keep appending. + segment.write(raw.into())?; // Save for the next iteration - self.current = Some(fragment); + self.current = Some(segment); Ok(()) } pub fn data(&mut self, raw: Vec) -> anyhow::Result<()> { - let fragment = self.current.as_mut().context("missing current fragment")?; - fragment.chunk(raw.into())?; + let segment = self.current.as_mut().context("missing current fragment")?; + segment.write(raw.into())?; Ok(()) } diff --git a/moq-relay/src/error.rs b/moq-relay/src/error.rs index b943a938..ab64aac2 100644 --- a/moq-relay/src/error.rs +++ b/moq-relay/src/error.rs @@ -36,16 +36,4 @@ impl moq_transport::MoqError for RelayError { Self::WebTransportServer(_) => 500, } } - - fn reason(&self) -> String { - match self { - Self::Transport(err) => format!("transport error: {}", err.reason()), - Self::Cache(err) => format!("cache error: {}", err.reason()), - Self::MoqApi(err) => format!("api error: {}", err), - Self::Url(err) => format!("url error: {}", err), - Self::MissingNode => "missing node".to_owned(), - Self::WebTransportServer(err) => format!("upstream server error: {}", err), - Self::WebTransportClient(err) => format!("upstream client error: {}", err), - } - } } diff --git a/moq-relay/src/session.rs b/moq-relay/src/session.rs index a7570f7d..35234516 100644 --- a/moq-relay/src/session.rs +++ b/moq-relay/src/session.rs @@ -55,12 +55,12 @@ impl Session { match role { Role::Publisher => { if let Err(err) = self.serve_publisher(id, request, &path).await { - log::warn!("error serving publisher: id={} path={} err={:#?}", id, path, err); + log::warn!("error serving publisher: id={} path={} err={}", id, path, err); } } Role::Subscriber => { if let Err(err) = self.serve_subscriber(id, request, &path).await { - log::warn!("error serving subscriber: id={} path={} err={:#?}", id, path, err); + log::warn!("error serving subscriber: id={} path={} err={}", id, path, err); } } Role::Both => { diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 6f5f050c..c0f122f7 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -19,14 +19,12 @@ bytes = "1" thiserror = "1" tokio = { version = "1", features = ["macros", "io-util", "sync"] } log = "0.4" -indexmap = "2" quinn = "0.10" webtransport-quinn = "0.6.1" #webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } async-trait = "0.1" -paste = "1" [dev-dependencies] # QUIC diff --git a/moq-transport/src/cache/error.rs b/moq-transport/src/cache/error.rs index d3f907b6..dfe84f6e 100644 --- a/moq-transport/src/cache/error.rs +++ b/moq-transport/src/cache/error.rs @@ -9,7 +9,7 @@ pub enum CacheError { #[error("closed")] Closed, - /// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher. + /// A SUBSCRIBE_DONE or ANNOUNCE_CANCEL was received. #[error("reset code={0:?}")] Reset(u32), @@ -24,6 +24,10 @@ pub enum CacheError { /// A resource already exists with that ID. #[error("duplicate")] Duplicate, + + /// We reported the wrong size for a fragment. + #[error("wrong size")] + WrongSize, } impl MoqError for CacheError { @@ -35,17 +39,7 @@ impl MoqError for CacheError { Self::Stop => 206, Self::NotFound => 404, Self::Duplicate => 409, - } - } - - /// A reason that is sent over the wire. - fn reason(&self) -> String { - match self { - Self::Closed => "closed".to_owned(), - Self::Reset(code) => format!("reset code: {}", code), - Self::Stop => "stop".to_owned(), - Self::NotFound => "not found".to_owned(), - Self::Duplicate => "duplicate".to_owned(), + Self::WrongSize => 500, } } } diff --git a/moq-transport/src/cache/fragment.rs b/moq-transport/src/cache/fragment.rs index 4e08333f..ccdc9049 100644 --- a/moq-transport/src/cache/fragment.rs +++ b/moq-transport/src/cache/fragment.rs @@ -34,9 +34,8 @@ pub struct Info { // NOTE: These may be received out of order or with gaps. pub sequence: VarInt, - // The size of the fragment, optionally None if this is the last fragment in a segment. - // TODO enforce this size. - pub size: Option, + // The size of the fragment. + pub size: usize, } struct State { @@ -79,6 +78,9 @@ pub struct Publisher { // Immutable segment state. info: Arc, + // The amount of promised data that has yet to be written. + remain: usize, + // Closes the segment when all Publishers are dropped. _dropped: Arc, } @@ -86,20 +88,38 @@ pub struct Publisher { impl Publisher { fn new(state: Watch, info: Arc) -> Self { let _dropped = Arc::new(Dropped::new(state.clone())); - Self { state, info, _dropped } + let remain = info.size; + + Self { + state, + info, + remain, + _dropped, + } } /// Write a new chunk of bytes. pub fn chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> { + if chunk.len() > self.remain { + return Err(CacheError::WrongSize); + } + self.remain -= chunk.len(); + let mut state = self.state.lock_mut(); state.closed.clone()?; state.chunks.push(chunk); + Ok(()) } /// Close the segment with an error. pub fn close(self, err: CacheError) -> Result<(), CacheError> { - self.state.lock_mut().close(err) + self.state.lock_mut().close(err)?; + if self.remain != 0 { + Err(CacheError::WrongSize) + } else { + Ok(()) + } } } diff --git a/moq-transport/src/cache/segment.rs b/moq-transport/src/cache/segment.rs index ecd27dea..cb396dbb 100644 --- a/moq-transport/src/cache/segment.rs +++ b/moq-transport/src/cache/segment.rs @@ -8,7 +8,7 @@ //! //! The segment is closed with [CacheError::Closed] when all publishers or subscribers are dropped. use core::fmt; -use std::{ops::Deref, sync::Arc, time}; +use std::{ops::Deref, sync::Arc}; use crate::VarInt; @@ -34,9 +34,6 @@ pub struct Info { // The priority of the segment within the BROADCAST. pub priority: u32, - - // Cache the segment for at most this long. - pub expires: Option, } struct State { @@ -81,6 +78,9 @@ pub struct Publisher { // Immutable segment state. info: Arc, + // The next fragment sequence number to use. + next: u64, + // Closes the segment when all Publishers are dropped. _dropped: Arc, } @@ -88,16 +88,28 @@ pub struct Publisher { impl Publisher { fn new(state: Watch, info: Arc) -> Self { let _dropped = Arc::new(Dropped::new(state.clone())); - Self { state, info, _dropped } + Self { + state, + info, + next: 0, + _dropped, + } } - // Not public because it's a footgun. - pub(crate) fn push_fragment( - &mut self, - sequence: VarInt, - size: Option, - ) -> Result { - let (publisher, subscriber) = fragment::new(fragment::Info { sequence, size }); + /// Write an object with the given payload. + pub fn write(&mut self, data: bytes::Bytes) -> Result<(), CacheError> { + self.fragment(data.len())?.chunk(data) + } + + /// Write an object over multiple fragments. + /// + /// BAD STUFF will happen if the size is wrong. + pub fn fragment(&mut self, size: usize) -> Result { + let (publisher, subscriber) = fragment::new(fragment::Info { + sequence: self.next.try_into().unwrap(), + size, + }); + self.next += 1; let mut state = self.state.lock_mut(); state.closed.clone()?; @@ -105,16 +117,6 @@ impl Publisher { Ok(publisher) } - /// Write a fragment - pub fn fragment(&mut self, sequence: VarInt, size: usize) -> Result { - self.push_fragment(sequence, Some(size)) - } - - /// Write the last fragment, which means size can be unknown. - pub fn final_fragment(mut self, sequence: VarInt) -> Result { - self.push_fragment(sequence, None) - } - /// Close the segment with an error. pub fn close(self, err: CacheError) -> Result<(), CacheError> { self.state.lock_mut().close(err) diff --git a/moq-transport/src/cache/track.rs b/moq-transport/src/cache/track.rs index 6d2d405d..f9448d97 100644 --- a/moq-transport/src/cache/track.rs +++ b/moq-transport/src/cache/track.rs @@ -12,12 +12,8 @@ //! //! The track is closed with [CacheError::Closed] when all publishers or subscribers are dropped. -use std::{collections::BinaryHeap, fmt, ops::Deref, sync::Arc, time}; - -use indexmap::IndexMap; - use super::{segment, CacheError, Watch}; -use crate::VarInt; +use std::{fmt, ops::Deref, sync::Arc}; /// Create a track with the given name. pub fn new(name: &str) -> (Publisher, Subscriber) { @@ -36,17 +32,10 @@ pub struct Info { pub name: String, } +#[derive(Debug)] struct State { - // Store segments in received order so subscribers can detect changes. - // The key is the segment sequence, which could have gaps. - // A None value means the segment has expired. - lookup: IndexMap>, - - // Store when segments will expire in a priority queue. - expires: BinaryHeap, - - // The number of None entries removed from the start of the lookup. - pruned: usize, + current: Option, + epoch: usize, // Set when the publisher is closed/dropped, or all subscribers are dropped. closed: Result<(), CacheError>, @@ -61,75 +50,22 @@ impl State { pub fn insert(&mut self, segment: segment::Subscriber) -> Result<(), CacheError> { self.closed.clone()?; - - let entry = match self.lookup.entry(segment.sequence) { - indexmap::map::Entry::Occupied(_entry) => return Err(CacheError::Duplicate), - indexmap::map::Entry::Vacant(entry) => entry, - }; - - if let Some(expires) = segment.expires { - self.expires.push(SegmentExpiration { - sequence: segment.sequence, - expires: time::Instant::now() + expires, - }); - } - - entry.insert(Some(segment)); - - // Expire any existing segments on insert. - // This means if you don't insert then you won't expire... but it's probably fine since the cache won't grow. - // TODO Use a timer to expire segments at the correct time instead - self.expire(); - + self.current = Some(segment); + self.epoch += 1; Ok(()) } - - // Try expiring any segments - pub fn expire(&mut self) { - let now = time::Instant::now(); - while let Some(segment) = self.expires.peek() { - if segment.expires > now { - break; - } - - // Update the entry to None while preserving the index. - match self.lookup.entry(segment.sequence) { - indexmap::map::Entry::Occupied(mut entry) => entry.insert(None), - indexmap::map::Entry::Vacant(_) => panic!("expired segment not found"), - }; - - self.expires.pop(); - } - - // Remove None entries from the start of the lookup. - while let Some((_, None)) = self.lookup.get_index(0) { - self.lookup.shift_remove_index(0); - self.pruned += 1; - } - } } impl Default for State { fn default() -> Self { Self { - lookup: Default::default(), - expires: Default::default(), - pruned: 0, + current: None, + epoch: 0, closed: Ok(()), } } } -impl fmt::Debug for State { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("State") - .field("lookup", &self.lookup) - .field("pruned", &self.pruned) - .field("closed", &self.closed) - .finish() - } -} - /// Creates new segments for a track. pub struct Publisher { state: Watch, @@ -183,12 +119,7 @@ impl fmt::Debug for Publisher { pub struct Subscriber { state: Watch, info: Arc, - - // The index of the next segment to return. - index: usize, - - // If there are multiple segments to return, we put them in here to return them in priority order. - pending: BinaryHeap, + epoch: usize, // Dropped when all subscribers are dropped. _dropped: Arc, @@ -200,8 +131,7 @@ impl Subscriber { Self { state, info, - index: 0, - pending: Default::default(), + epoch: 0, _dropped, } } @@ -212,27 +142,10 @@ impl Subscriber { let notify = { let state = self.state.lock(); - // Get our adjusted index, which could be negative if we've removed more broadcasts than read. - let mut index = self.index.saturating_sub(state.pruned); - - // Push all new segments into a priority queue. - while index < state.lookup.len() { - let (_, segment) = state.lookup.get_index(index).unwrap(); - - // Skip None values (expired segments). - // TODO These might actually be expired, so we should check the expiration time. - if let Some(segment) = segment { - self.pending.push(SegmentPriority(segment.clone())); - } - - index += 1; - } - - self.index = state.pruned + index; - - // Return the higher priority segment. - if let Some(segment) = self.pending.pop() { - return Ok(Some(segment.0)); + if self.epoch != state.epoch { + let segment = state.current.as_ref().unwrap().clone(); + self.epoch = state.epoch; + return Ok(Some(segment)); } // Otherwise check if we need to return an error. @@ -261,7 +174,7 @@ impl fmt::Debug for Subscriber { f.debug_struct("Subscriber") .field("state", &self.state) .field("info", &self.info) - .field("index", &self.index) + .field("epoch", &self.epoch) .finish() } } @@ -282,56 +195,3 @@ impl Drop for Dropped { self.state.lock_mut().close(CacheError::Closed).ok(); } } - -// Used to order segments by expiration time. -struct SegmentExpiration { - sequence: VarInt, - expires: time::Instant, -} - -impl Ord for SegmentExpiration { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // Reverse order so the earliest expiration is at the top of the heap. - other.expires.cmp(&self.expires) - } -} - -impl PartialOrd for SegmentExpiration { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for SegmentExpiration { - fn eq(&self, other: &Self) -> bool { - self.expires == other.expires - } -} - -impl Eq for SegmentExpiration {} - -// Used to order segments by priority -#[derive(Clone)] -struct SegmentPriority(pub segment::Subscriber); - -impl Ord for SegmentPriority { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // Reverse order so the highest priority is at the top of the heap. - // TODO I let CodePilot generate this code so yolo - other.0.priority.cmp(&self.0.priority) - } -} - -impl PartialOrd for SegmentPriority { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for SegmentPriority { - fn eq(&self, other: &Self) -> bool { - self.0.priority == other.0.priority - } -} - -impl Eq for SegmentPriority {} diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index a6fe94e4..957c4547 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -33,6 +33,9 @@ pub enum DecodeError { #[error("invalid subscribe location")] InvalidSubscribeLocation, + #[error("invalid value")] + InvalidValue, + #[error("varint bounds exceeded")] BoundsExceeded(#[from] BoundsExceeded), @@ -48,8 +51,4 @@ pub enum DecodeError { #[error("io error: {0}")] IoError(#[from] std::io::Error), - - // Used to signal that the stream has ended. - #[error("no more messages")] - Final, } diff --git a/moq-transport/src/coding/params.rs b/moq-transport/src/coding/params.rs index 9cfd6f38..1cc85abd 100644 --- a/moq-transport/src/coding/params.rs +++ b/moq-transport/src/coding/params.rs @@ -18,7 +18,7 @@ impl Decode for Params { async fn decode(mut r: &mut R) -> Result { let mut params = HashMap::new(); - // I hate this shit so much; let me encode my role and get on with my life. + // I hate this encoding so much; let me encode my role and get on with my life. let count = VarInt::decode(r).await?; for _ in 0..count.into_inner() { let kind = VarInt::decode(r).await?; diff --git a/moq-transport/src/coding/string.rs b/moq-transport/src/coding/string.rs index 2cdff4a9..1f660d3f 100644 --- a/moq-transport/src/coding/string.rs +++ b/moq-transport/src/coding/string.rs @@ -21,9 +21,9 @@ impl Encode for String { impl Decode for String { /// Decode a string with a varint length prefix. async fn decode(r: &mut R) -> Result { - let size = VarInt::decode(r).await?.into_inner(); - let mut str = String::with_capacity(min(1024, size) as usize); - r.take(size).read_to_string(&mut str).await?; + let size = VarInt::decode(r).await?.into(); + let mut str = String::with_capacity(min(1024, size)); + r.take(size as u64).read_to_string(&mut str).await?; Ok(str) } } diff --git a/moq-transport/src/coding/varint.rs b/moq-transport/src/coding/varint.rs index 8557de8a..95bc8074 100644 --- a/moq-transport/src/coding/varint.rs +++ b/moq-transport/src/coding/varint.rs @@ -18,8 +18,8 @@ pub struct BoundsExceeded; /// An integer less than 2^62 /// /// Values of this type are suitable for encoding as QUIC variable-length integer. -// It would be neat if we could express to Rust that the top two bits are available for use as enum -// discriminants +/// It would be neat if we could express to Rust that the top two bits are available for use as enum +/// discriminants #[derive(Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct VarInt(u64); @@ -83,8 +83,9 @@ impl TryFrom for VarInt { /// Succeeds iff `x` < 2^62 fn try_from(x: u64) -> Result { - if x <= Self::MAX.into_inner() { - Ok(Self(x)) + let x = Self(x); + if x <= Self::MAX { + Ok(x) } else { Err(BoundsExceeded) } @@ -217,7 +218,7 @@ impl Encode for VarInt { } else if x < 2u64.pow(62) { w.write_u64(0b11 << 62 | x).await?; } else { - unreachable!("malformed VarInt"); + return Err(BoundsExceeded.into()); } Ok(()) diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/control/announce.rs similarity index 71% rename from moq-transport/src/message/announce.rs rename to moq-transport/src/control/announce.rs index 281fffa8..709339d9 100644 --- a/moq-transport/src/message/announce.rs +++ b/moq-transport/src/control/announce.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the publisher to announce the availability of a group of tracks. #[derive(Clone, Debug)] @@ -14,14 +13,14 @@ pub struct Announce { } impl Announce { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; let params = Params::decode(r).await?; Ok(Self { namespace, params }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await?; self.params.encode(w).await?; diff --git a/moq-transport/src/message/announce_reset.rs b/moq-transport/src/control/announce_error.rs similarity index 76% rename from moq-transport/src/message/announce_reset.rs rename to moq-transport/src/control/announce_error.rs index 24d3f817..e21886b6 100644 --- a/moq-transport/src/message/announce_reset.rs +++ b/moq-transport/src/control/announce_error.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the subscriber to reject an Announce. #[derive(Clone, Debug)] @@ -17,7 +16,7 @@ pub struct AnnounceError { } impl AnnounceError { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; let reason = String::decode(r).await?; @@ -29,7 +28,7 @@ impl AnnounceError { }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; self.reason.encode(w).await?; diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/control/announce_ok.rs similarity index 52% rename from moq-transport/src/message/announce_ok.rs rename to moq-transport/src/control/announce_ok.rs index 300279e1..a5c47928 100644 --- a/moq-transport/src/message/announce_ok.rs +++ b/moq-transport/src/control/announce_ok.rs @@ -1,7 +1,4 @@ -use crate::{ - coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError}, - setup::Extensions, -}; +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError}; /// Sent by the subscriber to accept an Announce. #[derive(Clone, Debug)] @@ -12,12 +9,12 @@ pub struct AnnounceOk { } impl AnnounceOk { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; Ok(Self { namespace }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await } } diff --git a/moq-transport/src/message/go_away.rs b/moq-transport/src/control/go_away.rs similarity index 61% rename from moq-transport/src/message/go_away.rs rename to moq-transport/src/control/go_away.rs index 7999c9a9..c86152ae 100644 --- a/moq-transport/src/message/go_away.rs +++ b/moq-transport/src/control/go_away.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the server to indicate that the client should connect to a different server. #[derive(Clone, Debug)] @@ -10,12 +9,12 @@ pub struct GoAway { } impl GoAway { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let url = String::decode(r).await?; Ok(Self { url }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.url.encode(w).await } } diff --git a/moq-transport/src/control/message.rs b/moq-transport/src/control/message.rs new file mode 100644 index 00000000..3b3546f3 --- /dev/null +++ b/moq-transport/src/control/message.rs @@ -0,0 +1,102 @@ +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; +use std::fmt; + +use super::{ + Announce, AnnounceError, AnnounceOk, GoAway, Subscribe, SubscribeDone, SubscribeError, SubscribeOk, Unannounce, + Unsubscribe, +}; + +// Use a macro to generate the message types rather than copy-paste. +// This implements a decode/encode method that uses the specified type. +macro_rules! message_types { + {$($name:ident = $val:expr,)*} => { + /// All supported message types. + #[derive(Clone)] + pub enum Message { + $($name($name)),* + } + + impl Message { + pub async fn decode(r: &mut R) -> Result { + let t = VarInt::decode(r).await?; + + match t.into_inner() { + $($val => { + let msg = $name::decode(r).await?; + Ok(Self::$name(msg)) + })* + _ => Err(DecodeError::InvalidMessage(t)), + } + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + $(Self::$name(ref m) => { + VarInt::from_u32($val).encode(w).await?; + m.encode(w).await + },)* + } + } + + pub fn id(&self) -> VarInt { + match self { + $(Self::$name(_) => { + VarInt::from_u32($val) + },)* + } + } + + pub fn name(&self) -> &'static str { + match self { + $(Self::$name(_) => { + stringify!($name) + },)* + } + } + } + + $(impl From<$name> for Message { + fn from(m: $name) -> Self { + Message::$name(m) + } + })* + + impl fmt::Debug for Message { + // Delegate to the message formatter + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + $(Self::$name(ref m) => m.fmt(f),)* + } + } + } + } +} + +// Each message is prefixed with the given VarInt type. +message_types! { + // NOTE: Object and Setup are in other modules. + // Object = 0x0 + // ObjectUnbounded = 0x2 + // SetupClient = 0x40 + // SetupServer = 0x41 + + // SUBSCRIBE family, sent by subscriber + Subscribe = 0x3, + Unsubscribe = 0xa, + + // SUBSCRIBE family, sent by publisher + SubscribeOk = 0x4, + SubscribeError = 0x5, + SubscribeDone = 0xb, + + // ANNOUNCE family, sent by publisher + Announce = 0x6, + Unannounce = 0x9, + + // ANNOUNCE family, sent by subscriber + AnnounceOk = 0x7, + AnnounceError = 0x8, + + // Misc + GoAway = 0x10, +} diff --git a/moq-transport/src/control/mod.rs b/moq-transport/src/control/mod.rs new file mode 100644 index 00000000..200589ea --- /dev/null +++ b/moq-transport/src/control/mod.rs @@ -0,0 +1,56 @@ +//! Low-level message sent over the wire, as defined in the specification. +//! +//! TODO Update this +//! All of these messages are sent over a bidirectional QUIC stream. +//! This introduces some head-of-line blocking but preserves ordering. +//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams. +//! +//! Messages sent by the publisher: +//! - [Announce] +//! - [Unannounce] +//! - [SubscribeOk] +//! - [SubscribeError] +//! - [SubscribeReset] +//! - [Object] +//! +//! Messages sent by the subscriber: +//! - [Subscribe] +//! - [Unsubscribe] +//! - [AnnounceOk] +//! - [AnnounceError] +//! +//! Example flow: +//! ```test +//! -> ANNOUNCE namespace="foo" +//! <- ANNOUNCE_OK namespace="foo" +//! <- SUBSCRIBE id=0 namespace="foo" name="bar" +//! -> SUBSCRIBE_OK id=0 +//! -> OBJECT id=0 sequence=69 priority=4 expires=30 +//! -> OBJECT id=0 sequence=70 priority=4 expires=30 +//! -> OBJECT id=0 sequence=70 priority=4 expires=30 +//! <- SUBSCRIBE_STOP id=0 +//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer" +//! ``` +mod announce; +mod announce_error; +mod announce_ok; +mod go_away; +mod message; +mod subscribe; +mod subscribe_done; +mod subscribe_error; +mod subscribe_ok; +mod unannounce; +mod unsubscribe; + +pub use announce::*; +pub use announce_error::*; +pub use announce_ok::*; +pub use go_away::*; +pub use message::*; +pub use subscribe::*; +pub use subscribe_done::*; +pub use subscribe_error::*; +pub use subscribe_ok::*; +pub use unannounce::*; +pub use unsubscribe::*; diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/control/subscribe.rs similarity index 75% rename from moq-transport/src/message/subscribe.rs rename to moq-transport/src/control/subscribe.rs index e64d5a13..0a137312 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/control/subscribe.rs @@ -1,24 +1,18 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt}; - use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt}; /// Sent by the subscriber to request all future objects for the given track. /// /// Objects will use the provided ID instead of the full track name, to save bytes. #[derive(Clone, Debug)] pub struct Subscribe { - /// An ID we choose so we can map to the track_name. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 + /// The subscription ID pub id: VarInt, - /// The track namespace. - /// - /// Must be None if `extensions.subscribe_split` is false. - pub namespace: Option, - - /// The track name. - pub name: String, + /// Track properties + pub track_alias: VarInt, // This alias is useless but part of the spec + pub track_namespace: String, + pub track_name: String, /// The start/end group/object. pub start_group: SubscribeLocation, @@ -31,15 +25,11 @@ pub struct Subscribe { } impl Subscribe { - pub async fn decode(r: &mut R, ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - - let namespace = match ext.subscribe_split { - true => Some(String::decode(r).await?), - false => None, - }; - - let name = String::decode(r).await?; + let track_alias = VarInt::decode(r).await?; + let track_namespace = String::decode(r).await?; + let track_name = String::decode(r).await?; let start_group = SubscribeLocation::decode(r).await?; let start_object = SubscribeLocation::decode(r).await?; @@ -62,8 +52,9 @@ impl Subscribe { Ok(Self { id, - namespace, - name, + track_alias, + track_namespace, + track_name, start_group, start_object, end_group, @@ -72,18 +63,11 @@ impl Subscribe { }) } - pub async fn encode(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - - if self.namespace.is_some() != ext.subscribe_split { - panic!("namespace must be None if subscribe_split is false"); - } - - if ext.subscribe_split { - self.namespace.as_ref().unwrap().encode(w).await?; - } - - self.name.encode(w).await?; + self.track_alias.encode(w).await?; + self.track_namespace.encode(w).await?; + self.track_name.encode(w).await?; self.start_group.encode(w).await?; self.start_object.encode(w).await?; diff --git a/moq-transport/src/control/subscribe_done.rs b/moq-transport/src/control/subscribe_done.rs new file mode 100644 index 00000000..7529536a --- /dev/null +++ b/moq-transport/src/control/subscribe_done.rs @@ -0,0 +1,51 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +/// Sent by the publisher to cleanly terminate a Subscribe. +#[derive(Clone, Debug)] +pub struct SubscribeDone { + /// The ID for this subscription. + pub id: VarInt, + + /// The error code + pub code: VarInt, + + /// An optional error reason + pub reason: String, + + /// The final group/object sent on this subscription. + pub last: Option<(VarInt, VarInt)>, +} + +impl SubscribeDone { + pub async fn decode(r: &mut R) -> Result { + let id = VarInt::decode(r).await?; + let code = VarInt::decode(r).await?; + let reason = String::decode(r).await?; + let last = match r.read_u8().await? { + 0 => None, + 1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)), + _ => return Err(DecodeError::InvalidValue), + }; + + Ok(Self { id, code, reason, last }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w).await?; + self.code.encode(w).await?; + self.reason.encode(w).await?; + + if let Some((group, object)) = self.last { + w.write_u8(1).await?; + group.encode(w).await?; + object.encode(w).await?; + } else { + w.write_u8(0).await?; + } + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_error.rs b/moq-transport/src/control/subscribe_error.rs similarity index 52% rename from moq-transport/src/message/subscribe_error.rs rename to moq-transport/src/control/subscribe_error.rs index 9ef4c917..fa2f98c1 100644 --- a/moq-transport/src/message/subscribe_error.rs +++ b/moq-transport/src/control/subscribe_error.rs @@ -1,35 +1,42 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup::Extensions; /// Sent by the publisher to reject a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeError { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - // The ID for this subscription. pub id: VarInt, // An error code. - pub code: u32, + pub code: VarInt, // An optional, human-readable reason. pub reason: String, + + /// An optional track alias, only used when error == Retry Track Alias + pub alias: VarInt, } impl SubscribeError { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let code = VarInt::decode(r).await?.try_into()?; + let code = VarInt::decode(r).await?; let reason = String::decode(r).await?; - - Ok(Self { id, code, reason }) + let alias = VarInt::decode(r).await?; + + Ok(Self { + id, + code, + reason, + alias, + }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - VarInt::from_u32(self.code).encode(w).await?; + self.code.encode(w).await?; self.reason.encode(w).await?; + self.alias.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/control/subscribe_ok.rs b/moq-transport/src/control/subscribe_ok.rs new file mode 100644 index 00000000..2a140c3c --- /dev/null +++ b/moq-transport/src/control/subscribe_ok.rs @@ -0,0 +1,56 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +use crate::coding::{AsyncRead, AsyncWrite}; + +/// Sent by the publisher to accept a Subscribe. +#[derive(Clone, Debug)] +pub struct SubscribeOk { + /// The ID for this subscription. + pub id: VarInt, + + /// The subscription will expire in this many milliseconds. + pub expires: Option, + + /// The latest group and object for the track. + pub latest: Option<(VarInt, VarInt)>, +} + +impl SubscribeOk { + pub async fn decode(r: &mut R) -> Result { + let id = VarInt::decode(r).await?; + let expires = match VarInt::decode(r).await? { + VarInt::ZERO => None, + expires => Some(expires), + }; + + let latest = match r.read_u8().await? { + 0 => None, + 1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)), + _ => return Err(DecodeError::InvalidValue), + }; + + Ok(Self { id, expires, latest }) + } +} + +impl SubscribeOk { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w).await?; + self.expires.unwrap_or(VarInt::ZERO).encode(w).await?; + + match self.latest { + Some((group, object)) => { + w.write_u8(1).await?; + group.encode(w).await?; + object.encode(w).await?; + } + None => { + w.write_u8(0).await?; + } + } + + Ok(()) + } +} diff --git a/moq-transport/src/message/unannounce.rs b/moq-transport/src/control/unannounce.rs similarity index 64% rename from moq-transport/src/message/unannounce.rs rename to moq-transport/src/control/unannounce.rs index a2c2e390..e93188c2 100644 --- a/moq-transport/src/message/unannounce.rs +++ b/moq-transport/src/control/unannounce.rs @@ -1,7 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; /// Sent by the publisher to terminate an Announce. #[derive(Clone, Debug)] @@ -11,13 +10,13 @@ pub struct Unannounce { } impl Unannounce { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let namespace = String::decode(r).await?; Ok(Self { namespace }) } - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.namespace.encode(w).await?; Ok(()) diff --git a/moq-transport/src/message/unsubscribe.rs b/moq-transport/src/control/unsubscribe.rs similarity index 55% rename from moq-transport/src/message/unsubscribe.rs rename to moq-transport/src/control/unsubscribe.rs index 5361f594..39abc373 100644 --- a/moq-transport/src/message/unsubscribe.rs +++ b/moq-transport/src/control/unsubscribe.rs @@ -1,26 +1,22 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; /// Sent by the subscriber to terminate a Subscribe. #[derive(Clone, Debug)] pub struct Unsubscribe { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - // The ID for this subscription. pub id: VarInt, } impl Unsubscribe { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { + pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; Ok(Self { id }) } } impl Unsubscribe { - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/data/datagram.rs b/moq-transport/src/data/datagram.rs new file mode 100644 index 00000000..9d849ff5 --- /dev/null +++ b/moq-transport/src/data/datagram.rs @@ -0,0 +1,41 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct Datagram { + // The subscribe ID. + pub subscribe_id: VarInt, + + // The track alias. + pub track_alias: VarInt, + + // The sequence number within the track. + pub group_id: VarInt, + + // The object ID within the group. + pub object_id: VarInt, + + // The priority, where **smaller** values are sent first. + pub send_order: VarInt, +} + +impl Datagram { + pub async fn decode(r: &mut R) -> Result { + Ok(Self { + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + object_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; + self.send_order.encode(w).await?; + Ok(()) + } +} diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/group.rs new file mode 100644 index 00000000..3d2447d4 --- /dev/null +++ b/moq-transport/src/data/group.rs @@ -0,0 +1,58 @@ +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct Group { + // The subscribe ID. + pub subscribe_id: VarInt, + + // The track alias. + pub track_alias: VarInt, + + // The group sequence number + pub group_id: VarInt, + + // The priority, where **smaller** values are sent first. + pub send_order: VarInt, +} + +impl Group { + pub async fn decode(r: &mut R) -> Result { + Ok(Self { + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.send_order.encode(w).await?; + + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct GroupChunk { + pub object_id: VarInt, + pub size: VarInt, +} + +impl GroupChunk { + pub async fn decode(r: &mut R) -> Result { + Ok(Self { + object_id: VarInt::decode(r).await?, + size: VarInt::decode(r).await?, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.object_id.encode(w).await?; + self.size.encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/data/header.rs b/moq-transport/src/data/header.rs new file mode 100644 index 00000000..e3f64d1c --- /dev/null +++ b/moq-transport/src/data/header.rs @@ -0,0 +1,96 @@ +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; +use std::fmt; + +use super::{Datagram, Group, Object, Track}; + +// Use a macro to generate the message types rather than copy-paste. +// This implements a decode/encode method that uses the specified type. +macro_rules! header_types { + {$($name:ident = $val:expr,)*} => { + /// All supported message types. + #[derive(Clone)] + pub enum Header { + $($name($name)),* + } + + impl Header { + pub async fn decode(r: &mut R) -> Result { + let t = VarInt::decode(r).await?; + + match t.into_inner() { + $($val => { + let msg = $name::decode(r).await?; + Ok(Self::$name(msg)) + })* + _ => Err(DecodeError::InvalidMessage(t)), + } + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + $(Self::$name(ref m) => { + VarInt::from_u32($val).encode(w).await?; + m.encode(w).await + },)* + } + } + + pub fn id(&self) -> VarInt { + match self { + $(Self::$name(_) => { + VarInt::from_u32($val) + },)* + } + } + + pub fn name(&self) -> &'static str { + match self { + $(Self::$name(_) => { + stringify!($name) + },)* + } + } + + pub fn subscribe_id(&self) -> VarInt { + match self { + $(Self::$name(o) => o.subscribe_id,)* + } + } + + pub fn track_alias(&self) -> VarInt { + match self { + $(Self::$name(o) => o.track_alias,)* + } + } + + pub fn send_order(&self) -> VarInt { + match self { + $(Self::$name(o) => o.send_order,)* + } + } + } + + $(impl From<$name> for Header { + fn from(m: $name) -> Self { + Self::$name(m) + } + })* + + impl fmt::Debug for Header { + // Delegate to the message formatter + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + $(Self::$name(ref m) => m.fmt(f),)* + } + } + } + } +} + +// Each object type is prefixed with the given VarInt type. +header_types! { + Object = 0x0, + Datagram = 0x1, + Group = 0x50, + Track = 0x51, +} diff --git a/moq-transport/src/data/mod.rs b/moq-transport/src/data/mod.rs new file mode 100644 index 00000000..2090093c --- /dev/null +++ b/moq-transport/src/data/mod.rs @@ -0,0 +1,11 @@ +mod datagram; +mod group; +mod header; +mod object; +mod track; + +pub use datagram::*; +pub use group::*; +pub use header::*; +pub use object::*; +pub use track::*; diff --git a/moq-transport/src/data/object.rs b/moq-transport/src/data/object.rs new file mode 100644 index 00000000..bd0f8540 --- /dev/null +++ b/moq-transport/src/data/object.rs @@ -0,0 +1,42 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct Object { + // The subscribe ID. + pub subscribe_id: VarInt, + + // The track alias. + pub track_alias: VarInt, + + // The sequence number within the track. + pub group_id: VarInt, + + // The sequence number within the group. + pub object_id: VarInt, + + // The send order, where **smaller** values are sent first. + pub send_order: VarInt, +} + +impl Object { + pub async fn decode(r: &mut R) -> Result { + Ok(Self { + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + object_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; + self.send_order.encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/data/track.rs b/moq-transport/src/data/track.rs new file mode 100644 index 00000000..95dd9ddd --- /dev/null +++ b/moq-transport/src/data/track.rs @@ -0,0 +1,61 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct Track { + // The subscribe ID. + pub subscribe_id: VarInt, + + // The track ID. + pub track_alias: VarInt, + + // The priority, where **smaller** values are sent first. + pub send_order: VarInt, +} + +impl Track { + pub async fn decode(r: &mut R) -> Result { + Ok(Self { + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.send_order.encode(w).await?; + + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct TrackChunk { + pub group_id: VarInt, + pub object_id: VarInt, + pub size: VarInt, +} + +impl TrackChunk { + pub async fn decode(r: &mut R) -> Result { + let group_id = VarInt::decode(r).await?; + let object_id = VarInt::decode(r).await?; + let size = VarInt::decode(r).await?; + + Ok(Self { + group_id, + object_id, + size, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; + self.size.encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index d070251a..66579eed 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -1,7 +1,4 @@ -pub trait MoqError { +pub trait MoqError: std::error::Error { /// An integer code that is sent over the wire. fn code(&self) -> u32; - - /// An optional reason sometimes sent over the wire. - fn reason(&self) -> String; } diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 08f4485a..e80deda4 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -10,7 +10,8 @@ mod coding; mod error; pub mod cache; -pub mod message; +pub mod control; +pub mod data; pub mod session; pub mod setup; diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs deleted file mode 100644 index d32a936f..00000000 --- a/moq-transport/src/message/mod.rs +++ /dev/null @@ -1,160 +0,0 @@ -//! Low-level message sent over the wire, as defined in the specification. -//! -//! All of these messages are sent over a bidirectional QUIC stream. -//! This introduces some head-of-line blocking but preserves ordering. -//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams. -//! -//! Messages sent by the publisher: -//! - [Announce] -//! - [Unannounce] -//! - [SubscribeOk] -//! - [SubscribeError] -//! - [SubscribeReset] -//! - [Object] -//! -//! Messages sent by the subscriber: -//! - [Subscribe] -//! - [Unsubscribe] -//! - [AnnounceOk] -//! - [AnnounceError] -//! -//! Example flow: -//! ```test -//! -> ANNOUNCE namespace="foo" -//! <- ANNOUNCE_OK namespace="foo" -//! <- SUBSCRIBE id=0 namespace="foo" name="bar" -//! -> SUBSCRIBE_OK id=0 -//! -> OBJECT id=0 sequence=69 priority=4 expires=30 -//! -> OBJECT id=0 sequence=70 priority=4 expires=30 -//! -> OBJECT id=0 sequence=70 priority=4 expires=30 -//! <- SUBSCRIBE_STOP id=0 -//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer" -//! ``` -mod announce; -mod announce_ok; -mod announce_reset; -mod go_away; -mod object; -mod subscribe; -mod subscribe_error; -mod subscribe_fin; -mod subscribe_ok; -mod subscribe_reset; -mod unannounce; -mod unsubscribe; - -pub use announce::*; -pub use announce_ok::*; -pub use announce_reset::*; -pub use go_away::*; -pub use object::*; -pub use subscribe::*; -pub use subscribe_error::*; -pub use subscribe_fin::*; -pub use subscribe_ok::*; -pub use subscribe_reset::*; -pub use unannounce::*; -pub use unsubscribe::*; - -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use std::fmt; - -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; - -// Use a macro to generate the message types rather than copy-paste. -// This implements a decode/encode method that uses the specified type. -macro_rules! message_types { - {$($name:ident = $val:expr,)*} => { - /// All supported message types. - #[derive(Clone)] - pub enum Message { - $($name($name)),* - } - - impl Message { - pub async fn decode(r: &mut R, ext: &Extensions) -> Result { - let t = VarInt::decode(r).await?; - - match t.into_inner() { - $($val => { - let msg = $name::decode(r, ext).await?; - Ok(Self::$name(msg)) - })* - _ => Err(DecodeError::InvalidMessage(t)), - } - } - - pub async fn encode(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> { - match self { - $(Self::$name(ref m) => { - VarInt::from_u32($val).encode(w).await?; - m.encode(w, ext).await - },)* - } - } - - pub fn id(&self) -> VarInt { - match self { - $(Self::$name(_) => { - VarInt::from_u32($val) - },)* - } - } - - pub fn name(&self) -> &'static str { - match self { - $(Self::$name(_) => { - stringify!($name) - },)* - } - } - } - - $(impl From<$name> for Message { - fn from(m: $name) -> Self { - Message::$name(m) - } - })* - - impl fmt::Debug for Message { - // Delegate to the message formatter - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - $(Self::$name(ref m) => m.fmt(f),)* - } - } - } - } -} - -// Each message is prefixed with the given VarInt type. -message_types! { - // NOTE: Object and Setup are in other modules. - // Object = 0x0 - // ObjectUnbounded = 0x2 - // SetupClient = 0x40 - // SetupServer = 0x41 - - // SUBSCRIBE family, sent by subscriber - Subscribe = 0x3, - Unsubscribe = 0xa, - - // SUBSCRIBE family, sent by publisher - SubscribeOk = 0x4, - SubscribeError = 0x5, - SubscribeFin = 0xb, - SubscribeReset = 0xc, - - // ANNOUNCE family, sent by publisher - Announce = 0x6, - Unannounce = 0x9, - - // ANNOUNCE family, sent by subscriber - AnnounceOk = 0x7, - AnnounceError = 0x8, - - // Misc - GoAway = 0x10, -} diff --git a/moq-transport/src/message/object.rs b/moq-transport/src/message/object.rs deleted file mode 100644 index 90efa232..00000000 --- a/moq-transport/src/message/object.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::{io, time}; - -use tokio::io::AsyncReadExt; - -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup; - -/// Sent by the publisher as the header of each data stream. -#[derive(Clone, Debug)] -pub struct Object { - // An ID for this track. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 - pub track: VarInt, - - // The sequence number within the track. - pub group: VarInt, - - // The sequence number within the group. - pub sequence: VarInt, - - // The priority, where **smaller** values are sent first. - pub priority: u32, - - // Cache the object for at most this many seconds. - // Zero means never expire. - pub expires: Option, - - /// An optional size, allowing multiple OBJECTs on the same stream. - pub size: Option, -} - -impl Object { - pub async fn decode(r: &mut R, extensions: &setup::Extensions) -> Result { - // Try reading the first byte, returning a special error if the stream naturally ended. - let typ = match r.read_u8().await { - Ok(b) => VarInt::decode_byte(b, r).await?, - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Err(DecodeError::Final), - Err(e) => return Err(e.into()), - }; - - let size_present = match typ.into_inner() { - 0 => false, - 2 => true, - _ => return Err(DecodeError::InvalidMessage(typ)), - }; - - let track = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let sequence = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - - let expires = match extensions.object_expires { - true => match VarInt::decode(r).await?.into_inner() { - 0 => None, - secs => Some(time::Duration::from_secs(secs)), - }, - false => None, - }; - - // The presence of the size field depends on the type. - let size = match size_present { - true => Some(VarInt::decode(r).await?), - false => None, - }; - - Ok(Self { - track, - group, - sequence, - priority, - expires, - size, - }) - } - - pub async fn encode(&self, w: &mut W, extensions: &setup::Extensions) -> Result<(), EncodeError> { - // The kind changes based on the presence of the size. - let kind = match self.size { - Some(_) => VarInt::from_u32(2), - None => VarInt::ZERO, - }; - - kind.encode(w).await?; - self.track.encode(w).await?; - self.group.encode(w).await?; - self.sequence.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; - - // Round up if there's any decimal points. - let expires = match self.expires { - None => 0, - Some(time::Duration::ZERO) => return Err(EncodeError::InvalidValue), // there's no way of expressing zero currently. - Some(expires) if expires.subsec_nanos() > 0 => expires.as_secs() + 1, - Some(expires) => expires.as_secs(), - }; - - if extensions.object_expires { - VarInt::try_from(expires)?.encode(w).await?; - } - - if let Some(size) = self.size { - size.encode(w).await?; - } - - Ok(()) - } -} diff --git a/moq-transport/src/message/subscribe_fin.rs b/moq-transport/src/message/subscribe_fin.rs deleted file mode 100644 index b0709714..00000000 --- a/moq-transport/src/message/subscribe_fin.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup::Extensions; - -/// Sent by the publisher to cleanly terminate a Subscribe. -#[derive(Clone, Debug)] -pub struct SubscribeFin { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - /// The ID for this subscription. - pub id: VarInt, - - /// The final group/object sent on this subscription. - pub final_group: VarInt, - pub final_object: VarInt, -} - -impl SubscribeFin { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { - let id = VarInt::decode(r).await?; - let final_group = VarInt::decode(r).await?; - let final_object = VarInt::decode(r).await?; - - Ok(Self { - id, - final_group, - final_object, - }) - } - - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { - self.id.encode(w).await?; - self.final_group.encode(w).await?; - self.final_object.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs deleted file mode 100644 index 11864e61..00000000 --- a/moq-transport/src/message/subscribe_ok.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::setup::Extensions; - -/// Sent by the publisher to accept a Subscribe. -#[derive(Clone, Debug)] -pub struct SubscribeOk { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - /// The ID for this track. - pub id: VarInt, - - /// The subscription will expire in this many milliseconds. - pub expires: VarInt, -} - -impl SubscribeOk { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { - let id = VarInt::decode(r).await?; - let expires = VarInt::decode(r).await?; - Ok(Self { id, expires }) - } -} - -impl SubscribeOk { - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { - self.id.encode(w).await?; - self.expires.encode(w).await?; - Ok(()) - } -} diff --git a/moq-transport/src/message/subscribe_reset.rs b/moq-transport/src/message/subscribe_reset.rs deleted file mode 100644 index e488b28e..00000000 --- a/moq-transport/src/message/subscribe_reset.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use crate::setup::Extensions; - -/// Sent by the publisher to terminate a Subscribe. -#[derive(Clone, Debug)] -pub struct SubscribeReset { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - /// The ID for this subscription. - pub id: VarInt, - - /// An error code. - pub code: u32, - - /// An optional, human-readable reason. - pub reason: String, - - /// The final group/object sent on this subscription. - pub final_group: VarInt, - pub final_object: VarInt, -} - -impl SubscribeReset { - pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { - let id = VarInt::decode(r).await?; - let code = VarInt::decode(r).await?.try_into()?; - let reason = String::decode(r).await?; - let final_group = VarInt::decode(r).await?; - let final_object = VarInt::decode(r).await?; - - Ok(Self { - id, - code, - reason, - final_group, - final_object, - }) - } - - pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { - self.id.encode(w).await?; - VarInt::from_u32(self.code).encode(w).await?; - self.reason.encode(w).await?; - - self.final_group.encode(w).await?; - self.final_object.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/session/client.rs b/moq-transport/src/session/client.rs index 243fe4ef..fb99d0af 100644 --- a/moq-transport/src/session/client.rs +++ b/moq-transport/src/session/client.rs @@ -30,46 +30,22 @@ impl Client { async fn send_setup(session: &Session, role: setup::Role) -> Result { let mut control = session.open_bi().await?; - let versions: setup::Versions = [setup::Version::DRAFT_01, setup::Version::KIXEL_01].into(); + let versions: setup::Versions = [setup::Version::DRAFT_02].into(); let client = setup::Client { role, versions: versions.clone(), params: Default::default(), - - // Offer all extensions - extensions: setup::Extensions { - object_expires: true, - subscriber_id: true, - subscribe_split: true, - }, }; log::debug!("sending client SETUP: {:?}", client); client.encode(&mut control.0).await?; - let mut server = setup::Server::decode(&mut control.1).await?; + let server = setup::Server::decode(&mut control.1).await?; log::debug!("received server SETUP: {:?}", server); - match server.version { - setup::Version::DRAFT_01 => { - // We always require this extension - server.extensions.require_subscriber_id()?; - - if server.role.is_publisher() { - // We only require object expires if we're a subscriber, so we don't cache objects indefinitely. - server.extensions.require_object_expires()?; - } - } - setup::Version::KIXEL_01 => { - // KIXEL_01 didn't support extensions; all were enabled. - server.extensions = client.extensions.clone() - } - _ => return Err(SessionError::Version(versions, [server.version].into())), - } - - let control = Control::new(control.0, control.1, server.extensions); + let control = Control::new(control.0, control.1); Ok(control) } diff --git a/moq-transport/src/session/control.rs b/moq-transport/src/session/control.rs index 06866509..7cfa78c5 100644 --- a/moq-transport/src/session/control.rs +++ b/moq-transport/src/session/control.rs @@ -6,38 +6,36 @@ use tokio::sync::Mutex; use webtransport_quinn::{RecvStream, SendStream}; use super::SessionError; -use crate::{message::Message, setup::Extensions}; +use crate::control; #[derive(Debug, Clone)] pub(crate) struct Control { send: Arc>, recv: Arc>, - pub ext: Extensions, } impl Control { - pub fn new(send: SendStream, recv: RecvStream, ext: Extensions) -> Self { + pub fn new(send: SendStream, recv: RecvStream) -> Self { Self { send: Arc::new(Mutex::new(send)), recv: Arc::new(Mutex::new(recv)), - ext, } } - pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), SessionError> { + pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), SessionError> { let mut stream = self.send.lock().await; log::info!("sending message: {:?}", msg); msg.into() - .encode(&mut *stream, &self.ext) + .encode(&mut *stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(()) } // It's likely a mistake to call this from two different tasks, but it's easier to just support it. - pub async fn recv(&self) -> Result { + pub async fn recv(&self) -> Result { let mut stream = self.recv.lock().await; - let msg = Message::decode(&mut *stream, &self.ext) + let msg = control::Message::decode(&mut *stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(msg) diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index 228a4c87..9f648076 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -52,6 +52,13 @@ pub enum SessionError { #[error("varint bounds exceeded")] BoundsExceeded(#[from] coding::BoundsExceeded), + /// Sequence numbers were received out of order + #[error("sequence numbers out of order: expected={0} actual={1}")] + OutOfOrder(VarInt, VarInt), + + #[error("message was used in an invalid context")] + InvalidMessage, + /// An unclassified error because I'm lazy. TODO classify these errors #[error("unknown error: {0}")] Unknown(String), @@ -76,32 +83,8 @@ impl MoqError for SessionError { Self::InvalidSize(_) => 400, Self::RequiredExtension(_) => 426, Self::BoundsExceeded(_) => 500, - } - } - - /// A reason that is sent over the wire. - fn reason(&self) -> String { - match self { - Self::Cache(err) => err.reason(), - Self::RoleViolation(kind) => format!("role violation for message type {:?}", kind), - Self::RoleIncompatible(client, server) => { - format!( - "role incompatible: client wanted {:?} but server wanted {:?}", - client, server - ) - } - Self::Read(err) => format!("read error: {}", err), - Self::Write(err) => format!("write error: {}", err), - Self::Session(err) => format!("session error: {}", err), - Self::Unknown(err) => format!("unknown error: {}", err), - Self::Version(client, server) => format!("unsupported versions: client={:?} server={:?}", client, server), - Self::Encode(err) => format!("encode error: {}", err), - Self::Decode(err) => format!("decode error: {}", err), - Self::StreamMapping => "streaming mapping conflict".to_owned(), - Self::InvalidPriority(priority) => format!("invalid priority: {}", priority), - Self::InvalidSize(size) => format!("invalid size: {}", size), - Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id), - Self::BoundsExceeded(_) => "varint bounds exceeded".to_string(), + Self::InvalidMessage => 400, + Self::OutOfOrder(_, _) => 400, } } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index cf19bd36..d495d7d7 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -8,9 +8,9 @@ use webtransport_quinn::Session; use crate::{ cache::{broadcast, segment, track, CacheError}, - message, - message::Message, - MoqError, VarInt, + control, + control::Message, + data, MoqError, VarInt, }; use super::{Control, SessionError}; @@ -73,7 +73,7 @@ impl Publisher { }, // No more broadcasts are available. err = self.source.closed() => { - self.webtransport.close(err.code(), err.reason().as_bytes()); + self.webtransport.close(err.code(), err.to_string().as_bytes()); return Ok(()); }, } @@ -90,17 +90,17 @@ impl Publisher { } } - async fn recv_announce_ok(&mut self, _msg: &message::AnnounceOk) -> Result<(), SessionError> { + async fn recv_announce_ok(&mut self, _msg: &control::AnnounceOk) -> Result<(), SessionError> { // We didn't send an announce. Err(CacheError::NotFound.into()) } - async fn recv_announce_error(&mut self, _msg: &message::AnnounceError) -> Result<(), SessionError> { + async fn recv_announce_error(&mut self, _msg: &control::AnnounceError) -> Result<(), SessionError> { // We didn't send an announce. Err(CacheError::NotFound.into()) } - async fn recv_subscribe(&mut self, msg: &message::Subscribe) -> Result<(), SessionError> { + async fn recv_subscribe(&mut self, msg: &control::Subscribe) -> Result<(), SessionError> { // Assume that the subscribe ID is unique for now. let abort = match self.start_subscribe(msg.clone()) { Ok(abort) => abort, @@ -114,36 +114,31 @@ impl Publisher { }; self.control - .send(message::SubscribeOk { + .send(control::SubscribeOk { id: msg.id, - expires: VarInt::ZERO, + expires: None, + + // TODO implement this + latest: None, }) .await } async fn reset_subscribe(&mut self, id: VarInt, err: E) -> Result<(), SessionError> { - let msg = message::SubscribeReset { + let msg = control::SubscribeDone { id, - code: err.code(), - reason: err.reason(), + code: err.code().into(), + reason: err.to_string(), - // TODO properly populate these - // But first: https://github.com/moq-wg/moq-transport/issues/313 - final_group: VarInt::ZERO, - final_object: VarInt::ZERO, + // TODO properly populate this + last: None, }; self.control.send(msg).await } - fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { - // We currently don't use the namespace field in SUBSCRIBE - // Make sure the namespace is empty if it's provided. - if msg.namespace.as_ref().map_or(false, |namespace| !namespace.is_empty()) { - return Err(CacheError::NotFound.into()); - } - - let mut track = self.source.get_track(&msg.name)?; + fn start_subscribe(&mut self, msg: control::Subscribe) -> Result { + let mut track = self.source.get_track(&msg.track_name)?; // TODO only clone the fields we need let mut this = self.clone(); @@ -153,7 +148,7 @@ impl Publisher { let res = this.run_subscribe(msg.id, &mut track).await; if let Err(err) = &res { - log::warn!("failed to serve track: name={} err={:#?}", track.name, err); + log::warn!("failed to serve track: name={} err={}", track.name, err); } // Make sure we send a reset at the end. @@ -185,45 +180,50 @@ impl Publisher { } async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { - log::trace!("serving group: {:?}", segment); + let header = data::Group { + subscribe_id: id, + track_alias: id, + // Properties of the segment + group_id: segment.sequence, + send_order: VarInt::from_u32(segment.priority), + }; + + log::trace!("sending stream: {:?}", header); let mut stream = self.webtransport.open_uni().await?; // Convert the u32 to a i32, since the Quinn set_priority is signed. let priority = (segment.priority as i64 - i32::MAX as i64) as i32; stream.set_priority(priority).ok(); - while let Some(mut fragment) = segment.fragment().await? { - log::trace!("serving fragment: {:?}", fragment); - - let object = message::Object { - track: id, - - // Properties of the segment - group: segment.sequence, - priority: segment.priority, - expires: segment.expires, + Into::::into(header) + .encode(&mut stream) + .await + .map_err(|e| SessionError::Unknown(e.to_string()))?; - // Properties of the fragment - sequence: fragment.sequence, - size: fragment.size.map(VarInt::try_from).transpose()?, + while let Some(mut fragment) = segment.fragment().await? { + let object = data::GroupChunk { + object_id: fragment.sequence, + size: VarInt::try_from(fragment.size)?, }; + log::trace!("sending chunk: {:?}", object); + object - .encode(&mut stream, &self.control.ext) + .encode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; - while let Some(chunk) = fragment.chunk().await? { - //log::trace!("writing chunk: {:?}", chunk); - stream.write_all(&chunk).await?; + while let Some(data) = fragment.chunk().await? { + stream.write_all(&data).await?; + log::trace!("wrote data: len={}", data.len()); } } Ok(()) } - async fn recv_unsubscribe(&mut self, msg: &message::Unsubscribe) -> Result<(), SessionError> { + async fn recv_unsubscribe(&mut self, msg: &control::Unsubscribe) -> Result<(), SessionError> { let abort = self .subscribes .lock() diff --git a/moq-transport/src/session/server.rs b/moq-transport/src/session/server.rs index 215fe948..3908f34a 100644 --- a/moq-transport/src/session/server.rs +++ b/moq-transport/src/session/server.rs @@ -13,32 +13,14 @@ impl Server { pub async fn accept(session: Session) -> Result { let mut control = session.accept_bi().await?; - let mut client = setup::Client::decode(&mut control.1).await?; + let client = setup::Client::decode(&mut control.1).await?; log::debug!("received client SETUP: {:?}", client); - if client.versions.contains(&setup::Version::DRAFT_01) { - // We always require subscriber ID. - client.extensions.require_subscriber_id()?; - - // We require OBJECT_EXPIRES for publishers only. - if client.role.is_publisher() { - client.extensions.require_object_expires()?; - } - - // We don't require SUBSCRIBE_SPLIT since it's easy enough to support, but it's clearly an oversight. - // client.extensions.require(&Extension::SUBSCRIBE_SPLIT)?; - } else if client.versions.contains(&setup::Version::KIXEL_01) { - // Extensions didn't exist in KIXEL_01, so we set them manually. - client.extensions = setup::Extensions { - object_expires: true, - subscriber_id: true, - subscribe_split: true, - }; - } else { + if !client.versions.contains(&setup::Version::DRAFT_02) { return Err(SessionError::Version( client.versions, - [setup::Version::DRAFT_01, setup::Version::KIXEL_01].into(), + [setup::Version::DRAFT_02].into(), )); } @@ -63,7 +45,7 @@ impl Request { let setup = self.setup(setup::Role::Publisher)?; setup.encode(&mut self.control.0).await?; - let control = Control::new(self.control.0, self.control.1, setup.extensions); + let control = Control::new(self.control.0, self.control.1); let publisher = Publisher::new(self.session, control, source); Ok(publisher) } @@ -73,7 +55,7 @@ impl Request { let setup = self.setup(setup::Role::Subscriber)?; setup.encode(&mut self.control.0).await?; - let control = Control::new(self.control.0, self.control.1, setup.extensions); + let control = Control::new(self.control.0, self.control.1); let subscriber = Subscriber::new(self.session, control, source); Ok(subscriber) } @@ -88,8 +70,7 @@ impl Request { fn setup(&mut self, role: setup::Role) -> Result { let server = setup::Server { role, - version: setup::Version::DRAFT_01, - extensions: self.client.extensions.clone(), + version: setup::Version::DRAFT_02, params: Default::default(), }; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 02b5fbd4..00b9f799 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -8,8 +8,9 @@ use std::{ use crate::{ cache::{broadcast, segment, track, CacheError}, coding::DecodeError, - message, - message::Message, + control, + control::Message, + data, session::{Control, SessionError}, VarInt, }; @@ -74,9 +75,8 @@ impl Subscriber { Message::Announce(_) => Ok(()), // don't care Message::Unannounce(_) => Ok(()), // also don't care Message::SubscribeOk(_msg) => Ok(()), // don't care - Message::SubscribeReset(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), - Message::SubscribeFin(msg) => self.recv_subscribe_error(msg.id, CacheError::Closed), - Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), + Message::SubscribeDone(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code.try_into()?)), + Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code.try_into()?)), Message::GoAway(_msg) => unimplemented!("GOAWAY"), _ => Err(SessionError::RoleViolation(msg.id())), } @@ -98,7 +98,7 @@ impl Subscriber { tokio::spawn(async move { if let Err(err) = this.run_stream(stream).await { - log::warn!("failed to receive stream: err={:#?}", err); + log::warn!("failed to receive stream: err={}", err); } }); } @@ -106,101 +106,143 @@ impl Subscriber { async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> { // Decode the object on the data stream. - let mut object = message::Object::decode(&mut stream, &self.control.ext) + let header = data::Header::decode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; - log::trace!("first object: {:?}", object); + log::trace!("receiving stream: {:?}", header); - // A new scope is needed because the async compiler is dumb + match header { + data::Header::Track(header) => self.run_track(header, stream).await, + data::Header::Group(header) => self.run_group(header, stream).await, + data::Header::Object(header) => self.run_object(header, stream).await, + data::Header::Datagram(_) => Err(SessionError::InvalidMessage), + } + } + + async fn run_track(self, header: data::Track, mut stream: RecvStream) -> Result<(), SessionError> { + loop { + let chunk = match data::TrackChunk::decode(&mut stream).await { + Ok(next) => next, + + // TODO Figure out a way to check for stream FIN instead + Err(DecodeError::UnexpectedEnd) => break, + + // Unknown error + Err(err) => return Err(err.into()), + }; + + log::trace!("receiving chunk: {:?}", chunk); + + // TODO error if we get a duplicate group + let mut segment = { + let mut subscribes = self.subscribes.lock().unwrap(); + let track = subscribes.get_mut(&header.subscribe_id).ok_or(CacheError::NotFound)?; + + track.create_segment(segment::Info { + sequence: chunk.group_id, + priority: header.send_order.try_into()?, // TODO support u64 priorities + })? + }; + + let mut remain = chunk.size.into(); + + // Create a new obvject. + let mut fragment = segment.fragment(remain)?; + + while remain > 0 { + let data = stream + .read_chunk(remain, true) + .await? + .ok_or(DecodeError::UnexpectedEnd)? + .bytes; + + log::trace!("read data: len={}", data.len()); + remain -= data.len(); + fragment.chunk(data)?; + } + } + + Ok(()) + } + + async fn run_group(self, header: data::Group, mut stream: RecvStream) -> Result<(), SessionError> { let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); - let track = subscribes.get_mut(&object.track).ok_or(CacheError::NotFound)?; + let track = subscribes.get_mut(&header.subscribe_id).ok_or(CacheError::NotFound)?; track.create_segment(segment::Info { - sequence: object.group, - priority: object.priority, - expires: object.expires, + sequence: header.group_id, + priority: header.send_order.try_into()?, // TODO support u64 priorities })? }; - log::trace!("received segment: {:?}", segment); - - // Create the first fragment - let mut fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?; - let mut remain = object.size.map(usize::from); + // Sanity check to make sure we receive the group in order + let mut expected = 0; loop { - if let Some(0) = remain { - // Decode the next object from the stream. - let next = match message::Object::decode(&mut stream, &self.control.ext).await { - Ok(next) => next, - - // No more objects - Err(DecodeError::Final) => break, - - // Unknown error - Err(err) => return Err(err.into()), - }; + let chunk = match data::GroupChunk::decode(&mut stream).await { + Ok(chunk) => chunk, - log::trace!("next object: {:?}", object); + // TODO Figure out a way to check for stream FIN instead + Err(DecodeError::UnexpectedEnd) => break, - // NOTE: This is a custom restriction; not part of the moq-transport draft. - // We require every OBJECT to contain the same priority since prioritization is done per-stream. - // We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps. - if next.priority != object.priority && next.group != object.group { - return Err(SessionError::StreamMapping); - } - - object = next; + // Unknown error + Err(err) => return Err(err.into()), + }; - // Create a new object. - fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?; - remain = object.size.map(usize::from); + log::trace!("receiving chunk: {:?}", chunk); - log::trace!("next fragment: {:?}", fragment); + // NOTE: We allow gaps, but not going backwards. + if chunk.object_id.into_inner() < expected { + return Err(SessionError::OutOfOrder(expected.try_into()?, chunk.object_id)); } - match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? { - // Unbounded object has ended - None if remain.is_none() => break, + expected = chunk.object_id.into_inner() + 1; - // Bounded object ended early, oops. - None => return Err(DecodeError::UnexpectedEnd.into()), + let mut remain = chunk.size.into(); - // NOTE: This does not make a copy! - // Bytes are immutable and ref counted. - Some(data) => { - remain = remain.map(|r| r - data.bytes.len()); + // Create a new obvject. + let mut fragment = segment.fragment(remain)?; - log::trace!("next chunk: {:?}", data); - fragment.chunk(data.bytes)?; - } + while remain > 0 { + let data = stream + .read_chunk(remain, true) + .await? + .ok_or(DecodeError::UnexpectedEnd)?; + remain -= data.bytes.len(); + fragment.chunk(data.bytes)?; } } Ok(()) } + async fn run_object(self, _header: data::Object, _stream: RecvStream) -> Result<(), SessionError> { + unimplemented!("TODO"); + } + async fn run_source(mut self) -> Result<(), SessionError> { loop { // NOTE: This returns Closed when the source is closed. let track = self.source.next_track().await?; - let name = track.name.clone(); + let track_name = track.name.clone(); let id = VarInt::from_u32(self.next.fetch_add(1, atomic::Ordering::SeqCst)); self.subscribes.lock().unwrap().insert(id, track); - let msg = message::Subscribe { + let msg = control::Subscribe { id, - namespace: self.control.ext.subscribe_split.then(|| "".to_string()), - name, + + track_alias: id, // This alias is useless but part of the spec. + track_namespace: "".to_string(), + track_name, // TODO correctly support these - start_group: message::SubscribeLocation::Latest(VarInt::ZERO), - start_object: message::SubscribeLocation::Absolute(VarInt::ZERO), - end_group: message::SubscribeLocation::None, - end_object: message::SubscribeLocation::None, + start_group: control::SubscribeLocation::Latest(VarInt::ZERO), + start_object: control::SubscribeLocation::Absolute(VarInt::ZERO), + end_group: control::SubscribeLocation::None, + end_object: control::SubscribeLocation::None, params: Default::default(), }; diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs index a18eb7de..47782913 100644 --- a/moq-transport/src/setup/client.rs +++ b/moq-transport/src/setup/client.rs @@ -1,4 +1,4 @@ -use super::{Extensions, Role, Versions}; +use super::{Role, Versions}; use crate::{ coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, @@ -17,9 +17,6 @@ pub struct Client { /// Indicate if the client is a publisher, a subscriber, or both. pub role: Role, - /// A list of known/offered extensions. - pub extensions: Extensions, - /// Unknown parameters. pub params: Params, } @@ -46,14 +43,7 @@ impl Client { return Err(DecodeError::InvalidParameter); } - let extensions = Extensions::load(&mut params).await?; - - Ok(Self { - versions, - role, - extensions, - params, - }) + Ok(Self { versions, role, params }) } /// Encode a server setup message. @@ -63,7 +53,6 @@ impl Client { let mut params = self.params.clone(); params.set(VarInt::from_u32(0), self.role).await?; - self.extensions.store(&mut params).await?; params.encode(w).await?; diff --git a/moq-transport/src/setup/extension.rs b/moq-transport/src/setup/extension.rs deleted file mode 100644 index 9e8c8ccd..00000000 --- a/moq-transport/src/setup/extension.rs +++ /dev/null @@ -1,84 +0,0 @@ -use tokio::io::{AsyncRead, AsyncWrite}; - -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; -use crate::session::SessionError; -use crate::VarInt; -use paste::paste; - -/// This is a custom extension scheme to allow/require draft PRs. -/// -/// By convention, the extension number is the PR number + 0xe0000. - -macro_rules! extensions { - {$($name:ident = $val:expr,)*} => { - #[derive(Clone, Default, Debug)] - pub struct Extensions { - $( - pub $name: bool, - )* - } - - impl Extensions { - pub async fn load(params: &mut Params) -> Result { - let mut extensions = Self::default(); - - $( - if let Some(_) = params.get::(VarInt::from_u32($val)).await? { - extensions.$name = true - } - )* - - Ok(extensions) - } - - pub async fn store(&self, params: &mut Params) -> Result<(), EncodeError> { - $( - if self.$name { - params.set(VarInt::from_u32($val), ExtensionExists{}).await?; - } - )* - - Ok(()) - } - - paste! { - $( - pub fn [](&self) -> Result<(), SessionError> { - match self.$name { - true => Ok(()), - false => Err(SessionError::RequiredExtension(VarInt::from_u32($val))), - } - } - )* - } - } - } -} - -struct ExtensionExists; - -#[async_trait::async_trait] -impl Decode for ExtensionExists { - async fn decode(_r: &mut R) -> Result { - Ok(ExtensionExists {}) - } -} - -#[async_trait::async_trait] -impl Encode for ExtensionExists { - async fn encode(&self, _w: &mut W) -> Result<(), EncodeError> { - Ok(()) - } -} - -extensions! { - // required for publishers: OBJECT contains expires VarInt in seconds: https://github.com/moq-wg/moq-transport/issues/249 - // TODO write up a PR - object_expires = 0xe00f9, - - // required: SUBSCRIBE chooses track ID: https://github.com/moq-wg/moq-transport/pull/258 - subscriber_id = 0xe0102, - - // optional: SUBSCRIBE contains namespace/name tuple: https://github.com/moq-wg/moq-transport/pull/277 - subscribe_split = 0xe0115, -} diff --git a/moq-transport/src/setup/mod.rs b/moq-transport/src/setup/mod.rs index e7662e71..e5c59c84 100644 --- a/moq-transport/src/setup/mod.rs +++ b/moq-transport/src/setup/mod.rs @@ -5,13 +5,11 @@ //! Both sides negotate the [Version] and [Role]. mod client; -mod extension; mod role; mod server; mod version; pub use client::*; -pub use extension::*; pub use role::*; pub use server::*; pub use version::*; diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs index 7f731195..aca99759 100644 --- a/moq-transport/src/setup/server.rs +++ b/moq-transport/src/setup/server.rs @@ -1,4 +1,4 @@ -use super::{Extensions, Role, Version}; +use super::{Role, Version}; use crate::{ coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, @@ -18,9 +18,6 @@ pub struct Server { // Proposal: moq-wg/moq-transport#151 pub role: Role, - /// Custom extensions. - pub extensions: Extensions, - /// Unknown parameters. pub params: Params, } @@ -46,14 +43,7 @@ impl Server { return Err(DecodeError::InvalidParameter); } - let extensions = Extensions::load(&mut params).await?; - - Ok(Self { - version, - role, - extensions, - params, - }) + Ok(Self { version, role, params }) } /// Encode the server setup. @@ -63,7 +53,6 @@ impl Server { let mut params = self.params.clone(); params.set(VarInt::from_u32(0), self.role).await?; - self.extensions.store(&mut params).await?; params.encode(w).await?; Ok(()) diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 89330391..03460d3f 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -15,62 +15,11 @@ impl Version { /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff000001)); - /// Fork of draft-ietf-moq-transport-00. - /// - /// Rough list of differences: - /// - /// # Messages - /// - Messages are sent over a control stream or a data stream. - /// - Data streams: each unidirectional stream contains a single OBJECT message. - /// - Control stream: a (client-initiated) bidirectional stream containing SETUP and then all other messages. - /// - Messages do not contain a length; unknown messages are fatal. - /// - /// # SETUP - /// - SETUP is split into SETUP_CLIENT and SETUP_SERVER with separate IDs. - /// - SETUP uses version `0xff00` for draft-00. - /// - SETUP no longer contains optional parameters; all are encoded in order and possibly zero. - /// - SETUP `role` indicates the role of the sender, not the role of the server. - /// - SETUP `path` field removed; use WebTransport for path. - /// - /// # SUBSCRIBE - /// - SUBSCRIBE `full_name` is split into separate `namespace` and `name` fields. - /// - SUBSCRIBE no longer contains optional parameters; all are encoded in order and possibly zero. - /// - SUBSCRIBE no longer contains the `auth` parameter; use WebTransport for auth. - /// - SUBSCRIBE no longer contains the `group` parameter; concept no longer exists. - /// - SUBSCRIBE contains the `id` instead of SUBSCRIBE_OK. - /// - SUBSCRIBE_OK and SUBSCRIBE_ERROR reference the subscription `id` the instead of the track `full_name`. - /// - SUBSCRIBE_ERROR was renamed to SUBSCRIBE_RESET, sent by publisher to terminate a SUBSCRIBE. - /// - SUBSCRIBE_STOP was added, sent by the subscriber to terminate a SUBSCRIBE. - /// - SUBSCRIBE_OK no longer has `expires`. - /// - /// # ANNOUNCE - /// - ANNOUNCE no longer contains optional parameters; all are encoded in order and possibly zero. - /// - ANNOUNCE no longer contains the `auth` field; use WebTransport for auth. - /// - ANNOUNCE_ERROR was renamed to ANNOUNCE_RESET, sent by publisher to terminate an ANNOUNCE. - /// - ANNOUNCE_STOP was added, sent by the subscriber to terminate an ANNOUNCE. - /// - /// # OBJECT - /// - OBJECT uses a dedicated QUIC stream. - /// - OBJECT has no size and continues until stream FIN. - /// - OBJECT `priority` is a i32 instead of a varint. (for practical reasons) - /// - OBJECT `expires` was added, a varint in seconds. - /// - OBJECT `group` was removed. - /// - /// # GROUP - /// - GROUP concept was removed, replaced with OBJECT as a QUIC stream. - pub const KIXEL_00: Version = Version(VarInt::from_u32(0xbad00)); - - /// Fork of draft-ietf-moq-transport-01. - /// - /// Most of the KIXEL_00 changes made it into the draft, or were reverted. - /// This was only used for a short time until extensions were created. - /// - /// - SUBSCRIBE contains a separate track namespace and track name field (accidental revert). [#277](https://github.com/moq-wg/moq-transport/pull/277) - /// - SUBSCRIBE contains the `track_id` instead of SUBSCRIBE_OK. [#145](https://github.com/moq-wg/moq-transport/issues/145) - /// - SUBSCRIBE_* reference `track_id` the instead of the `track_full_name`. [#145](https://github.com/moq-wg/moq-transport/issues/145) - /// - OBJECT `priority` is still a VarInt, but the max value is a u32 (implementation reasons) - /// - OBJECT messages within the same `group` MUST be on the same QUIC stream. - pub const KIXEL_01: Version = Version(VarInt::from_u32(0xbad01)); + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-02.html + pub const DRAFT_02: Version = Version(VarInt::from_u32(0xff000002)); + + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-03.html + pub const DRAFT_03: Version = Version(VarInt::from_u32(0xff000003)); } impl From for Version {