From adecfc93e889531a8e17e8dd1ee922a2c00e0887 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 8 Mar 2024 14:25:16 -0800 Subject: [PATCH] draft-03 support? --- moq-transport/src/cache/error.rs | 2 +- moq-transport/src/coding/decode.rs | 3 + .../src/{message => control}/announce.rs | 0 .../announce_error.rs} | 0 .../src/{message => control}/announce_ok.rs | 0 .../src/{message => control}/go_away.rs | 0 .../{message/mod.rs => control/message.rs} | 64 ++----------------- moq-transport/src/control/mod.rs | 56 ++++++++++++++++ .../src/{message => control}/subscribe.rs | 0 moq-transport/src/control/subscribe_done.rs | 51 +++++++++++++++ .../{message => control}/subscribe_error.rs | 6 +- .../src/{message => control}/subscribe_ok.rs | 26 +++++++- .../src/{message => control}/unannounce.rs | 0 .../src/{message => control}/unsubscribe.rs | 0 moq-transport/src/data/datagram.rs | 41 ++++++++++++ moq-transport/src/data/group.rs | 41 ++++++------ moq-transport/src/data/header.rs | 15 +++-- moq-transport/src/data/mod.rs | 2 + moq-transport/src/data/object.rs | 38 +++++------ moq-transport/src/data/track.rs | 40 ++++++------ moq-transport/src/lib.rs | 2 +- moq-transport/src/message/subscribe_fin.rs | 35 ---------- moq-transport/src/message/subscribe_reset.rs | 48 -------------- moq-transport/src/session/control.rs | 8 +-- moq-transport/src/session/error.rs | 4 ++ moq-transport/src/session/publisher.rs | 41 ++++++------ moq-transport/src/session/subscriber.rs | 39 +++++------ moq-transport/src/setup/version.rs | 60 +---------------- 28 files changed, 303 insertions(+), 319 deletions(-) rename moq-transport/src/{message => control}/announce.rs (100%) rename moq-transport/src/{message/announce_reset.rs => control/announce_error.rs} (100%) rename moq-transport/src/{message => control}/announce_ok.rs (100%) rename moq-transport/src/{message => control}/go_away.rs (100%) rename moq-transport/src/{message/mod.rs => control/message.rs} (57%) create mode 100644 moq-transport/src/control/mod.rs rename moq-transport/src/{message => control}/subscribe.rs (100%) create mode 100644 moq-transport/src/control/subscribe_done.rs rename moq-transport/src/{message => control}/subscribe_error.rs (88%) rename moq-transport/src/{message => control}/subscribe_ok.rs (60%) rename moq-transport/src/{message => control}/unannounce.rs (100%) rename moq-transport/src/{message => control}/unsubscribe.rs (100%) create mode 100644 moq-transport/src/data/datagram.rs delete mode 100644 moq-transport/src/message/subscribe_fin.rs delete mode 100644 moq-transport/src/message/subscribe_reset.rs diff --git a/moq-transport/src/cache/error.rs b/moq-transport/src/cache/error.rs index 7cb4c38b..dfe84f6e 100644 --- a/moq-transport/src/cache/error.rs +++ b/moq-transport/src/cache/error.rs @@ -9,7 +9,7 @@ pub enum CacheError { #[error("closed")] Closed, - /// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher. + /// A SUBSCRIBE_DONE or ANNOUNCE_CANCEL was received. #[error("reset code={0:?}")] Reset(u32), diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index fe92d0d9..957c4547 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -33,6 +33,9 @@ pub enum DecodeError { #[error("invalid subscribe location")] InvalidSubscribeLocation, + #[error("invalid value")] + InvalidValue, + #[error("varint bounds exceeded")] BoundsExceeded(#[from] BoundsExceeded), diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/control/announce.rs similarity index 100% rename from moq-transport/src/message/announce.rs rename to moq-transport/src/control/announce.rs diff --git a/moq-transport/src/message/announce_reset.rs b/moq-transport/src/control/announce_error.rs similarity index 100% rename from moq-transport/src/message/announce_reset.rs rename to moq-transport/src/control/announce_error.rs diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/control/announce_ok.rs similarity index 100% rename from moq-transport/src/message/announce_ok.rs rename to moq-transport/src/control/announce_ok.rs diff --git a/moq-transport/src/message/go_away.rs b/moq-transport/src/control/go_away.rs similarity index 100% rename from moq-transport/src/message/go_away.rs rename to moq-transport/src/control/go_away.rs diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/control/message.rs similarity index 57% rename from moq-transport/src/message/mod.rs rename to moq-transport/src/control/message.rs index 3865dbe6..3b3546f3 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/control/message.rs @@ -1,62 +1,11 @@ -//! Low-level message sent over the wire, as defined in the specification. -//! -//! All of these messages are sent over a bidirectional QUIC stream. -//! This introduces some head-of-line blocking but preserves ordering. -//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams. -//! -//! Messages sent by the publisher: -//! - [Announce] -//! - [Unannounce] -//! - [SubscribeOk] -//! - [SubscribeError] -//! - [SubscribeReset] -//! - [Object] -//! -//! Messages sent by the subscriber: -//! - [Subscribe] -//! - [Unsubscribe] -//! - [AnnounceOk] -//! - [AnnounceError] -//! -//! Example flow: -//! ```test -//! -> ANNOUNCE namespace="foo" -//! <- ANNOUNCE_OK namespace="foo" -//! <- SUBSCRIBE id=0 namespace="foo" name="bar" -//! -> SUBSCRIBE_OK id=0 -//! -> OBJECT id=0 sequence=69 priority=4 expires=30 -//! -> OBJECT id=0 sequence=70 priority=4 expires=30 -//! -> OBJECT id=0 sequence=70 priority=4 expires=30 -//! <- SUBSCRIBE_STOP id=0 -//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer" -//! ``` -mod announce; -mod announce_ok; -mod announce_reset; -mod go_away; -mod subscribe; -mod subscribe_error; -mod subscribe_fin; -mod subscribe_ok; -mod subscribe_reset; -mod unannounce; -mod unsubscribe; - -pub use announce::*; -pub use announce_ok::*; -pub use announce_reset::*; -pub use go_away::*; -pub use subscribe::*; -pub use subscribe_error::*; -pub use subscribe_fin::*; -pub use subscribe_ok::*; -pub use subscribe_reset::*; -pub use unannounce::*; -pub use unsubscribe::*; - use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; use std::fmt; +use super::{ + Announce, AnnounceError, AnnounceOk, GoAway, Subscribe, SubscribeDone, SubscribeError, SubscribeOk, Unannounce, + Unsubscribe, +}; + // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. macro_rules! message_types { @@ -138,8 +87,7 @@ message_types! { // SUBSCRIBE family, sent by publisher SubscribeOk = 0x4, SubscribeError = 0x5, - SubscribeFin = 0xb, - SubscribeReset = 0xc, + SubscribeDone = 0xb, // ANNOUNCE family, sent by publisher Announce = 0x6, diff --git a/moq-transport/src/control/mod.rs b/moq-transport/src/control/mod.rs new file mode 100644 index 00000000..200589ea --- /dev/null +++ b/moq-transport/src/control/mod.rs @@ -0,0 +1,56 @@ +//! Low-level message sent over the wire, as defined in the specification. +//! +//! TODO Update this +//! All of these messages are sent over a bidirectional QUIC stream. +//! This introduces some head-of-line blocking but preserves ordering. +//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams. +//! +//! Messages sent by the publisher: +//! - [Announce] +//! - [Unannounce] +//! - [SubscribeOk] +//! - [SubscribeError] +//! - [SubscribeReset] +//! - [Object] +//! +//! Messages sent by the subscriber: +//! - [Subscribe] +//! - [Unsubscribe] +//! - [AnnounceOk] +//! - [AnnounceError] +//! +//! Example flow: +//! ```test +//! -> ANNOUNCE namespace="foo" +//! <- ANNOUNCE_OK namespace="foo" +//! <- SUBSCRIBE id=0 namespace="foo" name="bar" +//! -> SUBSCRIBE_OK id=0 +//! -> OBJECT id=0 sequence=69 priority=4 expires=30 +//! -> OBJECT id=0 sequence=70 priority=4 expires=30 +//! -> OBJECT id=0 sequence=70 priority=4 expires=30 +//! <- SUBSCRIBE_STOP id=0 +//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer" +//! ``` +mod announce; +mod announce_error; +mod announce_ok; +mod go_away; +mod message; +mod subscribe; +mod subscribe_done; +mod subscribe_error; +mod subscribe_ok; +mod unannounce; +mod unsubscribe; + +pub use announce::*; +pub use announce_error::*; +pub use announce_ok::*; +pub use go_away::*; +pub use message::*; +pub use subscribe::*; +pub use subscribe_done::*; +pub use subscribe_error::*; +pub use subscribe_ok::*; +pub use unannounce::*; +pub use unsubscribe::*; diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/control/subscribe.rs similarity index 100% rename from moq-transport/src/message/subscribe.rs rename to moq-transport/src/control/subscribe.rs diff --git a/moq-transport/src/control/subscribe_done.rs b/moq-transport/src/control/subscribe_done.rs new file mode 100644 index 00000000..7529536a --- /dev/null +++ b/moq-transport/src/control/subscribe_done.rs @@ -0,0 +1,51 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +/// Sent by the publisher to cleanly terminate a Subscribe. +#[derive(Clone, Debug)] +pub struct SubscribeDone { + /// The ID for this subscription. + pub id: VarInt, + + /// The error code + pub code: VarInt, + + /// An optional error reason + pub reason: String, + + /// The final group/object sent on this subscription. + pub last: Option<(VarInt, VarInt)>, +} + +impl SubscribeDone { + pub async fn decode(r: &mut R) -> Result { + let id = VarInt::decode(r).await?; + let code = VarInt::decode(r).await?; + let reason = String::decode(r).await?; + let last = match r.read_u8().await? { + 0 => None, + 1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)), + _ => return Err(DecodeError::InvalidValue), + }; + + Ok(Self { id, code, reason, last }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w).await?; + self.code.encode(w).await?; + self.reason.encode(w).await?; + + if let Some((group, object)) = self.last { + w.write_u8(1).await?; + group.encode(w).await?; + object.encode(w).await?; + } else { + w.write_u8(0).await?; + } + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_error.rs b/moq-transport/src/control/subscribe_error.rs similarity index 88% rename from moq-transport/src/message/subscribe_error.rs rename to moq-transport/src/control/subscribe_error.rs index 3577ae5c..fa2f98c1 100644 --- a/moq-transport/src/message/subscribe_error.rs +++ b/moq-transport/src/control/subscribe_error.rs @@ -8,7 +8,7 @@ pub struct SubscribeError { pub id: VarInt, // An error code. - pub code: u32, + pub code: VarInt, // An optional, human-readable reason. pub reason: String, @@ -20,7 +20,7 @@ pub struct SubscribeError { impl SubscribeError { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let code = VarInt::decode(r).await?.try_into()?; + let code = VarInt::decode(r).await?; let reason = String::decode(r).await?; let alias = VarInt::decode(r).await?; @@ -34,7 +34,7 @@ impl SubscribeError { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - VarInt::from_u32(self.code).encode(w).await?; + self.code.encode(w).await?; self.reason.encode(w).await?; self.alias.encode(w).await?; diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/control/subscribe_ok.rs similarity index 60% rename from moq-transport/src/message/subscribe_ok.rs rename to moq-transport/src/control/subscribe_ok.rs index aac07a70..2a140c3c 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/control/subscribe_ok.rs @@ -1,3 +1,5 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; @@ -10,6 +12,9 @@ pub struct SubscribeOk { /// The subscription will expire in this many milliseconds. pub expires: Option, + + /// The latest group and object for the track. + pub latest: Option<(VarInt, VarInt)>, } impl SubscribeOk { @@ -19,7 +24,14 @@ impl SubscribeOk { VarInt::ZERO => None, expires => Some(expires), }; - Ok(Self { id, expires }) + + let latest = match r.read_u8().await? { + 0 => None, + 1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)), + _ => return Err(DecodeError::InvalidValue), + }; + + Ok(Self { id, expires, latest }) } } @@ -27,6 +39,18 @@ impl SubscribeOk { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; self.expires.unwrap_or(VarInt::ZERO).encode(w).await?; + + match self.latest { + Some((group, object)) => { + w.write_u8(1).await?; + group.encode(w).await?; + object.encode(w).await?; + } + None => { + w.write_u8(0).await?; + } + } + Ok(()) } } diff --git a/moq-transport/src/message/unannounce.rs b/moq-transport/src/control/unannounce.rs similarity index 100% rename from moq-transport/src/message/unannounce.rs rename to moq-transport/src/control/unannounce.rs diff --git a/moq-transport/src/message/unsubscribe.rs b/moq-transport/src/control/unsubscribe.rs similarity index 100% rename from moq-transport/src/message/unsubscribe.rs rename to moq-transport/src/control/unsubscribe.rs diff --git a/moq-transport/src/data/datagram.rs b/moq-transport/src/data/datagram.rs new file mode 100644 index 00000000..9d849ff5 --- /dev/null +++ b/moq-transport/src/data/datagram.rs @@ -0,0 +1,41 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +#[derive(Clone, Debug)] +pub struct Datagram { + // The subscribe ID. + pub subscribe_id: VarInt, + + // The track alias. + pub track_alias: VarInt, + + // The sequence number within the track. + pub group_id: VarInt, + + // The object ID within the group. + pub object_id: VarInt, + + // The priority, where **smaller** values are sent first. + pub send_order: VarInt, +} + +impl Datagram { + pub async fn decode(r: &mut R) -> Result { + Ok(Self { + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + object_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; + self.send_order.encode(w).await?; + Ok(()) + } +} diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/group.rs index 5a7ceacd..3d2447d4 100644 --- a/moq-transport/src/data/group.rs +++ b/moq-transport/src/data/group.rs @@ -3,38 +3,33 @@ use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeEr #[derive(Clone, Debug)] pub struct Group { // The subscribe ID. - pub subscribe: VarInt, + pub subscribe_id: VarInt, // The track alias. - pub track: VarInt, + pub track_alias: VarInt, // The group sequence number - pub group: VarInt, + pub group_id: VarInt, // The priority, where **smaller** values are sent first. - pub priority: u32, + pub send_order: VarInt, } impl Group { pub async fn decode(r: &mut R) -> Result { - let subscribe = VarInt::decode(r).await?; - let track = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - Ok(Self { - subscribe, - track, - group, - priority, + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.subscribe.encode(w).await?; - self.track.encode(w).await?; - self.group.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.send_order.encode(w).await?; Ok(()) } @@ -42,20 +37,20 @@ impl Group { #[derive(Clone, Debug)] pub struct GroupChunk { - pub object: VarInt, + pub object_id: VarInt, pub size: VarInt, } impl GroupChunk { pub async fn decode(r: &mut R) -> Result { - let object = VarInt::decode(r).await?; - let size = VarInt::decode(r).await?; - - Ok(Self { object, size }) + Ok(Self { + object_id: VarInt::decode(r).await?, + size: VarInt::decode(r).await?, + }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.object.encode(w).await?; + self.object_id.encode(w).await?; self.size.encode(w).await?; Ok(()) diff --git a/moq-transport/src/data/header.rs b/moq-transport/src/data/header.rs index aedfc91d..e3f64d1c 100644 --- a/moq-transport/src/data/header.rs +++ b/moq-transport/src/data/header.rs @@ -1,7 +1,7 @@ use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt}; use std::fmt; -use super::{Group, Object, Track}; +use super::{Datagram, Group, Object, Track}; // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. @@ -51,21 +51,21 @@ macro_rules! header_types { } } - pub fn subscribe(&self) -> VarInt { + pub fn subscribe_id(&self) -> VarInt { match self { - $(Self::$name(o) => o.subscribe,)* + $(Self::$name(o) => o.subscribe_id,)* } } - pub fn track(&self) -> VarInt { + pub fn track_alias(&self) -> VarInt { match self { - $(Self::$name(o) => o.track,)* + $(Self::$name(o) => o.track_alias,)* } } - pub fn priority(&self) -> u32 { + pub fn send_order(&self) -> VarInt { match self { - $(Self::$name(o) => o.priority,)* + $(Self::$name(o) => o.send_order,)* } } } @@ -90,6 +90,7 @@ macro_rules! header_types { // Each object type is prefixed with the given VarInt type. header_types! { Object = 0x0, + Datagram = 0x1, Group = 0x50, Track = 0x51, } diff --git a/moq-transport/src/data/mod.rs b/moq-transport/src/data/mod.rs index b820bdcf..2090093c 100644 --- a/moq-transport/src/data/mod.rs +++ b/moq-transport/src/data/mod.rs @@ -1,8 +1,10 @@ +mod datagram; mod group; mod header; mod object; mod track; +pub use datagram::*; pub use group::*; pub use header::*; pub use object::*; diff --git a/moq-transport/src/data/object.rs b/moq-transport/src/data/object.rs index f4196a34..bd0f8540 100644 --- a/moq-transport/src/data/object.rs +++ b/moq-transport/src/data/object.rs @@ -4,44 +4,38 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Clone, Debug)] pub struct Object { // The subscribe ID. - pub subscribe: VarInt, + pub subscribe_id: VarInt, // The track alias. - pub track: VarInt, + pub track_alias: VarInt, // The sequence number within the track. - pub group: VarInt, + pub group_id: VarInt, // The sequence number within the group. - pub sequence: VarInt, + pub object_id: VarInt, - // The priority, where **smaller** values are sent first. - pub priority: u32, + // The send order, where **smaller** values are sent first. + pub send_order: VarInt, } impl Object { pub async fn decode(r: &mut R) -> Result { - let subscribe = VarInt::decode(r).await?; - let track = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let sequence = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - Ok(Self { - subscribe, - track, - group, - sequence, - priority, + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + group_id: VarInt::decode(r).await?, + object_id: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.subscribe.encode(w).await?; - self.track.encode(w).await?; - self.group.encode(w).await?; - self.sequence.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; + self.send_order.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/data/track.rs b/moq-transport/src/data/track.rs index 0547a833..95dd9ddd 100644 --- a/moq-transport/src/data/track.rs +++ b/moq-transport/src/data/track.rs @@ -4,32 +4,28 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Clone, Debug)] pub struct Track { // The subscribe ID. - pub subscribe: VarInt, + pub subscribe_id: VarInt, // The track ID. - pub track: VarInt, + pub track_alias: VarInt, // The priority, where **smaller** values are sent first. - pub priority: u32, + pub send_order: VarInt, } impl Track { pub async fn decode(r: &mut R) -> Result { - let subscribe = VarInt::decode(r).await?; - let track = VarInt::decode(r).await?; - let priority = VarInt::decode(r).await?.try_into()?; - Ok(Self { - subscribe, - track, - priority, + subscribe_id: VarInt::decode(r).await?, + track_alias: VarInt::decode(r).await?, + send_order: VarInt::decode(r).await?, }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.subscribe.encode(w).await?; - self.track.encode(w).await?; - VarInt::from_u32(self.priority).encode(w).await?; + self.subscribe_id.encode(w).await?; + self.track_alias.encode(w).await?; + self.send_order.encode(w).await?; Ok(()) } @@ -37,23 +33,27 @@ impl Track { #[derive(Clone, Debug)] pub struct TrackChunk { - pub group: VarInt, - pub object: VarInt, + pub group_id: VarInt, + pub object_id: VarInt, pub size: VarInt, } impl TrackChunk { pub async fn decode(r: &mut R) -> Result { - let group = VarInt::decode(r).await?; - let object = VarInt::decode(r).await?; + let group_id = VarInt::decode(r).await?; + let object_id = VarInt::decode(r).await?; let size = VarInt::decode(r).await?; - Ok(Self { group, object, size }) + Ok(Self { + group_id, + object_id, + size, + }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.group.encode(w).await?; - self.object.encode(w).await?; + self.group_id.encode(w).await?; + self.object_id.encode(w).await?; self.size.encode(w).await?; Ok(()) diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 734202bd..e80deda4 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -10,8 +10,8 @@ mod coding; mod error; pub mod cache; +pub mod control; pub mod data; -pub mod message; pub mod session; pub mod setup; diff --git a/moq-transport/src/message/subscribe_fin.rs b/moq-transport/src/message/subscribe_fin.rs deleted file mode 100644 index 38a4efda..00000000 --- a/moq-transport/src/message/subscribe_fin.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -/// Sent by the publisher to cleanly terminate a Subscribe. -#[derive(Clone, Debug)] -pub struct SubscribeFin { - /// The ID for this subscription. - pub id: VarInt, - - /// The final group/object sent on this subscription. - pub final_group: VarInt, - pub final_object: VarInt, -} - -impl SubscribeFin { - pub async fn decode(r: &mut R) -> Result { - let id = VarInt::decode(r).await?; - let final_group = VarInt::decode(r).await?; - let final_object = VarInt::decode(r).await?; - - Ok(Self { - id, - final_group, - final_object, - }) - } - - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.id.encode(w).await?; - self.final_group.encode(w).await?; - self.final_object.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/message/subscribe_reset.rs b/moq-transport/src/message/subscribe_reset.rs deleted file mode 100644 index dfedf876..00000000 --- a/moq-transport/src/message/subscribe_reset.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -/// Sent by the publisher to terminate a Subscribe. -#[derive(Clone, Debug)] -pub struct SubscribeReset { - /// The ID for this subscription. - pub id: VarInt, - - /// An error code. - pub code: u32, - - /// An optional, human-readable reason. - pub reason: String, - - /// The final group/object sent on this subscription. - pub group: VarInt, - pub object: VarInt, -} - -impl SubscribeReset { - pub async fn decode(r: &mut R) -> Result { - let id = VarInt::decode(r).await?; - let code = VarInt::decode(r).await?.try_into()?; - let reason = String::decode(r).await?; - let group = VarInt::decode(r).await?; - let object = VarInt::decode(r).await?; - - Ok(Self { - id, - code, - reason, - group, - object, - }) - } - - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.id.encode(w).await?; - VarInt::from_u32(self.code).encode(w).await?; - self.reason.encode(w).await?; - - self.group.encode(w).await?; - self.object.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/session/control.rs b/moq-transport/src/session/control.rs index 7c84e805..7cfa78c5 100644 --- a/moq-transport/src/session/control.rs +++ b/moq-transport/src/session/control.rs @@ -6,7 +6,7 @@ use tokio::sync::Mutex; use webtransport_quinn::{RecvStream, SendStream}; use super::SessionError; -use crate::message::Message; +use crate::control; #[derive(Debug, Clone)] pub(crate) struct Control { @@ -22,7 +22,7 @@ impl Control { } } - pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), SessionError> { + pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), SessionError> { let mut stream = self.send.lock().await; log::info!("sending message: {:?}", msg); msg.into() @@ -33,9 +33,9 @@ impl Control { } // It's likely a mistake to call this from two different tasks, but it's easier to just support it. - pub async fn recv(&self) -> Result { + pub async fn recv(&self) -> Result { let mut stream = self.recv.lock().await; - let msg = Message::decode(&mut *stream) + let msg = control::Message::decode(&mut *stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(msg) diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index 3c49f76d..9f648076 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -56,6 +56,9 @@ pub enum SessionError { #[error("sequence numbers out of order: expected={0} actual={1}")] OutOfOrder(VarInt, VarInt), + #[error("message was used in an invalid context")] + InvalidMessage, + /// An unclassified error because I'm lazy. TODO classify these errors #[error("unknown error: {0}")] Unknown(String), @@ -80,6 +83,7 @@ impl MoqError for SessionError { Self::InvalidSize(_) => 400, Self::RequiredExtension(_) => 426, Self::BoundsExceeded(_) => 500, + Self::InvalidMessage => 400, Self::OutOfOrder(_, _) => 400, } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index fe836117..d495d7d7 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -8,9 +8,9 @@ use webtransport_quinn::Session; use crate::{ cache::{broadcast, segment, track, CacheError}, - data, message, - message::Message, - MoqError, VarInt, + control, + control::Message, + data, MoqError, VarInt, }; use super::{Control, SessionError}; @@ -90,17 +90,17 @@ impl Publisher { } } - async fn recv_announce_ok(&mut self, _msg: &message::AnnounceOk) -> Result<(), SessionError> { + async fn recv_announce_ok(&mut self, _msg: &control::AnnounceOk) -> Result<(), SessionError> { // We didn't send an announce. Err(CacheError::NotFound.into()) } - async fn recv_announce_error(&mut self, _msg: &message::AnnounceError) -> Result<(), SessionError> { + async fn recv_announce_error(&mut self, _msg: &control::AnnounceError) -> Result<(), SessionError> { // We didn't send an announce. Err(CacheError::NotFound.into()) } - async fn recv_subscribe(&mut self, msg: &message::Subscribe) -> Result<(), SessionError> { + async fn recv_subscribe(&mut self, msg: &control::Subscribe) -> Result<(), SessionError> { // Assume that the subscribe ID is unique for now. let abort = match self.start_subscribe(msg.clone()) { Ok(abort) => abort, @@ -114,29 +114,30 @@ impl Publisher { }; self.control - .send(message::SubscribeOk { + .send(control::SubscribeOk { id: msg.id, expires: None, + + // TODO implement this + latest: None, }) .await } async fn reset_subscribe(&mut self, id: VarInt, err: E) -> Result<(), SessionError> { - let msg = message::SubscribeReset { + let msg = control::SubscribeDone { id, - code: err.code(), + code: err.code().into(), reason: err.to_string(), - // TODO properly populate these - // But first: https://github.com/moq-wg/moq-transport/issues/313 - group: VarInt::ZERO, - object: VarInt::ZERO, + // TODO properly populate this + last: None, }; self.control.send(msg).await } - fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { + fn start_subscribe(&mut self, msg: control::Subscribe) -> Result { let mut track = self.source.get_track(&msg.track_name)?; // TODO only clone the fields we need @@ -180,12 +181,12 @@ impl Publisher { async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { let header = data::Group { - subscribe: id, - track: id, + subscribe_id: id, + track_alias: id, // Properties of the segment - group: segment.sequence, - priority: segment.priority, + group_id: segment.sequence, + send_order: VarInt::from_u32(segment.priority), }; log::trace!("sending stream: {:?}", header); @@ -202,7 +203,7 @@ impl Publisher { while let Some(mut fragment) = segment.fragment().await? { let object = data::GroupChunk { - object: fragment.sequence, + object_id: fragment.sequence, size: VarInt::try_from(fragment.size)?, }; @@ -222,7 +223,7 @@ impl Publisher { Ok(()) } - async fn recv_unsubscribe(&mut self, msg: &message::Unsubscribe) -> Result<(), SessionError> { + async fn recv_unsubscribe(&mut self, msg: &control::Unsubscribe) -> Result<(), SessionError> { let abort = self .subscribes .lock() diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index ac1c7b91..00b9f799 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -8,8 +8,9 @@ use std::{ use crate::{ cache::{broadcast, segment, track, CacheError}, coding::DecodeError, - data, message, - message::Message, + control, + control::Message, + data, session::{Control, SessionError}, VarInt, }; @@ -74,9 +75,8 @@ impl Subscriber { Message::Announce(_) => Ok(()), // don't care Message::Unannounce(_) => Ok(()), // also don't care Message::SubscribeOk(_msg) => Ok(()), // don't care - Message::SubscribeReset(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), - Message::SubscribeFin(msg) => self.recv_subscribe_error(msg.id, CacheError::Closed), - Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), + Message::SubscribeDone(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code.try_into()?)), + Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code.try_into()?)), Message::GoAway(_msg) => unimplemented!("GOAWAY"), _ => Err(SessionError::RoleViolation(msg.id())), } @@ -116,6 +116,7 @@ impl Subscriber { data::Header::Track(header) => self.run_track(header, stream).await, data::Header::Group(header) => self.run_group(header, stream).await, data::Header::Object(header) => self.run_object(header, stream).await, + data::Header::Datagram(_) => Err(SessionError::InvalidMessage), } } @@ -136,11 +137,11 @@ impl Subscriber { // TODO error if we get a duplicate group let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); - let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; + let track = subscribes.get_mut(&header.subscribe_id).ok_or(CacheError::NotFound)?; track.create_segment(segment::Info { - sequence: chunk.group, - priority: header.priority, + sequence: chunk.group_id, + priority: header.send_order.try_into()?, // TODO support u64 priorities })? }; @@ -168,11 +169,11 @@ impl Subscriber { async fn run_group(self, header: data::Group, mut stream: RecvStream) -> Result<(), SessionError> { let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); - let track = subscribes.get_mut(&header.subscribe).ok_or(CacheError::NotFound)?; + let track = subscribes.get_mut(&header.subscribe_id).ok_or(CacheError::NotFound)?; track.create_segment(segment::Info { - sequence: header.group, - priority: header.priority, + sequence: header.group_id, + priority: header.send_order.try_into()?, // TODO support u64 priorities })? }; @@ -193,11 +194,11 @@ impl Subscriber { log::trace!("receiving chunk: {:?}", chunk); // NOTE: We allow gaps, but not going backwards. - if chunk.object.into_inner() < expected { - return Err(SessionError::OutOfOrder(expected.try_into()?, chunk.object)); + if chunk.object_id.into_inner() < expected { + return Err(SessionError::OutOfOrder(expected.try_into()?, chunk.object_id)); } - expected = chunk.object.into_inner() + 1; + expected = chunk.object_id.into_inner() + 1; let mut remain = chunk.size.into(); @@ -230,7 +231,7 @@ impl Subscriber { let id = VarInt::from_u32(self.next.fetch_add(1, atomic::Ordering::SeqCst)); self.subscribes.lock().unwrap().insert(id, track); - let msg = message::Subscribe { + let msg = control::Subscribe { id, track_alias: id, // This alias is useless but part of the spec. @@ -238,10 +239,10 @@ impl Subscriber { track_name, // TODO correctly support these - start_group: message::SubscribeLocation::Latest(VarInt::ZERO), - start_object: message::SubscribeLocation::Absolute(VarInt::ZERO), - end_group: message::SubscribeLocation::None, - end_object: message::SubscribeLocation::None, + start_group: control::SubscribeLocation::Latest(VarInt::ZERO), + start_object: control::SubscribeLocation::Absolute(VarInt::ZERO), + end_group: control::SubscribeLocation::None, + end_object: control::SubscribeLocation::None, params: Default::default(), }; diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 3a3fccf6..03460d3f 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -15,65 +15,11 @@ impl Version { /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff000001)); - /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-02.html pub const DRAFT_02: Version = Version(VarInt::from_u32(0xff000002)); - /// Fork of draft-ietf-moq-transport-00. - /// - /// Rough list of differences: - /// - /// # Messages - /// - Messages are sent over a control stream or a data stream. - /// - Data streams: each unidirectional stream contains a single OBJECT message. - /// - Control stream: a (client-initiated) bidirectional stream containing SETUP and then all other messages. - /// - Messages do not contain a length; unknown messages are fatal. - /// - /// # SETUP - /// - SETUP is split into SETUP_CLIENT and SETUP_SERVER with separate IDs. - /// - SETUP uses version `0xff00` for draft-00. - /// - SETUP no longer contains optional parameters; all are encoded in order and possibly zero. - /// - SETUP `role` indicates the role of the sender, not the role of the server. - /// - SETUP `path` field removed; use WebTransport for path. - /// - /// # SUBSCRIBE - /// - SUBSCRIBE `full_name` is split into separate `namespace` and `name` fields. - /// - SUBSCRIBE no longer contains optional parameters; all are encoded in order and possibly zero. - /// - SUBSCRIBE no longer contains the `auth` parameter; use WebTransport for auth. - /// - SUBSCRIBE no longer contains the `group` parameter; concept no longer exists. - /// - SUBSCRIBE contains the `id` instead of SUBSCRIBE_OK. - /// - SUBSCRIBE_OK and SUBSCRIBE_ERROR reference the subscription `id` the instead of the track `full_name`. - /// - SUBSCRIBE_ERROR was renamed to SUBSCRIBE_RESET, sent by publisher to terminate a SUBSCRIBE. - /// - SUBSCRIBE_STOP was added, sent by the subscriber to terminate a SUBSCRIBE. - /// - SUBSCRIBE_OK no longer has `expires`. - /// - /// # ANNOUNCE - /// - ANNOUNCE no longer contains optional parameters; all are encoded in order and possibly zero. - /// - ANNOUNCE no longer contains the `auth` field; use WebTransport for auth. - /// - ANNOUNCE_ERROR was renamed to ANNOUNCE_RESET, sent by publisher to terminate an ANNOUNCE. - /// - ANNOUNCE_STOP was added, sent by the subscriber to terminate an ANNOUNCE. - /// - /// # OBJECT - /// - OBJECT uses a dedicated QUIC stream. - /// - OBJECT has no size and continues until stream FIN. - /// - OBJECT `priority` is a i32 instead of a varint. (for practical reasons) - /// - OBJECT `expires` was added, a varint in seconds. - /// - OBJECT `group` was removed. - /// - /// # GROUP - /// - GROUP concept was removed, replaced with OBJECT as a QUIC stream. - pub const KIXEL_00: Version = Version(VarInt::from_u32(0xbad00)); - - /// Fork of draft-ietf-moq-transport-01. - /// - /// Most of the KIXEL_00 changes made it into the draft, or were reverted. - /// This was only used for a short time until extensions were created. - /// - /// - SUBSCRIBE contains a separate track namespace and track name field (accidental revert). [#277](https://github.com/moq-wg/moq-transport/pull/277) - /// - SUBSCRIBE contains the `track_id` instead of SUBSCRIBE_OK. [#145](https://github.com/moq-wg/moq-transport/issues/145) - /// - SUBSCRIBE_* reference `track_id` the instead of the `track_full_name`. [#145](https://github.com/moq-wg/moq-transport/issues/145) - /// - OBJECT `priority` is still a VarInt, but the max value is a u32 (implementation reasons) - /// - OBJECT messages within the same `group` MUST be on the same QUIC stream. - pub const KIXEL_01: Version = Version(VarInt::from_u32(0xbad01)); + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-03.html + pub const DRAFT_03: Version = Version(VarInt::from_u32(0xff000003)); } impl From for Version {