Skip to content

Commit

Permalink
Merge branch 'main' into feat/moq-sub
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Mar 8, 2024
2 parents 70a5880 + 0d9c0ed commit fa55627
Show file tree
Hide file tree
Showing 53 changed files with 929 additions and 1,129 deletions.
36 changes: 6 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions dev/pub
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ ADDR="${ADDR:-$HOST:$PORT}"
# Generate a random 16 character name by default.
#NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}"

# JK use the name "dev" instead
# JK use the name "bbb" instead, matching the Big Buck Bunny demo.
# TODO use that random name if the host is not localhost
NAME="${NAME:-dev}"
NAME="${NAME:-bbb}"

# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}"
Expand Down
17 changes: 3 additions & 14 deletions moq-clock/src/clock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time;

use anyhow::Context;
use moq_transport::{
cache::{fragment, segment, track},
Expand Down Expand Up @@ -30,7 +28,6 @@ impl Publisher {
.create_segment(segment::Info {
sequence: VarInt::from_u32(sequence),
priority: 0,
expires: Some(time::Duration::from_secs(60)),
})
.context("failed to create minute segment")?;

Expand All @@ -56,19 +53,11 @@ impl Publisher {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;
segment.write(base.clone().into()).context("failed to write base")?;

loop {
let delta = now.format("%S").to_string();
let sequence = VarInt::from_u32(now.second() + 1);

segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;
segment.write(delta.clone().into()).context("failed to write delta")?;

println!("{}{}", base, delta);

Expand Down Expand Up @@ -119,7 +108,7 @@ impl Subscriber {

log::debug!("got first: {:?}", first);

if first.sequence.into_inner() != 0 {
if first.sequence != VarInt::ZERO {
anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer");
}

Expand Down
1 change: 1 addition & 0 deletions moq-pub/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod media;
7 changes: 3 additions & 4 deletions moq-pub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use clap::Parser;
mod cli;
use cli::*;

mod media;
use media::*;

use moq_pub::media::Media;
use moq_transport::cache::broadcast;

// TODO: clap complete
Expand All @@ -25,8 +23,9 @@ async fn main() -> anyhow::Result<()> {

let config = Config::parse();

let input = tokio::io::stdin();
let (publisher, subscriber) = broadcast::new("");
let mut media = Media::new(&config, publisher).await?;
let mut media = Media::new(input, publisher).await?;

// Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty();
Expand Down
60 changes: 23 additions & 37 deletions moq-pub/src/media.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
use crate::cli::Config;
use anyhow::{self, Context};
use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::cache::{broadcast, segment, track};
use moq_transport::VarInt;
use mp4::{self, ReadBox};
use serde_json::json;
use std::cmp::max;
use std::collections::HashMap;
use std::io::Cursor;
use std::time;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncRead, AsyncReadExt};

pub struct Media {
pub struct Media<I> {
// We hold on to publisher so we don't close then while media is still being published.
_broadcast: broadcast::Publisher,
_catalog: track::Publisher,
_init: track::Publisher,

// Tracks based on their track ID.
tracks: HashMap<u32, Track>,
input: I,
}

impl Media {
pub async fn new(_config: &Config, mut broadcast: broadcast::Publisher) -> anyhow::Result<Self> {
let mut stdin = tokio::io::stdin();
let ftyp = read_atom(&mut stdin).await?;
impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {
pub async fn new(mut input: I, mut broadcast: broadcast::Publisher) -> anyhow::Result<Self> {
let ftyp = read_atom(&mut input).await?;
anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom");

let moov = read_atom(&mut stdin).await?;
let moov = read_atom(&mut input).await?;
anyhow::ensure!(&moov[4..8] == b"moov", "expected moov atom");

let mut init = ftyp;
Expand All @@ -42,16 +41,13 @@ impl Media {

// Create the catalog track with a single segment.
let mut init_track = broadcast.create_track("0.mp4")?;
let init_segment = init_track.create_segment(segment::Info {
let mut init_segment = init_track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})?;

// Create a single fragment, optionally setting the size
let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?;

init_fragment.chunk(init.into())?;
// Write the init segment to the track.
init_segment.write(init.into())?;

let mut tracks = HashMap::new();

Expand All @@ -77,16 +73,16 @@ impl Media {
_catalog: catalog,
_init: init_track,
tracks,
input,
})
}

pub async fn run(&mut self) -> anyhow::Result<()> {
let mut stdin = tokio::io::stdin();
// The current track name
let mut current = None;

loop {
let atom = read_atom(&mut stdin).await?;
let atom = read_atom(&mut self.input).await?;

let mut reader = Cursor::new(&atom);
let header = mp4::BoxHeader::read(&mut reader)?;
Expand Down Expand Up @@ -129,10 +125,9 @@ impl Media {
init_track_name: &str,
moov: &mp4::MoovBox,
) -> Result<(), anyhow::Error> {
let segment = track.create_segment(segment::Info {
let mut segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})?;

let mut tracks = Vec::new();
Expand Down Expand Up @@ -215,10 +210,7 @@ impl Media {
log::info!("catalog: {}", catalog_str);

// Create a single fragment for the segment.
let mut fragment = segment.final_fragment(VarInt::ZERO)?;

// Add the segment and add the fragment.
fragment.chunk(catalog_str.into())?;
segment.write(catalog_str.into())?;

Ok(())
}
Expand Down Expand Up @@ -266,7 +258,7 @@ struct Track {
track: track::Publisher,

// The current segment
current: Option<fragment::Publisher>,
current: Option<segment::Publisher>,

// The number of units per second.
timescale: u64,
Expand All @@ -289,7 +281,7 @@ impl Track {
if let Some(current) = self.current.as_mut() {
if !fragment.keyframe {
// Use the existing segment
current.chunk(raw.into())?;
current.write(raw.into())?;
return Ok(());
}
}
Expand All @@ -305,33 +297,27 @@ impl Track {
.context("timestamp too large")?;

// Create a new segment.
let segment = self.track.create_segment(segment::Info {
let mut segment = self.track.create_segment(segment::Info {
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,

// Newer segments are higher priority
priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,

// Delete segments after 10s.
expires: Some(time::Duration::from_secs(10)),
})?;

// Create a single fragment for the segment that we will keep appending.
let mut fragment = segment.final_fragment(VarInt::ZERO)?;

self.sequence += 1;

// Insert the raw atom into the segment.
fragment.chunk(raw.into())?;
// Create a single fragment for the segment that we will keep appending.
segment.write(raw.into())?;

// Save for the next iteration
self.current = Some(fragment);
self.current = Some(segment);

Ok(())
}

pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let fragment = self.current.as_mut().context("missing current fragment")?;
fragment.chunk(raw.into())?;
let segment = self.current.as_mut().context("missing current fragment")?;
segment.write(raw.into())?;

Ok(())
}
Expand Down
12 changes: 0 additions & 12 deletions moq-relay/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,4 @@ impl moq_transport::MoqError for RelayError {
Self::WebTransportServer(_) => 500,
}
}

fn reason(&self) -> String {
match self {
Self::Transport(err) => format!("transport error: {}", err.reason()),
Self::Cache(err) => format!("cache error: {}", err.reason()),
Self::MoqApi(err) => format!("api error: {}", err),
Self::Url(err) => format!("url error: {}", err),
Self::MissingNode => "missing node".to_owned(),
Self::WebTransportServer(err) => format!("upstream server error: {}", err),
Self::WebTransportClient(err) => format!("upstream client error: {}", err),
}
}
}
4 changes: 2 additions & 2 deletions moq-relay/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ impl Session {
match role {
Role::Publisher => {
if let Err(err) = self.serve_publisher(id, request, &path).await {
log::warn!("error serving publisher: id={} path={} err={:#?}", id, path, err);
log::warn!("error serving publisher: id={} path={} err={}", id, path, err);
}
}
Role::Subscriber => {
if let Err(err) = self.serve_subscriber(id, request, &path).await {
log::warn!("error serving subscriber: id={} path={} err={:#?}", id, path, err);
log::warn!("error serving subscriber: id={} path={} err={}", id, path, err);
}
}
Role::Both => {
Expand Down
2 changes: 0 additions & 2 deletions moq-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ bytes = "1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "io-util", "sync"] }
log = "0.4"
indexmap = "2"

quinn = "0.10"
webtransport-quinn = "0.6.1"
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }

async-trait = "0.1"
paste = "1"

[dev-dependencies]
# QUIC
Expand Down
Loading

0 comments on commit fa55627

Please sign in to comment.