diff --git a/Cargo.lock b/Cargo.lock index 46290ab97..0385dce8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -336,6 +336,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tonic-web", "tower", "tracing", "tracing-core", @@ -742,6 +743,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "httparse" version = "1.8.0" @@ -1856,6 +1863,26 @@ dependencies = [ "syn 2.0.33", ] +[[package]] +name = "tonic-web" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fddb2a37b247e6adcb9f239f4e5cefdcc5ed526141a416b943929f13aea2cce" +dependencies = [ + "base64 0.21.0", + "bytes", + "http", + "http-body", + "hyper", + "pin-project", + "tokio-stream", + "tonic", + "tower-http", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -1876,6 +1903,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.4.0", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.2" diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index 113a4ceb3..ebcdae31e 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -28,6 +28,7 @@ keywords = [ default = ["env-filter"] parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"] env-filter = ["tracing-subscriber/env-filter"] +grpc-web = ["tonic-web"] [dependencies] @@ -53,6 +54,9 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" crossbeam-channel = "0.5" +# Only for the web feature: +tonic-web = { version = "0.10.2", optional = true } + [dev-dependencies] tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] } tower = { version = "0.4", default-features = false } @@ -61,3 +65,7 @@ futures = "0.3" [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] + +[[example]] +name = "grpc_web" +required-features = ["grpc-web"] diff --git a/console-subscriber/examples/grpc_web.rs b/console-subscriber/examples/grpc_web.rs new file mode 100644 index 000000000..021033b42 --- /dev/null +++ b/console-subscriber/examples/grpc_web.rs @@ -0,0 +1,165 @@ +//! Example of using the console subscriber with tonic-web. +//! This example requires the `grpc-web` feature to be enabled. +//! Run with: +//! ```sh +//! cargo run --example grpc_web --features grpc-web +//! ``` +use std::time::Duration; + +use console_subscriber::ConsoleLayer; + +static HELP: &str = r#" +Example console-instrumented app + +USAGE: + app [OPTIONS] + +OPTIONS: + -h, help prints this message + blocks Includes a (misbehaving) blocking task + burn Includes a (misbehaving) task that spins CPU with self-wakes + coma Includes a (misbehaving) task that forgets to register a waker + noyield Includes a (misbehaving) task that spawns tasks that never yield +"#; + +#[tokio::main] +async fn main() -> Result<(), Box> { + ConsoleLayer::builder() + .with_default_env() + .enable_grpc_web(true) + .init(); + // spawn optional extras from CLI args + // skip first which is command name + for opt in std::env::args().skip(1) { + match &*opt { + "blocks" => { + tokio::task::Builder::new() + .name("blocks") + .spawn(double_sleepy(1, 10)) + .unwrap(); + } + "coma" => { + tokio::task::Builder::new() + .name("coma") + .spawn(std::future::pending::<()>()) + .unwrap(); + } + "burn" => { + tokio::task::Builder::new() + .name("burn") + .spawn(burn(1, 10)) + .unwrap(); + } + "noyield" => { + tokio::task::Builder::new() + .name("noyield") + .spawn(no_yield(20)) + .unwrap(); + } + "blocking" => { + tokio::task::Builder::new() + .name("spawns_blocking") + .spawn(spawn_blocking(5)) + .unwrap(); + } + "help" | "-h" => { + eprintln!("{}", HELP); + return Ok(()); + } + wat => { + return Err( + format!("unknown option: {:?}, run with '-h' to see options", wat).into(), + ) + } + } + } + + let task1 = tokio::task::Builder::new() + .name("task1") + .spawn(spawn_tasks(1, 10)) + .unwrap(); + let task2 = tokio::task::Builder::new() + .name("task2") + .spawn(spawn_tasks(10, 30)) + .unwrap(); + + let result = tokio::try_join! { + task1, + task2, + }; + result?; + + Ok(()) +} + +#[tracing::instrument] +async fn spawn_tasks(min: u64, max: u64) { + loop { + for i in min..max { + tracing::trace!(i, "spawning wait task"); + tokio::task::Builder::new() + .name("wait") + .spawn(wait(i)) + .unwrap(); + + let sleep = Duration::from_secs(max) - Duration::from_secs(i); + tracing::trace!(?sleep, "sleeping..."); + tokio::time::sleep(sleep).await; + } + } +} + +#[tracing::instrument] +async fn wait(seconds: u64) { + tracing::debug!("waiting..."); + tokio::time::sleep(Duration::from_secs(seconds)).await; + tracing::trace!("done!"); +} + +#[tracing::instrument] +async fn double_sleepy(min: u64, max: u64) { + loop { + for i in min..max { + // woops! + std::thread::sleep(Duration::from_secs(i)); + tokio::time::sleep(Duration::from_secs(max - i)).await; + } + } +} + +#[tracing::instrument] +async fn burn(min: u64, max: u64) { + loop { + for i in min..max { + for _ in 0..i { + tokio::task::yield_now().await; + } + tokio::time::sleep(Duration::from_secs(i - min)).await; + } + } +} + +#[tracing::instrument] +async fn no_yield(seconds: u64) { + loop { + let handle = tokio::task::Builder::new() + .name("greedy") + .spawn(async move { + std::thread::sleep(Duration::from_secs(seconds)); + }) + .expect("Couldn't spawn greedy task"); + + _ = handle.await; + } +} + +#[tracing::instrument] +async fn spawn_blocking(seconds: u64) { + loop { + let seconds = seconds; + _ = tokio::task::spawn_blocking(move || { + std::thread::sleep(Duration::from_secs(seconds)); + }) + .await; + } +} diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs index 58c5d9535..358bc4327 100644 --- a/console-subscriber/src/builder.rs +++ b/console-subscriber/src/builder.rs @@ -56,6 +56,10 @@ pub struct Builder { /// Any scheduled times exceeding this duration will be clamped to this /// value. Higher values will result in more memory usage. pub(super) scheduled_duration_max: Duration, + + #[cfg(feature = "grpc-web")] + /// Whether to enable gRPC web support. + enable_grpc_web: bool, } impl Default for Builder { @@ -71,6 +75,8 @@ impl Default for Builder { recording_path: None, filter_env_var: "RUST_LOG".to_string(), self_trace: false, + #[cfg(feature = "grpc-web")] + enable_grpc_web: false, } } } @@ -268,6 +274,17 @@ impl Builder { Self { self_trace, ..self } } + #[cfg(feature = "grpc-web")] + /// Sets whether to enable gRPC web support. + /// + /// By default, gRPC web support is disabled. + pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self { + Self { + enable_grpc_web, + ..self + } + } + /// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task. pub fn build(self) -> (ConsoleLayer, Server) { ConsoleLayer::build(self) @@ -481,6 +498,8 @@ impl Builder { } let self_trace = self.self_trace; + #[cfg(feature = "grpc-web")] + let enable_grpc_web = self.enable_grpc_web; let (layer, server) = self.build(); let filter = @@ -501,8 +520,21 @@ impl Builder { .enable_time() .build() .expect("console subscriber runtime initialization failed"); - runtime.block_on(async move { + #[cfg(feature = "grpc-web")] + if enable_grpc_web { + server + .serve_with_grpc_web() + .await + .expect("console subscriber server failed") + } else { + server + .serve() + .await + .expect("console subscriber server failed") + } + + #[cfg(not(feature = "grpc-web"))] server .serve() .await diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index fde18af8d..31b11447e 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -922,7 +922,39 @@ impl Server { /// ``` /// [`serve_with`]: Server::serve_with pub async fn serve(self) -> Result<(), Box> { - self.serve_with(tonic::transport::Server::default()).await + self.serve_with(tonic::transport::Server::builder()).await + } + + #[cfg(feature = "grpc-web")] + /// Starts the gRPC service with the default gRPC settings and gRPC-Web + /// support. + pub async fn serve_with_grpc_web( + self, + ) -> Result<(), Box> { + let builder = tonic::transport::Server::builder(); + let addr = self.addr.clone(); + let ServerParts { + instrument_server, + aggregator, + } = self.into_parts(); + let aggregate = spawn_named(aggregator.run(), "console::aggregate"); + let router = builder + .accept_http1(true) + .add_service(tonic_web::enable(instrument_server)); + let res = match addr { + ServerAddr::Tcp(addr) => { + let serve = router.serve(addr); + spawn_named(serve, "console::serve").await + } + #[cfg(unix)] + ServerAddr::Unix(path) => { + let incoming = UnixListener::bind(path)?; + let serve = router.serve_with_incoming(UnixListenerStream::new(incoming)); + spawn_named(serve, "console::serve").await + } + }; + aggregate.abort(); + res?.map_err(Into::into) } /// Starts the gRPC service with the given [`tonic`] gRPC transport server