From d9d10cd4a6a52f4280307df102f679fd600f8fef Mon Sep 17 00:00:00 2001 From: kixelated Date: Sat, 6 Apr 2024 06:42:38 -0700 Subject: [PATCH] Replace webtransport-generic with web-transport (#151) --- Cargo.lock | 125 ++++++++++++------------ moq-clock/Cargo.toml | 4 +- moq-clock/src/main.rs | 32 +++--- moq-pub/Cargo.toml | 4 +- moq-pub/src/main.rs | 37 ++++--- moq-relay/Cargo.toml | 4 +- moq-relay/src/connection.rs | 86 +++++----------- moq-relay/src/error.rs | 4 +- moq-relay/src/relay.rs | 16 +-- moq-relay/src/remote.rs | 6 +- moq-transport/Cargo.lock | 12 +-- moq-transport/Cargo.toml | 3 +- moq-transport/src/coding/reader.rs | 73 -------------- moq-transport/src/session/announce.rs | 32 +++--- moq-transport/src/session/announced.rs | 12 +-- moq-transport/src/session/error.rs | 40 ++------ moq-transport/src/session/mod.rs | 76 +++++++------- moq-transport/src/session/publisher.rs | 38 +++---- moq-transport/src/session/reader.rs | 45 +++------ moq-transport/src/session/subscribe.rs | 12 +-- moq-transport/src/session/subscribed.rs | 28 +++--- moq-transport/src/session/subscriber.rs | 31 +++--- moq-transport/src/session/writer.rs | 19 ++-- 23 files changed, 274 insertions(+), 465 deletions(-) delete mode 100644 moq-transport/src/coding/reader.rs diff --git a/Cargo.lock b/Cargo.lock index c1f440bd..c7d446a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -759,9 +759,9 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "js-sys" -version = "0.3.64" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" dependencies = [ "wasm-bindgen", ] @@ -867,7 +867,6 @@ dependencies = [ "env_logger", "log", "moq-transport", - "quictransport-quinn", "quinn", "rustls", "rustls-native-certs", @@ -876,8 +875,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", - "webtransport-generic", - "webtransport-quinn", + "web-transport-quinn", ] [[package]] @@ -891,7 +889,6 @@ dependencies = [ "log", "moq-transport", "mp4", - "quictransport-quinn", "quinn", "rfc6381-codec", "rustls", @@ -902,8 +899,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", - "webtransport-generic", - "webtransport-quinn", + "web-transport-quinn", ] [[package]] @@ -920,7 +916,6 @@ dependencies = [ "log", "moq-api", "moq-transport", - "quictransport-quinn", "quinn", "ring 0.16.20", "rustls", @@ -932,9 +927,8 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "web-transport-quinn", "webpki", - "webtransport-generic", - "webtransport-quinn", ] [[package]] @@ -959,7 +953,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", - "webtransport-generic", + "web-transport", ] [[package]] @@ -1225,20 +1219,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "quictransport-quinn" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04227967142d740ffc66367bad009a6315e626c4830fcdb9d55904bd3ae1e3f9" -dependencies = [ - "bytes", - "quinn", - "thiserror", - "tokio", - "url", - "webtransport-generic", -] - [[package]] name = "quinn" version = "0.10.2" @@ -2109,9 +2089,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2119,9 +2099,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" dependencies = [ "bumpalo", "log", @@ -2146,9 +2126,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2156,9 +2136,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", @@ -2169,50 +2149,37 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "web-sys" -version = "0.3.64" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" dependencies = [ "js-sys", "wasm-bindgen", ] [[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.3", - "untrusted 0.9.0", -] - -[[package]] -name = "webpki-roots" -version = "0.25.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" - -[[package]] -name = "webtransport-generic" -version = "0.9.0" +name = "web-transport" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3796cc7d83f889b8fd4c1a731b08d83618ea1a3a2e3fe09225562754acc9b814" +checksum = "dea5c3a39f22c0e88678f588fa4c7cb47607a1b0fe45182d0feb1262d27148ed" dependencies = [ "bytes", + "thiserror", + "web-transport-quinn", + "web-transport-wasm", ] [[package]] -name = "webtransport-proto" -version = "0.6.1" +name = "web-transport-proto" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7de84935ba0f2292c5f78f042758fc4a0ce506699e674d059c517f56b04091be" +checksum = "a8b76c23545207b5ec17a4b533d274dc2587a8c308da6b14eeec06353f6e8ae6" dependencies = [ "bytes", "http", @@ -2221,10 +2188,10 @@ dependencies = [ ] [[package]] -name = "webtransport-quinn" -version = "0.9.0" +name = "web-transport-quinn" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18ac46cd68286fd4a70bc69dcab62bafbbe407927ae643f3bfce26f27090b19a" +checksum = "cf5eb5142259b46d473e44075bde93336f3ad6054d9909a93baca87dad5a03d4" dependencies = [ "bytes", "futures", @@ -2235,10 +2202,38 @@ dependencies = [ "thiserror", "tokio", "url", - "webtransport-generic", - "webtransport-proto", + "web-transport-proto", +] + +[[package]] +name = "web-transport-wasm" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64be28348e18cb1f44e4c8733dc2bd9520d782be840b2b978724dfd1b1bdefa3" +dependencies = [ + "bytes", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", ] +[[package]] +name = "webpki" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" +dependencies = [ + "ring 0.17.3", + "untrusted 0.9.0", +] + +[[package]] +name = "webpki-roots" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" + [[package]] name = "winapi" version = "0.3.9" diff --git a/moq-clock/Cargo.toml b/moq-clock/Cargo.toml index 63de6554..e350454f 100644 --- a/moq-clock/Cargo.toml +++ b/moq-clock/Cargo.toml @@ -18,9 +18,7 @@ moq-transport = { path = "../moq-transport", version = "0.3" } # QUIC quinn = "0.10" -webtransport-quinn = { version = "0.9" } -webtransport-generic = { version = "0.9" } -quictransport-quinn = { version = "0.9" } +web-transport-quinn = "0.1" url = "2" # Crypto diff --git a/moq-clock/src/main.rs b/moq-clock/src/main.rs index 570f509d..c48d8976 100644 --- a/moq-clock/src/main.rs +++ b/moq-clock/src/main.rs @@ -59,19 +59,17 @@ async fn main() -> anyhow::Result<()> { log::info!("connecting to server: url={}", config.url); - match config.url.scheme() { + let session = match config.url.scheme() { "https" => { - tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important + tls_config.alpn_protocols = vec![web_transport_quinn::ALPN.to_vec()]; // this one is important let client_config = quinn::ClientConfig::new(Arc::new(tls_config)); let mut endpoint = quinn::Endpoint::client(config.bind)?; endpoint.set_default_client_config(client_config); - let session = webtransport_quinn::connect(&endpoint, &config.url) + web_transport_quinn::connect(&endpoint, &config.url) .await - .context("failed to create WebTransport session")?; - - run(session, config).await + .context("failed to create WebTransport session")? } "moqt" => { tls_config.alpn_protocols = vec![moq_transport::setup::ALPN.to_vec()]; // this one is important @@ -80,19 +78,25 @@ async fn main() -> anyhow::Result<()> { let mut endpoint = quinn::Endpoint::client(config.bind)?; endpoint.set_default_client_config(client_config); - let session = quictransport_quinn::connect(&endpoint, &config.url) + let host = config.url.host().context("invalid DNS name")?.to_string(); + let port = config.url.port().unwrap_or(443); + + // Look up the DNS entry. + let remote = tokio::net::lookup_host((host.clone(), port)) .await - .context("failed to create QUIC Transport session")?; + .context("failed DNS lookup")? + .next() + .context("no DNS entries")?; - run(session, config).await + // Connect to the server using the addr we just resolved. + let conn = endpoint.connect(remote, &host)?.await?; + conn.into() } _ => anyhow::bail!("unsupported scheme: {}", config.url.scheme()), - } -} + }; -async fn run(session: S, config: cli::Config) -> anyhow::Result<()> { if config.publish { - let (session, mut publisher) = moq_transport::Publisher::connect(session) + let (session, mut publisher) = moq_transport::Publisher::connect(session.into()) .await .context("failed to create MoQ Transport session")?; @@ -110,7 +114,7 @@ async fn run(session: S, config: cli::Config) res = publisher.serve(broadcast_sub) => res.context("failed to serve broadcast")?, } } else { - let (session, mut subscriber) = moq_transport::Subscriber::connect(session) + let (session, mut subscriber) = moq_transport::Subscriber::connect(session.into()) .await .context("failed to create MoQ Transport session")?; diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index b3560238..7acd1c3b 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -18,9 +18,7 @@ moq-transport = { path = "../moq-transport", version = "0.3" } # QUIC quinn = "0.10" -webtransport-quinn = { version = "0.9" } -quictransport-quinn = { version = "0.9" } -webtransport-generic = { version = "0.9" } +web-transport-quinn = "0.1" url = "2" # Crypto diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs index 662e71c6..3578460b 100644 --- a/moq-pub/src/main.rs +++ b/moq-pub/src/main.rs @@ -8,7 +8,6 @@ use cli::*; use moq_pub::media::Media; use moq_transport::serve; -use tokio::io::AsyncRead; // TODO: clap complete @@ -26,7 +25,7 @@ async fn main() -> anyhow::Result<()> { let input = tokio::io::stdin(); let (publisher, broadcast) = serve::Broadcast::new(&config.name).produce(); - let media = Media::new(input, publisher).await?; + let mut media = Media::new(input, publisher).await?; // Create a list of acceptable root certificates. let mut roots = rustls::RootCertStore::empty(); @@ -65,19 +64,17 @@ async fn main() -> anyhow::Result<()> { log::info!("connecting to relay: url={}", config.url); - match config.url.scheme() { + let session = match config.url.scheme() { "https" => { - tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; + tls_config.alpn_protocols = vec![web_transport_quinn::ALPN.to_vec()]; let client_config = quinn::ClientConfig::new(Arc::new(tls_config)); let mut endpoint = quinn::Endpoint::client(config.bind)?; endpoint.set_default_client_config(client_config); - let session = webtransport_quinn::connect(&endpoint, &config.url) + web_transport_quinn::connect(&endpoint, &config.url) .await - .context("failed to create WebTransport session")?; - - run(session, media, broadcast).await + .context("failed to create WebTransport session")? } "moqt" => { tls_config.alpn_protocols = vec![moq_transport::setup::ALPN.to_vec()]; @@ -86,22 +83,24 @@ async fn main() -> anyhow::Result<()> { let mut endpoint = quinn::Endpoint::client(config.bind)?; endpoint.set_default_client_config(client_config); - let session = quictransport_quinn::connect(&endpoint, &config.url) + let host = config.url.host().context("invalid DNS name")?.to_string(); + let port = config.url.port().unwrap_or(443); + + // Look up the DNS entry. + let remote = tokio::net::lookup_host((host.clone(), port)) .await - .context("failed to create QUIC Transport session")?; + .context("failed DNS lookup")? + .next() + .context("no DNS entries")?; - run(session, media, broadcast).await + // Connect to the server using the addr we just resolved. + let conn = endpoint.connect(remote, &host)?.await?; + conn.into() } _ => anyhow::bail!("url scheme must be 'https' or 'moqt'"), - } -} + }; -async fn run( - session: T, - mut media: Media, - broadcast: serve::BroadcastReader, -) -> anyhow::Result<()> { - let (session, mut publisher) = moq_transport::Publisher::connect(session) + let (session, mut publisher) = moq_transport::Publisher::connect(session.into()) .await .context("failed to create MoQ Transport publisher")?; diff --git a/moq-relay/Cargo.toml b/moq-relay/Cargo.toml index 169ebaef..ac2d5aea 100644 --- a/moq-relay/Cargo.toml +++ b/moq-relay/Cargo.toml @@ -17,9 +17,7 @@ moq-api = { path = "../moq-api", version = "0.0.1" } # QUIC quinn = "0.10" -quictransport-quinn = { version = "0.9" } -webtransport-quinn = { version = "0.9" } -webtransport-generic = { version = "0.9" } +web-transport-quinn = "0.1" url = "2" # Crypto diff --git a/moq-relay/src/connection.rs b/moq-relay/src/connection.rs index 0d46cbb0..61a877d2 100644 --- a/moq-relay/src/connection.rs +++ b/moq-relay/src/connection.rs @@ -48,60 +48,35 @@ impl Connection { server_name, ); - match alpn.as_bytes() { - webtransport_quinn::ALPN => self.serve_webtransport(conn).await?, - moq_transport::setup::ALPN => self.serve_quic(conn).await?, + let session = match alpn.as_bytes() { + web_transport_quinn::ALPN => { + // Wait for the CONNECT request. + let request = web_transport_quinn::accept(conn) + .await + .context("failed to receive WebTransport request")?; + + // Accept the CONNECT request. + request + .ok() + .await + .context("failed to respond to WebTransport request")? + } + // A bit of a hack to pretend like we're a WebTransport session + moq_transport::setup::ALPN => conn.into(), _ => anyhow::bail!("unsupported ALPN: {}", alpn), - } - - Ok(()) - } - - async fn serve_webtransport(self, conn: quinn::Connection) -> anyhow::Result<()> { - // Wait for the CONNECT request. - let request = webtransport_quinn::accept(conn) - .await - .context("failed to receive WebTransport request")?; - - // Accept the CONNECT request. - let session = request - .ok() - .await - .context("failed to respond to WebTransport request")?; - - let (session, publisher, subscriber) = moq_transport::Session::accept(session).await?; - - let mut tasks = FuturesUnordered::new(); - tasks.push(session.run().boxed()); - - if let Some(remote) = publisher { - tasks.push(Self::serve_subscriber(self.clone(), remote).boxed()); - } - - if let Some(remote) = subscriber { - tasks.push(Self::serve_publisher(self.clone(), remote).boxed()); - } - - // Return the first error - tasks.select_next_some().await?; - - Ok(()) - } - - async fn serve_quic(self, conn: quinn::Connection) -> anyhow::Result<()> { - let session: quictransport_quinn::Session = conn.into(); + }; - let (session, publisher, subscriber) = moq_transport::Session::accept(session).await?; + let (session, publisher, subscriber) = moq_transport::Session::accept(session.into()).await?; let mut tasks = FuturesUnordered::new(); - tasks.push(session.run().boxed()); + tasks.push(session.run().boxed_local()); if let Some(remote) = publisher { - tasks.push(Self::serve_subscriber(self.clone(), remote).boxed()); + tasks.push(Self::serve_subscriber(self.clone(), remote).boxed_local()); } if let Some(remote) = subscriber { - tasks.push(Self::serve_publisher(self.clone(), remote).boxed()); + tasks.push(Self::serve_publisher(self.clone(), remote).boxed_local()); } // Return the first error @@ -110,10 +85,7 @@ impl Connection { Ok(()) } - async fn serve_subscriber( - self, - mut remote: Publisher, - ) -> Result<(), SessionError> { + async fn serve_subscriber(self, mut remote: Publisher) -> Result<(), SessionError> { let mut tasks = FuturesUnordered::new(); loop { @@ -135,10 +107,7 @@ impl Connection { } } - async fn serve_subscribe( - self, - subscribe: Subscribed, - ) -> Result<(), RelayError> { + async fn serve_subscribe(self, subscribe: Subscribed) -> Result<(), RelayError> { if let Some(local) = self.locals.1.route(&subscribe.namespace) { log::debug!("using local announce: {:?}", local.info); if let Some(track) = local.subscribe(&subscribe.name)? { @@ -163,10 +132,7 @@ impl Connection { Err(ServeError::NotFound.into()) } - async fn serve_publisher( - self, - mut remote: Subscriber, - ) -> Result<(), SessionError> { + async fn serve_publisher(self, mut remote: Subscriber) -> Result<(), SessionError> { let mut tasks = FuturesUnordered::new(); loop { @@ -189,11 +155,7 @@ impl Connection { } } - async fn serve_announce( - mut self, - remote: Subscriber, - mut announce: Announced, - ) -> Result<(), RelayError> { + async fn serve_announce(mut self, remote: Subscriber, mut announce: Announced) -> Result<(), RelayError> { let mut publisher = match self.locals.0.announce(&announce.namespace).await { Ok(publisher) => { announce.ok()?; diff --git a/moq-relay/src/error.rs b/moq-relay/src/error.rs index 65258c6f..df743a57 100644 --- a/moq-relay/src/error.rs +++ b/moq-relay/src/error.rs @@ -18,10 +18,10 @@ pub enum RelayError { Url(#[from] url::ParseError), #[error("webtransport client error: {0}")] - Client(#[from] webtransport_quinn::ClientError), + Client(#[from] web_transport_quinn::ClientError), #[error("webtransport server error: {0}")] - Server(#[from] webtransport_quinn::ServerError), + Server(#[from] web_transport_quinn::ServerError), #[error("missing node")] MissingNode, diff --git a/moq-relay/src/relay.rs b/moq-relay/src/relay.rs index a39d64fc..15d0f8c6 100644 --- a/moq-relay/src/relay.rs +++ b/moq-relay/src/relay.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time}; use anyhow::Context; -use tokio::task::JoinSet; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use crate::{ Config, Connection, Locals, LocalsConsumer, LocalsProducer, Remotes, RemotesConsumer, RemotesProducer, Tls, @@ -20,8 +20,8 @@ impl Relay { pub async fn new(config: Config, tls: Tls) -> anyhow::Result { let mut client_config = tls.client.clone(); let mut server_config = tls.server.clone(); - client_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec(), moq_transport::setup::ALPN.to_vec()]; - server_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec(), moq_transport::setup::ALPN.to_vec()]; + client_config.alpn_protocols = vec![web_transport_quinn::ALPN.to_vec(), moq_transport::setup::ALPN.to_vec()]; + server_config.alpn_protocols = vec![web_transport_quinn::ALPN.to_vec(), moq_transport::setup::ALPN.to_vec()]; // Enable BBR congestion control // TODO validate the implementation @@ -72,10 +72,10 @@ impl Relay { pub async fn run(self) -> anyhow::Result<()> { log::info!("listening on {}", self.quic.local_addr()?); - let mut tasks = JoinSet::new(); + let mut tasks = FuturesUnordered::new(); let remotes = self.remotes.map(|(producer, consumer)| { - tasks.spawn(producer.run()); + tasks.push(producer.run().boxed_local()); consumer }); @@ -85,14 +85,14 @@ impl Relay { let conn = res.context("failed to accept QUIC connection")?; let session = Connection::new(self.locals.clone(), remotes.clone()); - tasks.spawn(async move { + tasks.push(async move { if let Err(err) = session.run(conn).await { log::warn!("connection terminated: {}", err); } Ok(()) - }); + }.boxed_local()); }, - res = tasks.join_next(), if !tasks.is_empty() => res.expect("no tasks").expect("task aborted")?, + res = tasks.next(), if !tasks.is_empty() => res.unwrap()?, } } } diff --git a/moq-relay/src/remote.rs b/moq-relay/src/remote.rs index c89e30de..166ca758 100644 --- a/moq-relay/src/remote.rs +++ b/moq-relay/src/remote.rs @@ -237,11 +237,11 @@ impl RemoteProducer { pub async fn run_inner(&mut self) -> Result<(), RelayError> { // TODO reuse QUIC and MoQ sessions - let session = webtransport_quinn::connect(&self.quic, &self.url).await?; - let (session, mut subscriber) = moq_transport::Subscriber::connect(session).await?; + let session = web_transport_quinn::connect(&self.quic, &self.url).await?; + let (session, mut subscriber) = moq_transport::Subscriber::connect(session.into()).await?; // Run the session - let mut session = session.run().boxed(); + let mut session = session.run().boxed_local(); let mut tasks = FuturesUnordered::new(); let mut done = None; diff --git a/moq-transport/Cargo.lock b/moq-transport/Cargo.lock index 02d73b07..da84b210 100644 --- a/moq-transport/Cargo.lock +++ b/moq-transport/Cargo.lock @@ -538,7 +538,7 @@ dependencies = [ "quinn", "thiserror", "tokio", - "webtransport-quinn", + "web-transport-quinn", ] [[package]] @@ -1120,7 +1120,7 @@ dependencies = [ ] [[package]] -name = "webtransport-generic" +name = "web-transport-generic" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df712317d761312996f654739debeb3838eb02c6fd9146d9efdfd08a46674e45" @@ -1130,7 +1130,7 @@ dependencies = [ ] [[package]] -name = "webtransport-proto" +name = "web-transport-proto" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebeada5037d6302980ae2e0ab8d840e329c1697c612c6c077172de2b7631a276" @@ -1142,7 +1142,7 @@ dependencies = [ ] [[package]] -name = "webtransport-quinn" +name = "web-transport-quinn" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cceb876dbd00a87b3fd8869d1c315e07c28b0eb54d59b592a07a634f5e2b64e1" @@ -1156,8 +1156,8 @@ dependencies = [ "thiserror", "tokio", "url", - "webtransport-generic", - "webtransport-proto", + "web-transport-generic", + "web-transport-proto", ] [[package]] diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index f00c76d9..edcec598 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -20,7 +20,7 @@ thiserror = "1" tokio = { version = "1", features = ["macros", "io-util", "sync"] } log = "0.4" -webtransport-generic = { version = "0.9" } +web-transport = "0.1" paste = "1" futures = "0.3" @@ -43,6 +43,7 @@ log = { version = "0.4", features = ["std"] } env_logger = "0.9" mp4 = "0.13" anyhow = { version = "1", features = ["backtrace"] } + serde_json = "1" rfc6381-codec = "0.1" tracing = "0.1" diff --git a/moq-transport/src/coding/reader.rs b/moq-transport/src/coding/reader.rs deleted file mode 100644 index 0338b98a..00000000 --- a/moq-transport/src/coding/reader.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::{cmp, io}; - -use bytes::Buf; -use tokio::io::{AsyncRead, AsyncReadExt}; - -use crate::coding::Decode; - -use super::DecodeError; - -pub struct Reader { - stream: S, - buffer: bytes::BytesMut, -} - -impl Reader { - pub fn new(stream: S) -> Self { - Self { - stream, - buffer: Default::default(), - } - } - - pub async fn decode(&mut self) -> Result { - loop { - let mut cursor = io::Cursor::new(&self.buffer); - - // Try to decode with the current buffer. - let mut remain = match T::decode(&mut cursor) { - Ok(msg) => { - self.buffer.advance(cursor.position() as usize); - return Ok(msg); - } - Err(DecodeError::More(remain)) => remain, - Err(err) => return Err(err), - }; - - // Read in more data until we reach the requested amount. - // We always read at least once to avoid an infinite loop if some dingus puts remain=0 - loop { - let size = self.stream.read_buf(&mut self.buffer).await?; - remain = remain.saturating_sub(size); - if remain == 0 { - break; - } - } - } - } - - pub async fn read(&mut self, max_size: usize) -> Result, io::Error> { - if self.buffer.is_empty() { - // TODO avoid making a copy by using Quinn's read_chunk - let size = self.stream.read_buf(&mut self.buffer).await?; - if size == 0 { - return Ok(None); - } - } - - let size = cmp::min(self.buffer.len(), max_size); - Ok(Some(self.buffer.split_to(size).freeze())) - } - - pub async fn done(&mut self) -> Result { - Ok(self.buffer.is_empty() && self.stream.read_buf(&mut self.buffer).await? == 0) - } - - pub fn buffered(&self) -> &[u8] { - &self.buffer - } - - pub fn into_inner(self) -> S { - self.stream - } -} diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index 52603410..7993f2d1 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -11,13 +11,13 @@ pub struct AnnounceInfo { pub namespace: String, } -struct AnnounceState { - subscribers: VecDeque>, +struct AnnounceState { + subscribers: VecDeque, ok: bool, closed: Result<(), ServeError>, } -impl Default for AnnounceState { +impl Default for AnnounceState { fn default() -> Self { Self { subscribers: Default::default(), @@ -27,7 +27,7 @@ impl Default for AnnounceState { } } -impl Drop for AnnounceState { +impl Drop for AnnounceState { fn drop(&mut self) { for subscriber in self.subscribers.drain(..) { subscriber.close(ServeError::NotFound).ok(); @@ -36,15 +36,15 @@ impl Drop for AnnounceState { } #[must_use = "unannounce on drop"] -pub struct Announce { - publisher: Publisher, - state: State>, +pub struct Announce { + publisher: Publisher, + state: State, pub info: AnnounceInfo, } -impl Announce { - pub(super) fn new(mut publisher: Publisher, namespace: String) -> (Announce, AnnounceRecv) { +impl Announce { + pub(super) fn new(mut publisher: Publisher, namespace: String) -> (Announce, AnnounceRecv) { let info = AnnounceInfo { namespace: namespace.clone(), }; @@ -83,7 +83,7 @@ impl Announce { } } - pub async fn subscribed(&mut self) -> Result>, ServeError> { + pub async fn subscribed(&mut self) -> Result, ServeError> { loop { let notify = { let state = self.state.lock(); @@ -103,7 +103,7 @@ impl Announce { } } -impl Drop for Announce { +impl Drop for Announce { fn drop(&mut self) { if self.state.lock().closed.is_err() { return; @@ -115,7 +115,7 @@ impl Drop for Announce { } } -impl ops::Deref for Announce { +impl ops::Deref for Announce { type Target = AnnounceInfo; fn deref(&self) -> &Self::Target { @@ -123,11 +123,11 @@ impl ops::Deref for Announce { } } -pub(super) struct AnnounceRecv { - state: State>, +pub(super) struct AnnounceRecv { + state: State, } -impl AnnounceRecv { +impl AnnounceRecv { pub fn recv_ok(&mut self) -> Result<(), ServeError> { if let Some(mut state) = self.state.lock_mut() { if state.ok { @@ -150,7 +150,7 @@ impl AnnounceRecv { Ok(()) } - pub fn recv_subscribe(&mut self, subscriber: Subscribed) -> Result<(), ServeError> { + pub fn recv_subscribe(&mut self, subscriber: Subscribed) -> Result<(), ServeError> { let mut state = self.state.lock_mut().ok_or(ServeError::Done)?; state.subscribers.push_back(subscriber); diff --git a/moq-transport/src/session/announced.rs b/moq-transport/src/session/announced.rs index 228deb15..3aac26af 100644 --- a/moq-transport/src/session/announced.rs +++ b/moq-transport/src/session/announced.rs @@ -9,8 +9,8 @@ use super::{AnnounceInfo, Subscriber}; #[derive(Default)] struct AnnouncedState {} -pub struct Announced { - session: Subscriber, +pub struct Announced { + session: Subscriber, state: State, pub info: AnnounceInfo, @@ -19,8 +19,8 @@ pub struct Announced { error: Option, } -impl Announced { - pub(super) fn new(session: Subscriber, namespace: String) -> (Announced, AnnouncedRecv) { +impl Announced { + pub(super) fn new(session: Subscriber, namespace: String) -> (Announced, AnnouncedRecv) { let info = AnnounceInfo { namespace }; let (send, recv) = State::init(); @@ -65,7 +65,7 @@ impl Announced { } } -impl ops::Deref for Announced { +impl ops::Deref for Announced { type Target = AnnounceInfo; fn deref(&self) -> &AnnounceInfo { @@ -73,7 +73,7 @@ impl ops::Deref for Announced { } } -impl Drop for Announced { +impl Drop for Announced { fn drop(&mut self) { let err = self.error.clone().unwrap_or(ServeError::Done); diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index 780cede9..42b810e9 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -1,19 +1,15 @@ -use std::sync::Arc; - -use webtransport_generic::ErrorCode; - use crate::{coding, serve, setup}; #[derive(thiserror::Error, Debug, Clone)] pub enum SessionError { - #[error("webtransport error: {0}")] - WebTransport(Arc), + #[error("webtransport session: {0}")] + Session(#[from] web_transport::SessionError), - #[error("write error: {0}")] - Write(Arc), + #[error("webtransport write: {0}")] + Write(#[from] web_transport::WriteError), - #[error("read error: {0}")] - Read(Arc), + #[error("webtransport read: {0}")] + Read(#[from] web_transport::ReadError), #[error("encode error: {0}")] Encode(#[from] coding::EncodeError), @@ -51,35 +47,13 @@ pub enum SessionError { WrongSize, } -impl SessionError { - pub(super) fn from_webtransport(err: E) -> Self { - Self::WebTransport(Arc::new(err)) - } - - pub(super) fn from_read(err: E) -> Self { - Self::Read(Arc::new(err)) - } - - pub(super) fn from_write(err: E) -> Self { - Self::Write(Arc::new(err)) - } -} - -/* -impl From for SessionError { - fn from(err: T) -> Self { - Self::WebTransport(Arc::new(err)) - } -} -*/ - impl SessionError { /// An integer code that is sent over the wire. pub fn code(&self) -> u64 { match self { Self::RoleIncompatible(..) => 406, Self::RoleViolation => 405, - Self::WebTransport(_) => 503, + Self::Session(_) => 503, Self::Read(_) => 500, Self::Write(_) => 500, Self::Version(..) => 406, diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index 4584736d..6901d2fd 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -27,25 +27,25 @@ use crate::util::Queue; use crate::{message, setup}; #[must_use = "run() must be called"] -pub struct Session { - webtransport: S, +pub struct Session { + webtransport: web_transport::Session, - sender: Writer, - recver: Reader, + sender: Writer, + recver: Reader, - publisher: Option>, - subscriber: Option>, + publisher: Option, + subscriber: Option, outgoing: Queue, } -impl Session { +impl Session { fn new( - webtransport: S, - sender: Writer, - recver: Reader, + webtransport: web_transport::Session, + sender: Writer, + recver: Reader, role: setup::Role, - ) -> (Self, Option>, Option>) { + ) -> (Self, Option, Option) { let outgoing = Queue::default(); let publisher = role .is_publisher() @@ -65,16 +65,16 @@ impl Session { } pub async fn connect( - session: S, - ) -> Result<(Session, Option>, Option>), SessionError> { + session: web_transport::Session, + ) -> Result<(Session, Option, Option), SessionError> { Self::connect_role(session, setup::Role::Both).await } pub async fn connect_role( - session: S, + mut session: web_transport::Session, role: setup::Role, - ) -> Result<(Session, Option>, Option>), SessionError> { - let control = session.open_bi().await.map_err(SessionError::from_webtransport)?; + ) -> Result<(Session, Option, Option), SessionError> { + let control = session.open_bi().await?; let mut sender = Writer::new(control.0); let mut recver = Reader::new(control.1); @@ -110,15 +110,17 @@ impl Session { Ok(Session::new(session, sender, recver, role)) } - pub async fn accept(session: S) -> Result<(Session, Option>, Option>), SessionError> { + pub async fn accept( + session: web_transport::Session, + ) -> Result<(Session, Option, Option), SessionError> { Self::accept_role(session, setup::Role::Both).await } pub async fn accept_role( - session: S, + mut session: web_transport::Session, role: setup::Role, - ) -> Result<(Session, Option>, Option>), SessionError> { - let control = session.accept_bi().await.map_err(SessionError::from_webtransport)?; + ) -> Result<(Session, Option, Option), SessionError> { + let control = session.accept_bi().await?; let mut sender = Writer::new(control.0); let mut recver = Reader::new(control.1); @@ -162,22 +164,19 @@ impl Session { pub async fn run(self) -> Result<(), SessionError> { let mut tasks = FuturesUnordered::new(); - tasks.push(Self::run_recv(self.recver, self.publisher, self.subscriber.clone()).boxed()); - tasks.push(Self::run_send(self.sender, self.outgoing).boxed()); + tasks.push(Self::run_recv(self.recver, self.publisher, self.subscriber.clone()).boxed_local()); + tasks.push(Self::run_send(self.sender, self.outgoing).boxed_local()); if let Some(subscriber) = self.subscriber { - tasks.push(Self::run_streams(self.webtransport.clone(), subscriber.clone()).boxed()); - tasks.push(Self::run_datagrams(self.webtransport, subscriber).boxed()); + tasks.push(Self::run_streams(self.webtransport.clone(), subscriber.clone()).boxed_local()); + tasks.push(Self::run_datagrams(self.webtransport, subscriber).boxed_local()); } let res = tasks.select_next_some().await; Err(res.expect_err("run terminated with OK")) } - async fn run_send( - mut sender: Writer, - outgoing: Queue, - ) -> Result<(), SessionError> { + async fn run_send(mut sender: Writer, outgoing: Queue) -> Result<(), SessionError> { loop { let msg = outgoing.pop().await; log::debug!("sending message: {:?}", msg); @@ -186,9 +185,9 @@ impl Session { } async fn run_recv( - mut recver: Reader, - mut publisher: Option>, - mut subscriber: Option>, + mut recver: Reader, + mut publisher: Option, + mut subscriber: Option, ) -> Result<(), SessionError> { loop { let msg: message::Message = recver.decode().await?; @@ -221,13 +220,13 @@ impl Session { } } - async fn run_streams(webtransport: S, subscriber: Subscriber) -> Result<(), SessionError> { + async fn run_streams(mut webtransport: web_transport::Session, subscriber: Subscriber) -> Result<(), SessionError> { let mut tasks = FuturesUnordered::new(); loop { tokio::select! { res = webtransport.accept_uni() => { - let stream = res.map_err(SessionError::from_webtransport)?; + let stream = res?; let subscriber = subscriber.clone(); tasks.push(async move { @@ -241,13 +240,12 @@ impl Session { } } - async fn run_datagrams(webtransport: S, mut subscriber: Subscriber) -> Result<(), SessionError> { + async fn run_datagrams( + mut webtransport: web_transport::Session, + mut subscriber: Subscriber, + ) -> Result<(), SessionError> { loop { - let datagram = webtransport - .recv_datagram() - .await - .map_err(SessionError::from_webtransport)?; - + let datagram = webtransport.recv_datagram().await?; subscriber.recv_datagram(datagram)?; } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 40ec83fb..3918e99b 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -16,18 +16,18 @@ use super::{Announce, AnnounceRecv, Session, SessionError, Subscribed, Subscribe // TODO remove Clone. #[derive(Clone)] -pub struct Publisher { - webtransport: S, +pub struct Publisher { + webtransport: web_transport::Session, - announces: Arc>>>, + announces: Arc>>, subscribed: Arc>>, - unknown: Queue>, + unknown: Queue, outgoing: Queue, } -impl Publisher { - pub(crate) fn new(outgoing: Queue, webtransport: S) -> Self { +impl Publisher { + pub(crate) fn new(outgoing: Queue, webtransport: web_transport::Session) -> Self { Self { webtransport, announces: Default::default(), @@ -37,17 +37,17 @@ impl Publisher { } } - pub async fn accept(session: S) -> Result<(Session, Publisher), SessionError> { + pub async fn accept(session: web_transport::Session) -> Result<(Session, Publisher), SessionError> { let (session, publisher, _) = Session::accept_role(session, setup::Role::Publisher).await?; Ok((session, publisher.unwrap())) } - pub async fn connect(session: S) -> Result<(Session, Publisher), SessionError> { + pub async fn connect(session: web_transport::Session) -> Result<(Session, Publisher), SessionError> { let (session, publisher, _) = Session::connect_role(session, setup::Role::Publisher).await?; Ok((session, publisher.unwrap())) } - pub fn announce(&mut self, namespace: &str) -> Result, SessionError> { + pub fn announce(&mut self, namespace: &str) -> Result { let mut announces = self.announces.lock().unwrap(); let entry = match announces.entry(namespace.to_string()) { @@ -100,10 +100,7 @@ impl Publisher { } } - pub async fn serve_subscribe( - subscribe: Subscribed, - track: Option, - ) -> Result<(), SessionError> { + pub async fn serve_subscribe(subscribe: Subscribed, track: Option) -> Result<(), SessionError> { match track { Some(track) => subscribe.serve(track).await?, None => subscribe.close(ServeError::NotFound)?, @@ -113,7 +110,7 @@ impl Publisher { } // Returns subscriptions that do not map to an active announce. - pub async fn subscribed(&mut self) -> Subscribed { + pub async fn subscribed(&mut self) -> Subscribed { self.unknown.pop().await } @@ -214,16 +211,11 @@ impl Publisher { self.announces.lock().unwrap().remove(namespace); } - pub(super) async fn open_uni(&self) -> Result { - self.webtransport - .open_uni() - .await - .map_err(SessionError::from_webtransport) + pub(super) async fn open_uni(&mut self) -> Result { + Ok(self.webtransport.open_uni().await?) } - pub(super) fn send_datagram(&self, data: bytes::Bytes) -> Result<(), SessionError> { - self.webtransport - .send_datagram(data) - .map_err(SessionError::from_webtransport) + pub(super) async fn send_datagram(&mut self, data: bytes::Bytes) -> Result<(), SessionError> { + Ok(self.webtransport.send_datagram(data).await?) } } diff --git a/moq-transport/src/session/reader.rs b/moq-transport/src/session/reader.rs index 0c86b1a5..8b374530 100644 --- a/moq-transport/src/session/reader.rs +++ b/moq-transport/src/session/reader.rs @@ -6,13 +6,13 @@ use crate::coding::{Decode, DecodeError}; use super::SessionError; -pub struct Reader { - stream: S, +pub struct Reader { + stream: web_transport::RecvStream, buffer: BytesMut, } -impl Reader { - pub fn new(stream: S) -> Self { +impl Reader { + pub fn new(stream: web_transport::RecvStream) -> Self { Self { stream, buffer: Default::default(), @@ -24,29 +24,23 @@ impl Reader { let mut cursor = io::Cursor::new(&self.buffer); // Try to decode with the current buffer. - let mut remain = match T::decode(&mut cursor) { + let required = match T::decode(&mut cursor) { Ok(msg) => { self.buffer.advance(cursor.position() as usize); return Ok(msg); } - Err(DecodeError::More(remain)) => remain, // Try again with more data + Err(DecodeError::More(required)) => self.buffer.len() + required, // Try again with more data Err(err) => return Err(err.into()), }; // Read in more data until we reach the requested amount. // We always read at least once to avoid an infinite loop if some dingus puts remain=0 loop { - let size = self - .stream - .read_buf(&mut self.buffer) - .await - .map_err(SessionError::from_read)?; - if size == 0 { - return Err(DecodeError::More(remain).into()); - } + if !self.stream.read_buf(&mut self.buffer).await? { + return Err(DecodeError::More(required - self.buffer.len()).into()); + }; - remain = remain.saturating_sub(size); - if remain == 0 { + if self.buffer.len() >= required { break; } } @@ -60,17 +54,7 @@ impl Reader { return Ok(Some(data)); } - let chunk = match self.stream.read_chunk().await.map_err(SessionError::from_read)? { - Some(chunk) if chunk.len() <= max => Some(chunk), - Some(mut chunk) => { - // The chunk is too big; add the tail to the buffer for next read. - self.buffer.extend_from_slice(&chunk.split_off(max)); - Some(chunk) - } - None => None, - }; - - Ok(chunk) + Ok(self.stream.read_chunk(max).await?) } pub async fn done(&mut self) -> Result { @@ -78,11 +62,6 @@ impl Reader { return Ok(false); } - let size = self - .stream - .read_buf(&mut self.buffer) - .await - .map_err(SessionError::from_read)?; - Ok(size == 0) + Ok(!self.stream.read_buf(&mut self.buffer).await?) } } diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 2b923927..1962f634 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -30,16 +30,16 @@ impl Default for SubscribeState { // Held by the application #[must_use = "unsubscribe on drop"] -pub struct Subscribe { +pub struct Subscribe { state: State, - subscriber: Subscriber, + subscriber: Subscriber, id: u64, pub info: SubscribeInfo, } -impl Subscribe { - pub(super) fn new(mut subscriber: Subscriber, id: u64, track: TrackWriter) -> (Subscribe, SubscribeRecv) { +impl Subscribe { + pub(super) fn new(mut subscriber: Subscriber, id: u64, track: TrackWriter) -> (Subscribe, SubscribeRecv) { subscriber.send_message(message::Subscribe { id, track_alias: id, @@ -90,13 +90,13 @@ impl Subscribe { } } -impl Drop for Subscribe { +impl Drop for Subscribe { fn drop(&mut self) { self.subscriber.send_message(message::Unsubscribe { id: self.id }); } } -impl ops::Deref for Subscribe { +impl ops::Deref for Subscribe { type Target = SubscribeInfo; fn deref(&self) -> &SubscribeInfo { diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 8a988a89..365432ab 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -3,8 +3,6 @@ use std::ops; use futures::stream::FuturesUnordered; use futures::StreamExt; -use webtransport_generic::SendStream; - use crate::coding::Encode; use crate::serve::{ServeError, TrackReaderMode}; use crate::util::State; @@ -39,8 +37,8 @@ impl Default for SubscribedState { } } -pub struct Subscribed { - publisher: Publisher, +pub struct Subscribed { + publisher: Publisher, state: State, msg: message::Subscribe, ok: bool, @@ -48,8 +46,8 @@ pub struct Subscribed { pub info: SubscribeInfo, } -impl Subscribed { - pub(super) fn new(publisher: Publisher, msg: message::Subscribe) -> (Self, SubscribedRecv) { +impl Subscribed { + pub(super) fn new(publisher: Publisher, msg: message::Subscribe) -> (Self, SubscribedRecv) { let (send, recv) = State::init(); let info = SubscribeInfo { namespace: msg.track_namespace.clone(), @@ -127,7 +125,7 @@ impl Subscribed { } } -impl ops::Deref for Subscribed { +impl ops::Deref for Subscribed { type Target = SubscribeInfo; fn deref(&self) -> &Self::Target { @@ -135,7 +133,7 @@ impl ops::Deref for Subscribed { } } -impl Drop for Subscribed { +impl Drop for Subscribed { fn drop(&mut self) { let state = self.state.lock(); let err = state.closed.as_ref().err().cloned().unwrap_or(ServeError::Done); @@ -160,12 +158,12 @@ impl Drop for Subscribed { } } -impl Subscribed { +impl Subscribed { async fn serve_track(&mut self, mut track: serve::StreamReader) -> Result<(), SessionError> { let mut stream = self.publisher.open_uni().await?; // TODO figure out u32 vs u64 priority - stream.priority(track.priority as i32); + stream.set_priority(track.priority as i32); let mut writer = Writer::new(stream); @@ -247,13 +245,13 @@ impl Subscribed { async fn serve_group( header: data::GroupHeader, mut group: serve::GroupReader, - publisher: Publisher, + mut publisher: Publisher, state: State, ) -> Result<(), SessionError> { let mut stream = publisher.open_uni().await?; // TODO figure out u32 vs u64 priority - stream.priority(group.priority as i32); + stream.set_priority(group.priority as i32); let mut writer = Writer::new(stream); @@ -327,7 +325,7 @@ impl Subscribed { async fn serve_object( header: data::ObjectHeader, mut object: serve::ObjectReader, - publisher: Publisher, + mut publisher: Publisher, state: State, ) -> Result<(), SessionError> { state @@ -338,7 +336,7 @@ impl Subscribed { let mut stream = publisher.open_uni().await?; // TODO figure out u32 vs u64 priority - stream.priority(object.priority as i32); + stream.set_priority(object.priority as i32); let mut writer = Writer::new(stream); @@ -371,7 +369,7 @@ impl Subscribed { let mut buffer = bytes::BytesMut::with_capacity(datagram.payload.len() + 100); datagram.encode(&mut buffer)?; - self.publisher.send_datagram(buffer.into())?; + self.publisher.send_datagram(buffer.into()).await?; log::trace!("sent datagram: {:?}", datagram); self.state diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index e5a9ce26..16c692fa 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -17,9 +17,9 @@ use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, // TODO remove Clone. #[derive(Clone)] -pub struct Subscriber { +pub struct Subscriber { announced: Arc>>, - announced_queue: Queue>, + announced_queue: Queue, subscribes: Arc>>, subscribe_next: Arc, @@ -27,7 +27,7 @@ pub struct Subscriber { outgoing: Queue, } -impl Subscriber { +impl Subscriber { pub(super) fn new(outgoing: Queue) -> Self { Self { announced: Default::default(), @@ -38,21 +38,21 @@ impl Subscriber { } } - pub async fn accept(session: S) -> Result<(Session, Self), SessionError> { + pub async fn accept(session: web_transport::Session) -> Result<(Session, Self), SessionError> { let (session, _, subscriber) = Session::accept_role(session, setup::Role::Subscriber).await?; Ok((session, subscriber.unwrap())) } - pub async fn connect(session: S) -> Result<(Session, Self), SessionError> { + pub async fn connect(session: web_transport::Session) -> Result<(Session, Self), SessionError> { let (session, _, subscriber) = Session::connect_role(session, setup::Role::Subscriber).await?; Ok((session, subscriber.unwrap())) } - pub async fn announced(&mut self) -> Announced { + pub async fn announced(&mut self) -> Announced { self.announced_queue.pop().await } - pub fn subscribe(&mut self, track: serve::TrackWriter) -> Result, ServeError> { + pub fn subscribe(&mut self, track: serve::TrackWriter) -> Result { let id = self.subscribe_next.fetch_add(1, atomic::Ordering::Relaxed); let (send, recv) = Subscribe::new(self.clone(), id, track); @@ -142,7 +142,7 @@ impl Subscriber { self.announced.lock().unwrap().remove(namespace); } - pub(super) async fn recv_stream(mut self, stream: S::RecvStream) -> Result<(), SessionError> { + pub(super) async fn recv_stream(mut self, stream: web_transport::RecvStream) -> Result<(), SessionError> { let mut reader = Reader::new(stream); let header: data::Header = reader.decode().await?; @@ -160,11 +160,7 @@ impl Subscriber { res } - async fn recv_stream_inner( - &mut self, - reader: Reader, - header: data::Header, - ) -> Result<(), SessionError> { + async fn recv_stream_inner(&mut self, reader: Reader, header: data::Header) -> Result<(), SessionError> { let id = header.subscribe_id(); // This is super silly, but I couldn't figure out a way to avoid the mutex guard across awaits. @@ -194,7 +190,7 @@ impl Subscriber { Ok(()) } - async fn recv_track(mut track: serve::StreamWriter, mut reader: Reader) -> Result<(), SessionError> { + async fn recv_track(mut track: serve::StreamWriter, mut reader: Reader) -> Result<(), SessionError> { log::trace!("received track: {:?}", track.info); let mut prev: Option = None; @@ -224,7 +220,7 @@ impl Subscriber { Ok(()) } - async fn recv_group(mut group: serve::GroupWriter, mut reader: Reader) -> Result<(), SessionError> { + async fn recv_group(mut group: serve::GroupWriter, mut reader: Reader) -> Result<(), SessionError> { log::trace!("received group: {:?}", group.info); while !reader.done().await? { @@ -245,10 +241,7 @@ impl Subscriber { Ok(()) } - async fn recv_object( - mut object: serve::ObjectWriter, - mut reader: Reader, - ) -> Result<(), SessionError> { + async fn recv_object(mut object: serve::ObjectWriter, mut reader: Reader) -> Result<(), SessionError> { log::trace!("received object: {:?}", object.info); while let Some(data) = reader.read_chunk(usize::MAX).await? { diff --git a/moq-transport/src/session/writer.rs b/moq-transport/src/session/writer.rs index 131de74c..6ba99876 100644 --- a/moq-transport/src/session/writer.rs +++ b/moq-transport/src/session/writer.rs @@ -5,13 +5,13 @@ use crate::coding::{Encode, EncodeError}; use super::SessionError; use bytes::Buf; -pub struct Writer { - stream: S, +pub struct Writer { + stream: web_transport::SendStream, buffer: bytes::BytesMut, } -impl Writer { - pub fn new(stream: S) -> Self { +impl Writer { + pub fn new(stream: web_transport::SendStream) -> Self { Self { stream, buffer: Default::default(), @@ -23,10 +23,7 @@ impl Writer { msg.encode(&mut self.buffer)?; while !self.buffer.is_empty() { - self.stream - .write_buf(&mut self.buffer) - .await - .map_err(SessionError::from_write)?; + self.stream.write_buf(&mut self.buffer).await?; } Ok(()) @@ -36,11 +33,7 @@ impl Writer { let mut cursor = io::Cursor::new(buf); while cursor.has_remaining() { - let size = self - .stream - .write_buf(&mut cursor) - .await - .map_err(SessionError::from_write)?; + let size = self.stream.write_buf(&mut cursor).await?; if size == 0 { return Err(EncodeError::More(cursor.remaining()).into()); }