Skip to content

Commit

Permalink
Add back the fixed moq-clock.
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated committed Mar 20, 2024
1 parent e1ed4b3 commit f91a8e0
Show file tree
Hide file tree
Showing 12 changed files with 587 additions and 53 deletions.
214 changes: 176 additions & 38 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api"]
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api", "moq-clock"]
resolver = "2"
4 changes: 2 additions & 2 deletions dev/clock
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ ADDR="${ADDR:-$HOST:$PORT}"
NAME="${NAME:-clock}"

# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}"
URL="${URL:-"https://$ADDR"}"

cargo run --bin moq-clock -- "$URL" "$@"
cargo run --bin moq-clock -- "$URL" --namespace "$NAME" "$@"
46 changes: 46 additions & 0 deletions moq-clock/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[package]
name = "moq-clock"
description = "CLOCK over QUIC"
authors = ["Luke Curley"]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"

version = "0.1.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
moq-transport = { path = "../moq-transport" }

# QUIC
quinn = "0.10"
webtransport-quinn = "0.7"
url = "2"

# Crypto
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-native-certs = "0.6"
rustls-pemfile = "1"

# Async stuff
tokio = { version = "1", features = ["full"] }

# CLI, logging, error handling
clap = { version = "4", features = ["derive"] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9"
anyhow = { version = "1", features = ["backtrace"] }
tracing = "0.1"
tracing-subscriber = "0.3"

# CLOCK STUFF
chrono = "0.4"

[build-dependencies]
clap = { version = "4", features = ["derive"] }
clap_mangen = "0.2"
url = "2"
50 changes: 50 additions & 0 deletions moq-clock/src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use clap::Parser;
use std::{net, path};
use url::Url;

#[derive(Parser, Clone, Debug)]
pub struct Config {
/// Listen for UDP packets on the given address.
#[arg(long, default_value = "[::]:0")]
pub bind: net::SocketAddr,

/// Connect to the given URL starting with https://
#[arg(value_parser = moq_url)]
pub url: Url,

/// Use the TLS root CA at this path, encoded as PEM.
///
/// This value can be provided multiple times for multiple roots.
/// If this is empty, system roots will be used instead
#[arg(long)]
pub tls_root: Vec<path::PathBuf>,

/// Danger: Disable TLS certificate verification.
///
/// Fine for local development, but should be used in caution in production.
#[arg(long)]
pub tls_disable_verify: bool,

/// Publish the current time to the relay, otherwise only subscribe.
#[arg(long)]
pub publish: bool,

/// The name of the clock track.
#[arg(long, default_value = "clock")]
pub namespace: String,

/// The name of the clock track.
#[arg(long, default_value = "now")]
pub track: String,
}

fn moq_url(s: &str) -> Result<Url, String> {
let url = Url::try_from(s).map_err(|e| e.to_string())?;

// Make sure the scheme is moq
if url.scheme() != "https" {
return Err("url scheme must be https:// for WebTransport".to_string());
}

Ok(url)
}
161 changes: 161 additions & 0 deletions moq-clock/src/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use anyhow::Context;
use moq_transport::serve;

use chrono::prelude::*;

pub struct Publisher {
track: serve::TrackPublisher,
}

impl Publisher {
pub fn new(track: serve::TrackPublisher) -> Self {
Self { track }
}

pub async fn run(mut self) -> anyhow::Result<()> {
let start = Utc::now();
let mut now = start;

// Just for fun, don't start at zero.
let mut sequence = start.minute();

loop {
let segment = self
.track
.create_group(serve::Group {
id: sequence as u64,
send_order: 0,
})
.context("failed to create minute segment")?;

sequence += 1;

tokio::spawn(async move {
if let Err(err) = Self::send_segment(segment, now).await {
log::warn!("failed to send minute: {:?}", err);
}
});

let next = now + chrono::Duration::try_minutes(1).unwrap();
let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap();

let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;

now = next; // just assume we didn't undersleep
}
}

async fn send_segment(mut segment: serve::GroupPublisher, mut now: DateTime<Utc>) -> anyhow::Result<()> {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment
.write_object(base.clone().into())
.context("failed to write base")?;

loop {
let delta = now.format("%S").to_string();
segment
.write_object(delta.clone().into())
.context("failed to write delta")?;

println!("{}{}", base, delta);

let next = now + chrono::Duration::try_seconds(1).unwrap();
let next = next.with_nanosecond(0).unwrap();

let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;

// Get the current time again to check if we overslept
let next = Utc::now();
if next.minute() != now.minute() {
return Ok(());
}

now = next;
}
}
}
pub struct Subscriber {
track: serve::TrackSubscriber,
}

impl Subscriber {
pub fn new(track: serve::TrackSubscriber) -> Self {
Self { track }
}

pub async fn run(mut self) -> anyhow::Result<()> {
while let Some(stream) = self.track.next().await.context("failed to get stream")? {
match stream {
serve::TrackMode::Group(group) => tokio::spawn(async move {
if let Err(err) = Self::recv_group(group).await {
log::warn!("failed to receive group: {:?}", err);
}
}),
serve::TrackMode::Object(object) => tokio::spawn(async move {
if let Err(err) = Self::recv_object(object).await {
log::warn!("failed to receive group: {:?}", err);
}
}),
serve::TrackMode::Stream(stream) => tokio::spawn(async move {
if let Err(err) = Self::recv_track(stream).await {
log::warn!("failed to receive stream: {:?}", err);
}
}),
serve::TrackMode::Datagram(datagram) => tokio::spawn(async move {
if let Err(err) = Self::recv_datagram(datagram) {
log::warn!("failed to receive datagram: {:?}", err);
}
}),
};
}

Ok(())
}

async fn recv_track(mut track: serve::StreamSubscriber) -> anyhow::Result<()> {
while let Some(fragment) = track.next().await? {
let str = String::from_utf8_lossy(&fragment.payload);
println!("{}", str);
}

Ok(())
}

async fn recv_group(mut segment: serve::GroupSubscriber) -> anyhow::Result<()> {
let mut first = segment
.next()
.await
.context("failed to get first fragment")?
.context("no fragments in segment")?;

let base = first.read_all().await?;
let base = String::from_utf8_lossy(&base);

while let Some(mut fragment) = segment.next().await? {
let value = fragment.read_all().await.context("failed to read fragment")?;
let str = String::from_utf8_lossy(&value);

println!("{}{}", base, str);
}

Ok(())
}

async fn recv_object(mut object: serve::ObjectSubscriber) -> anyhow::Result<()> {
let value = object.read_all().await.context("failed to read object")?;
let str = String::from_utf8_lossy(&value);

println!("{}", str);
Ok(())
}

fn recv_datagram(datagram: serve::Datagram) -> anyhow::Result<()> {
let str = String::from_utf8_lossy(&datagram.payload);
println!("{}", str);
Ok(())
}
}
Loading

0 comments on commit f91a8e0

Please sign in to comment.