Skip to content

Commit

Permalink
Add support for native QUIC (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Mar 22, 2024
1 parent 3aca01f commit e8f5bbe
Show file tree
Hide file tree
Showing 57 changed files with 1,201 additions and 823 deletions.
40 changes: 29 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion dev/clock
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ HOST="${HOST:-localhost}"
PORT="${PORT:-4443}"
ADDR="${ADDR:-$HOST:$PORT}"
NAME="${NAME:-clock}"
SCHEME="${SCHEME:-https}"

# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR"}"
URL="${URL:-"$SCHEME://$ADDR"}"

cargo run --bin moq-clock -- "$URL" --namespace "$NAME" "$@"
3 changes: 2 additions & 1 deletion dev/pub
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export RUST_LOG="${RUST_LOG:-debug}"
HOST="${HOST:-localhost}"
PORT="${PORT:-4443}"
ADDR="${ADDR:-$HOST:$PORT}"
SCHEME="${SCHEME:-https}"

# Generate a random 16 character name by default.
#NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}"
Expand All @@ -20,7 +21,7 @@ ADDR="${ADDR:-$HOST:$PORT}"
NAME="${NAME:-bbb}"

# Combine the host into a URL.
URL="${URL:-"https://$ADDR"}"
URL="${URL:-"$SCHEME://$ADDR"}"

# Default to a source video
INPUT="${INPUT:-dev/source.mp4}"
Expand Down
6 changes: 4 additions & 2 deletions moq-clock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Luke Curley"]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"

version = "0.1.0"
version = "0.2.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport", "media", "live"]
Expand All @@ -18,7 +18,9 @@ moq-transport = { path = "../moq-transport" }

# QUIC
quinn = "0.10"
webtransport-quinn = "0.7"
webtransport-quinn = "0.8"
webtransport-generic = "0.8"
quictransport-quinn = "0.8"
url = "2"

# Crypto
Expand Down
9 changes: 1 addition & 8 deletions moq-clock/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,5 @@ pub struct Config {
}

fn moq_url(s: &str) -> Result<Url, String> {
let url = Url::try_from(s).map_err(|e| e.to_string())?;

// Make sure the scheme is moq
if url.scheme() != "https" {
return Err("url scheme must be https:// for WebTransport".to_string());
}

Ok(url)
Url::try_from(s).map_err(|e| e.to_string())
}
45 changes: 32 additions & 13 deletions moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,40 @@ async fn main() -> anyhow::Result<()> {
tls_config.dangerous().set_certificate_verifier(Arc::new(noop));
}

tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important
log::info!("connecting to server: url={}", config.url);

let arc_tls_config = std::sync::Arc::new(tls_config);
let quinn_client_config = quinn::ClientConfig::new(arc_tls_config);
match config.url.scheme() {
"https" => {
tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important
let client_config = quinn::ClientConfig::new(Arc::new(tls_config));

let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(quinn_client_config);
let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(client_config);

log::info!("connecting to server: url={}", config.url);
let session = webtransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create WebTransport session")?;

run(session, config).await
}
"moqt" => {
tls_config.alpn_protocols = vec![moq_transport::setup::ALPN.to_vec()]; // this one is important
let client_config = quinn::ClientConfig::new(Arc::new(tls_config));

let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(client_config);

let session = webtransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create WebTransport session")?;
let session = quictransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create QUIC Transport session")?;

run(session, config).await
}
_ => anyhow::bail!("unsupported scheme: {}", config.url.scheme()),
}
}

async fn run<S: webtransport_generic::Session>(session: S, config: cli::Config) -> anyhow::Result<()> {
if config.publish {
let (session, publisher) = moq_transport::Publisher::connect(session)
.await
Expand All @@ -94,11 +114,10 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed to create MoQ Transport session")?;

let subscriber = subscriber
.subscribe(&config.namespace, &config.track, Default::default())
.context("failed to subscribe to track")?;
let (prod, sub) = serve::Track::new(&config.namespace, &config.track).produce();
subscriber.subscribe(prod).context("failed to subscribe to track")?;

let clock = clock::Subscriber::new(subscriber.track());
let clock = clock::Subscriber::new(sub);

tokio::select! {
res = session.run() => res.context("session error")?,
Expand Down
7 changes: 4 additions & 3 deletions moq-pub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Mike English", "Luke Curley"]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"

version = "0.1.0"
version = "0.2.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport", "media", "live"]
Expand All @@ -18,8 +18,9 @@ moq-transport = { path = "../moq-transport" }

# QUIC
quinn = "0.10"
webtransport-quinn = "0.7"
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
webtransport-quinn = "0.8"
quictransport-quinn = "0.8"
webtransport-generic = "0.8"
url = "2"

# Crypto
Expand Down
9 changes: 1 addition & 8 deletions moq-pub/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,5 @@ pub struct Config {
}

fn moq_url(s: &str) -> Result<Url, String> {
let url = Url::try_from(s).map_err(|e| e.to_string())?;

// Make sure the scheme is moq
if url.scheme() != "https" {
return Err("url scheme must be https:// for WebTransport".to_string());
}

Ok(url)
Url::try_from(s).map_err(|e| e.to_string())
}
46 changes: 35 additions & 11 deletions moq-pub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use cli::*;

use moq_pub::media::Media;
use moq_transport::serve;
use tokio::io::AsyncRead;

// TODO: clap complete

Expand All @@ -25,7 +26,7 @@ async fn main() -> anyhow::Result<()> {

let input = tokio::io::stdin();
let (publisher, broadcast) = serve::Broadcast::new(&config.name).produce();
let mut media = Media::new(input, publisher).await?;
let media = Media::new(input, publisher).await?;

// Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty();
Expand Down Expand Up @@ -62,25 +63,48 @@ async fn main() -> anyhow::Result<()> {
tls_config.dangerous().set_certificate_verifier(Arc::new(noop));
}

tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important
log::info!("connecting to relay: url={}", config.url);

let arc_tls_config = std::sync::Arc::new(tls_config);
let quinn_client_config = quinn::ClientConfig::new(arc_tls_config);
match config.url.scheme() {
"https" => {
tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()];
let client_config = quinn::ClientConfig::new(Arc::new(tls_config));

let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(quinn_client_config);
let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(client_config);

log::info!("connecting to relay: url={}", config.url);
let session = webtransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create WebTransport session")?;

let session = webtransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create WebTransport session")?;
run(session, media, broadcast).await
}
"moqt" => {
tls_config.alpn_protocols = vec![moq_transport::setup::ALPN.to_vec()];
let client_config = quinn::ClientConfig::new(Arc::new(tls_config));

let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(client_config);

let session = quictransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create QUIC Transport session")?;

run(session, media, broadcast).await
}
_ => anyhow::bail!("url scheme must be 'https' or 'moqt'"),
}
}

async fn run<T: webtransport_generic::Session, I: AsyncRead + Send + Unpin>(
session: T,
mut media: Media<I>,
broadcast: serve::BroadcastSubscriber,
) -> anyhow::Result<()> {
let (session, publisher) = moq_transport::Publisher::connect(session)
.await
.context("failed to create MoQ Transport publisher")?;

// TODO run a task that returns a 404 for all unknown subscriptions.
tokio::select! {
res = session.run() => res.context("session error")?,
res = media.run() => res.context("media error")?,
Expand Down
2 changes: 1 addition & 1 deletion moq-pub/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct Media<I> {
input: I,
}

impl<I: AsyncRead + Send + Unpin + 'static> Media<I> {
impl<I: AsyncRead + Send + Unpin> Media<I> {
pub async fn new(mut input: I, mut broadcast: BroadcastPublisher) -> anyhow::Result<Self> {
let ftyp = read_atom(&mut input).await?;
anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom");
Expand Down
Loading

0 comments on commit e8f5bbe

Please sign in to comment.