Skip to content

Commit

Permalink
Don't drop the subscribe for moq-clock. (kixelated#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Apr 2, 2024
1 parent 022092c commit 5bc1cdf
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 1 deletion.
3 changes: 2 additions & 1 deletion moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ async fn run<S: webtransport_generic::Session>(session: S, config: cli::Config)
.context("failed to create MoQ Transport session")?;

let (prod, sub) = serve::Track::new(&config.namespace, &config.track).produce();
subscriber.subscribe(prod).context("failed to subscribe to track")?;
let subscribe = subscriber.subscribe(prod).context("failed to subscribe to track")?;

let clock = clock::Subscriber::new(sub);

tokio::select! {
res = session.run() => res.context("session error")?,
res = clock.run() => res.context("clock error")?,
res = subscribe.closed() => res.context("subscribe closed")?,
}
}

Expand Down
1 change: 1 addition & 0 deletions moq-transport/src/session/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl<S: webtransport_generic::Session> Drop for AnnounceState<S> {
}
}

#[must_use = "unannounce on drop"]
pub struct Announce<S: webtransport_generic::Session> {
publisher: Publisher<S>,
state: State<AnnounceState<S>>,
Expand Down
1 change: 1 addition & 0 deletions moq-transport/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::message::Message;
use crate::util::Queue;
use crate::{message, setup};

#[must_use = "run() must be called"]
pub struct Session<S: webtransport_generic::Session> {
webtransport: S,

Expand Down
1 change: 1 addition & 0 deletions moq-transport/src/session/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl Default for SubscribeState {
}

// Held by the application
#[must_use = "unsubscribe on drop"]
pub struct Subscribe<S: webtransport_generic::Session> {
state: State<SubscribeState>,
subscriber: Subscriber<S>,
Expand Down
1 change: 1 addition & 0 deletions moq-transport/src/session/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl<S: webtransport_generic::Session> Subscriber<S> {
self.announced_queue.pop().await
}

#[must_use = "unsubscribe on drop"]
pub fn subscribe(&mut self, track: serve::TrackWriter) -> Result<Subscribe<S>, ServeError> {
let id = self.subscribe_next.fetch_add(1, atomic::Ordering::Relaxed);

Expand Down

0 comments on commit 5bc1cdf

Please sign in to comment.