Skip to content

Commit

Permalink
draft-03 support?
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated committed Mar 8, 2024
1 parent 6e438b1 commit adecfc9
Show file tree
Hide file tree
Showing 28 changed files with 303 additions and 319 deletions.
2 changes: 1 addition & 1 deletion moq-transport/src/cache/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub enum CacheError {
#[error("closed")]
Closed,

/// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher.
/// A SUBSCRIBE_DONE or ANNOUNCE_CANCEL was received.
#[error("reset code={0:?}")]
Reset(u32),

Expand Down
3 changes: 3 additions & 0 deletions moq-transport/src/coding/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum DecodeError {
#[error("invalid subscribe location")]
InvalidSubscribeLocation,

#[error("invalid value")]
InvalidValue,

#[error("varint bounds exceeded")]
BoundsExceeded(#[from] BoundsExceeded),

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,62 +1,11 @@
//! Low-level message sent over the wire, as defined in the specification.
//!
//! All of these messages are sent over a bidirectional QUIC stream.
//! This introduces some head-of-line blocking but preserves ordering.
//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams.
//!
//! Messages sent by the publisher:
//! - [Announce]
//! - [Unannounce]
//! - [SubscribeOk]
//! - [SubscribeError]
//! - [SubscribeReset]
//! - [Object]
//!
//! Messages sent by the subscriber:
//! - [Subscribe]
//! - [Unsubscribe]
//! - [AnnounceOk]
//! - [AnnounceError]
//!
//! Example flow:
//! ```test
//! -> ANNOUNCE namespace="foo"
//! <- ANNOUNCE_OK namespace="foo"
//! <- SUBSCRIBE id=0 namespace="foo" name="bar"
//! -> SUBSCRIBE_OK id=0
//! -> OBJECT id=0 sequence=69 priority=4 expires=30
//! -> OBJECT id=0 sequence=70 priority=4 expires=30
//! -> OBJECT id=0 sequence=70 priority=4 expires=30
//! <- SUBSCRIBE_STOP id=0
//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer"
//! ```
mod announce;
mod announce_ok;
mod announce_reset;
mod go_away;
mod subscribe;
mod subscribe_error;
mod subscribe_fin;
mod subscribe_ok;
mod subscribe_reset;
mod unannounce;
mod unsubscribe;

pub use announce::*;
pub use announce_ok::*;
pub use announce_reset::*;
pub use go_away::*;
pub use subscribe::*;
pub use subscribe_error::*;
pub use subscribe_fin::*;
pub use subscribe_ok::*;
pub use subscribe_reset::*;
pub use unannounce::*;
pub use unsubscribe::*;

use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError, VarInt};
use std::fmt;

use super::{
Announce, AnnounceError, AnnounceOk, GoAway, Subscribe, SubscribeDone, SubscribeError, SubscribeOk, Unannounce,
Unsubscribe,
};

// Use a macro to generate the message types rather than copy-paste.
// This implements a decode/encode method that uses the specified type.
macro_rules! message_types {
Expand Down Expand Up @@ -138,8 +87,7 @@ message_types! {
// SUBSCRIBE family, sent by publisher
SubscribeOk = 0x4,
SubscribeError = 0x5,
SubscribeFin = 0xb,
SubscribeReset = 0xc,
SubscribeDone = 0xb,

// ANNOUNCE family, sent by publisher
Announce = 0x6,
Expand Down
56 changes: 56 additions & 0 deletions moq-transport/src/control/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Low-level message sent over the wire, as defined in the specification.
//!
//! TODO Update this
//! All of these messages are sent over a bidirectional QUIC stream.
//! This introduces some head-of-line blocking but preserves ordering.
//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams.
//!
//! Messages sent by the publisher:
//! - [Announce]
//! - [Unannounce]
//! - [SubscribeOk]
//! - [SubscribeError]
//! - [SubscribeReset]
//! - [Object]
//!
//! Messages sent by the subscriber:
//! - [Subscribe]
//! - [Unsubscribe]
//! - [AnnounceOk]
//! - [AnnounceError]
//!
//! Example flow:
//! ```test
//! -> ANNOUNCE namespace="foo"
//! <- ANNOUNCE_OK namespace="foo"
//! <- SUBSCRIBE id=0 namespace="foo" name="bar"
//! -> SUBSCRIBE_OK id=0
//! -> OBJECT id=0 sequence=69 priority=4 expires=30
//! -> OBJECT id=0 sequence=70 priority=4 expires=30
//! -> OBJECT id=0 sequence=70 priority=4 expires=30
//! <- SUBSCRIBE_STOP id=0
//! -> SUBSCRIBE_RESET id=0 code=206 reason="closed by peer"
//! ```
mod announce;
mod announce_error;
mod announce_ok;
mod go_away;
mod message;
mod subscribe;
mod subscribe_done;
mod subscribe_error;
mod subscribe_ok;
mod unannounce;
mod unsubscribe;

pub use announce::*;
pub use announce_error::*;
pub use announce_ok::*;
pub use go_away::*;
pub use message::*;
pub use subscribe::*;
pub use subscribe_done::*;
pub use subscribe_error::*;
pub use subscribe_ok::*;
pub use unannounce::*;
pub use unsubscribe::*;
File renamed without changes.
51 changes: 51 additions & 0 deletions moq-transport/src/control/subscribe_done.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::coding::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};

/// Sent by the publisher to cleanly terminate a Subscribe.
#[derive(Clone, Debug)]
pub struct SubscribeDone {
/// The ID for this subscription.
pub id: VarInt,

/// The error code
pub code: VarInt,

/// An optional error reason
pub reason: String,

/// The final group/object sent on this subscription.
pub last: Option<(VarInt, VarInt)>,
}

impl SubscribeDone {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?;
let code = VarInt::decode(r).await?;
let reason = String::decode(r).await?;
let last = match r.read_u8().await? {
0 => None,
1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)),
_ => return Err(DecodeError::InvalidValue),
};

Ok(Self { id, code, reason, last })
}

pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
self.id.encode(w).await?;
self.code.encode(w).await?;
self.reason.encode(w).await?;

if let Some((group, object)) = self.last {
w.write_u8(1).await?;
group.encode(w).await?;
object.encode(w).await?;
} else {
w.write_u8(0).await?;
}

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct SubscribeError {
pub id: VarInt,

// An error code.
pub code: u32,
pub code: VarInt,

// An optional, human-readable reason.
pub reason: String,
Expand All @@ -20,7 +20,7 @@ pub struct SubscribeError {
impl SubscribeError {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?;
let code = VarInt::decode(r).await?.try_into()?;
let code = VarInt::decode(r).await?;
let reason = String::decode(r).await?;
let alias = VarInt::decode(r).await?;

Expand All @@ -34,7 +34,7 @@ impl SubscribeError {

pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
self.id.encode(w).await?;
VarInt::from_u32(self.code).encode(w).await?;
self.code.encode(w).await?;
self.reason.encode(w).await?;
self.alias.encode(w).await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};

use crate::coding::{AsyncRead, AsyncWrite};
Expand All @@ -10,6 +12,9 @@ pub struct SubscribeOk {

/// The subscription will expire in this many milliseconds.
pub expires: Option<VarInt>,

/// The latest group and object for the track.
pub latest: Option<(VarInt, VarInt)>,
}

impl SubscribeOk {
Expand All @@ -19,14 +24,33 @@ impl SubscribeOk {
VarInt::ZERO => None,
expires => Some(expires),
};
Ok(Self { id, expires })

let latest = match r.read_u8().await? {
0 => None,
1 => Some((VarInt::decode(r).await?, VarInt::decode(r).await?)),
_ => return Err(DecodeError::InvalidValue),
};

Ok(Self { id, expires, latest })
}
}

impl SubscribeOk {
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
self.id.encode(w).await?;
self.expires.unwrap_or(VarInt::ZERO).encode(w).await?;

match self.latest {
Some((group, object)) => {
w.write_u8(1).await?;
group.encode(w).await?;
object.encode(w).await?;
}
None => {
w.write_u8(0).await?;
}
}

Ok(())
}
}
File renamed without changes.
File renamed without changes.
41 changes: 41 additions & 0 deletions moq-transport/src/data/datagram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use crate::coding::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};

#[derive(Clone, Debug)]
pub struct Datagram {
// The subscribe ID.
pub subscribe_id: VarInt,

// The track alias.
pub track_alias: VarInt,

// The sequence number within the track.
pub group_id: VarInt,

// The object ID within the group.
pub object_id: VarInt,

// The priority, where **smaller** values are sent first.
pub send_order: VarInt,
}

impl Datagram {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
Ok(Self {
subscribe_id: VarInt::decode(r).await?,
track_alias: VarInt::decode(r).await?,
group_id: VarInt::decode(r).await?,
object_id: VarInt::decode(r).await?,
send_order: VarInt::decode(r).await?,
})
}

pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
self.subscribe_id.encode(w).await?;
self.track_alias.encode(w).await?;
self.group_id.encode(w).await?;
self.object_id.encode(w).await?;
self.send_order.encode(w).await?;
Ok(())
}
}
Loading

0 comments on commit adecfc9

Please sign in to comment.