Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for draft-03 #132

Merged
merged 8 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions dev/pub
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ ADDR="${ADDR:-$HOST:$PORT}"
# Generate a random 16 character name by default.
#NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}"

# JK use the name "dev" instead
# JK use the name "bbb" instead, matching the Big Buck Bunny demo.
# TODO use that random name if the host is not localhost
NAME="${NAME:-dev}"
NAME="${NAME:-bbb}"

# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}"
Expand Down
17 changes: 3 additions & 14 deletions moq-clock/src/clock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time;

use anyhow::Context;
use moq_transport::{
cache::{fragment, segment, track},
Expand Down Expand Up @@ -30,7 +28,6 @@ impl Publisher {
.create_segment(segment::Info {
sequence: VarInt::from_u32(sequence),
priority: 0,
expires: Some(time::Duration::from_secs(60)),
})
.context("failed to create minute segment")?;

Expand All @@ -56,19 +53,11 @@ impl Publisher {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;
segment.write(base.clone().into()).context("failed to write base")?;

loop {
let delta = now.format("%S").to_string();
let sequence = VarInt::from_u32(now.second() + 1);

segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;
segment.write(delta.clone().into()).context("failed to write delta")?;

println!("{}{}", base, delta);

Expand Down Expand Up @@ -119,7 +108,7 @@ impl Subscriber {

log::debug!("got first: {:?}", first);

if first.sequence.into_inner() != 0 {
if first.sequence != VarInt::ZERO {
anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer");
}

Expand Down
41 changes: 14 additions & 27 deletions moq-pub/src/media.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{self, Context};
use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::cache::{broadcast, segment, track};
use moq_transport::VarInt;
use mp4::{self, ReadBox};
use serde_json::json;
Expand Down Expand Up @@ -41,16 +41,13 @@ impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {

// Create the catalog track with a single segment.
let mut init_track = broadcast.create_track("0.mp4")?;
let init_segment = init_track.create_segment(segment::Info {
let mut init_segment = init_track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})?;

// Create a single fragment, optionally setting the size
let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?;

init_fragment.chunk(init.into())?;
// Write the init segment to the track.
init_segment.write(init.into())?;

let mut tracks = HashMap::new();

Expand Down Expand Up @@ -128,10 +125,9 @@ impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {
init_track_name: &str,
moov: &mp4::MoovBox,
) -> Result<(), anyhow::Error> {
let segment = track.create_segment(segment::Info {
let mut segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})?;

let mut tracks = Vec::new();
Expand Down Expand Up @@ -214,10 +210,7 @@ impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {
log::info!("catalog: {}", catalog_str);

// Create a single fragment for the segment.
let mut fragment = segment.final_fragment(VarInt::ZERO)?;

// Add the segment and add the fragment.
fragment.chunk(catalog_str.into())?;
segment.write(catalog_str.into())?;

Ok(())
}
Expand Down Expand Up @@ -265,7 +258,7 @@ struct Track {
track: track::Publisher,

// The current segment
current: Option<fragment::Publisher>,
current: Option<segment::Publisher>,

// The number of units per second.
timescale: u64,
Expand All @@ -288,7 +281,7 @@ impl Track {
if let Some(current) = self.current.as_mut() {
if !fragment.keyframe {
// Use the existing segment
current.chunk(raw.into())?;
current.write(raw.into())?;
return Ok(());
}
}
Expand All @@ -304,33 +297,27 @@ impl Track {
.context("timestamp too large")?;

// Create a new segment.
let segment = self.track.create_segment(segment::Info {
let mut segment = self.track.create_segment(segment::Info {
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,

// Newer segments are higher priority
priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,

// Delete segments after 10s.
expires: Some(time::Duration::from_secs(10)),
})?;

// Create a single fragment for the segment that we will keep appending.
let mut fragment = segment.final_fragment(VarInt::ZERO)?;

self.sequence += 1;

// Insert the raw atom into the segment.
fragment.chunk(raw.into())?;
// Create a single fragment for the segment that we will keep appending.
segment.write(raw.into())?;

// Save for the next iteration
self.current = Some(fragment);
self.current = Some(segment);

Ok(())
}

pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let fragment = self.current.as_mut().context("missing current fragment")?;
fragment.chunk(raw.into())?;
let segment = self.current.as_mut().context("missing current fragment")?;
segment.write(raw.into())?;

Ok(())
}
Expand Down
12 changes: 0 additions & 12 deletions moq-relay/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,4 @@ impl moq_transport::MoqError for RelayError {
Self::WebTransportServer(_) => 500,
}
}

fn reason(&self) -> String {
match self {
Self::Transport(err) => format!("transport error: {}", err.reason()),
Self::Cache(err) => format!("cache error: {}", err.reason()),
Self::MoqApi(err) => format!("api error: {}", err),
Self::Url(err) => format!("url error: {}", err),
Self::MissingNode => "missing node".to_owned(),
Self::WebTransportServer(err) => format!("upstream server error: {}", err),
Self::WebTransportClient(err) => format!("upstream client error: {}", err),
}
}
}
4 changes: 2 additions & 2 deletions moq-relay/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ impl Session {
match role {
Role::Publisher => {
if let Err(err) = self.serve_publisher(id, request, &path).await {
log::warn!("error serving publisher: id={} path={} err={:#?}", id, path, err);
log::warn!("error serving publisher: id={} path={} err={}", id, path, err);
}
}
Role::Subscriber => {
if let Err(err) = self.serve_subscriber(id, request, &path).await {
log::warn!("error serving subscriber: id={} path={} err={:#?}", id, path, err);
log::warn!("error serving subscriber: id={} path={} err={}", id, path, err);
}
}
Role::Both => {
Expand Down
2 changes: 0 additions & 2 deletions moq-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ bytes = "1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "io-util", "sync"] }
log = "0.4"
indexmap = "2"

quinn = "0.10"
webtransport-quinn = "0.6.1"
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }

async-trait = "0.1"
paste = "1"

[dev-dependencies]
# QUIC
Expand Down
18 changes: 6 additions & 12 deletions moq-transport/src/cache/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub enum CacheError {
#[error("closed")]
Closed,

/// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher.
/// A SUBSCRIBE_DONE or ANNOUNCE_CANCEL was received.
#[error("reset code={0:?}")]
Reset(u32),

Expand All @@ -24,6 +24,10 @@ pub enum CacheError {
/// A resource already exists with that ID.
#[error("duplicate")]
Duplicate,

/// We reported the wrong size for a fragment.
#[error("wrong size")]
WrongSize,
}

impl MoqError for CacheError {
Expand All @@ -35,17 +39,7 @@ impl MoqError for CacheError {
Self::Stop => 206,
Self::NotFound => 404,
Self::Duplicate => 409,
}
}

/// A reason that is sent over the wire.
fn reason(&self) -> String {
match self {
Self::Closed => "closed".to_owned(),
Self::Reset(code) => format!("reset code: {}", code),
Self::Stop => "stop".to_owned(),
Self::NotFound => "not found".to_owned(),
Self::Duplicate => "duplicate".to_owned(),
Self::WrongSize => 500,
}
}
}
30 changes: 25 additions & 5 deletions moq-transport/src/cache/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ pub struct Info {
// NOTE: These may be received out of order or with gaps.
pub sequence: VarInt,

// The size of the fragment, optionally None if this is the last fragment in a segment.
// TODO enforce this size.
pub size: Option<usize>,
// The size of the fragment.
pub size: usize,
}

struct State {
Expand Down Expand Up @@ -79,27 +78,48 @@ pub struct Publisher {
// Immutable segment state.
info: Arc<Info>,

// The amount of promised data that has yet to be written.
remain: usize,

// Closes the segment when all Publishers are dropped.
_dropped: Arc<Dropped>,
}

impl Publisher {
fn new(state: Watch<State>, info: Arc<Info>) -> Self {
let _dropped = Arc::new(Dropped::new(state.clone()));
Self { state, info, _dropped }
let remain = info.size;

Self {
state,
info,
remain,
_dropped,
}
}

/// Write a new chunk of bytes.
pub fn chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> {
if chunk.len() > self.remain {
return Err(CacheError::WrongSize);
}
self.remain -= chunk.len();

let mut state = self.state.lock_mut();
state.closed.clone()?;
state.chunks.push(chunk);

Ok(())
}

/// Close the segment with an error.
pub fn close(self, err: CacheError) -> Result<(), CacheError> {
self.state.lock_mut().close(err)
self.state.lock_mut().close(err)?;
if self.remain != 0 {
Err(CacheError::WrongSize)
} else {
Ok(())
}
}
}

Expand Down
Loading
Loading