diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index 306e6e2d4..ebcdae31e 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -28,7 +28,7 @@ keywords = [ default = ["env-filter"] parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"] env-filter = ["tracing-subscriber/env-filter"] -web = ["tonic-web"] +grpc-web = ["tonic-web"] [dependencies] @@ -67,5 +67,5 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [[example]] -name = "app" -required-features = ["web"] +name = "grpc_web" +required-features = ["grpc-web"] diff --git a/console-subscriber/examples/grpc_web.rs b/console-subscriber/examples/grpc_web.rs new file mode 100644 index 000000000..021033b42 --- /dev/null +++ b/console-subscriber/examples/grpc_web.rs @@ -0,0 +1,165 @@ +//! Example of using the console subscriber with tonic-web. +//! This example requires the `grpc-web` feature to be enabled. +//! Run with: +//! ```sh +//! cargo run --example grpc_web --features grpc-web +//! ``` +use std::time::Duration; + +use console_subscriber::ConsoleLayer; + +static HELP: &str = r#" +Example console-instrumented app + +USAGE: + app [OPTIONS] + +OPTIONS: + -h, help prints this message + blocks Includes a (misbehaving) blocking task + burn Includes a (misbehaving) task that spins CPU with self-wakes + coma Includes a (misbehaving) task that forgets to register a waker + noyield Includes a (misbehaving) task that spawns tasks that never yield +"#; + +#[tokio::main] +async fn main() -> Result<(), Box> { + ConsoleLayer::builder() + .with_default_env() + .enable_grpc_web(true) + .init(); + // spawn optional extras from CLI args + // skip first which is command name + for opt in std::env::args().skip(1) { + match &*opt { + "blocks" => { + tokio::task::Builder::new() + .name("blocks") + .spawn(double_sleepy(1, 10)) + .unwrap(); + } + "coma" => { + tokio::task::Builder::new() + .name("coma") + .spawn(std::future::pending::<()>()) + .unwrap(); + } + "burn" => { + tokio::task::Builder::new() + .name("burn") + .spawn(burn(1, 10)) + .unwrap(); + } + "noyield" => { + tokio::task::Builder::new() + .name("noyield") + .spawn(no_yield(20)) + .unwrap(); + } + "blocking" => { + tokio::task::Builder::new() + .name("spawns_blocking") + .spawn(spawn_blocking(5)) + .unwrap(); + } + "help" | "-h" => { + eprintln!("{}", HELP); + return Ok(()); + } + wat => { + return Err( + format!("unknown option: {:?}, run with '-h' to see options", wat).into(), + ) + } + } + } + + let task1 = tokio::task::Builder::new() + .name("task1") + .spawn(spawn_tasks(1, 10)) + .unwrap(); + let task2 = tokio::task::Builder::new() + .name("task2") + .spawn(spawn_tasks(10, 30)) + .unwrap(); + + let result = tokio::try_join! { + task1, + task2, + }; + result?; + + Ok(()) +} + +#[tracing::instrument] +async fn spawn_tasks(min: u64, max: u64) { + loop { + for i in min..max { + tracing::trace!(i, "spawning wait task"); + tokio::task::Builder::new() + .name("wait") + .spawn(wait(i)) + .unwrap(); + + let sleep = Duration::from_secs(max) - Duration::from_secs(i); + tracing::trace!(?sleep, "sleeping..."); + tokio::time::sleep(sleep).await; + } + } +} + +#[tracing::instrument] +async fn wait(seconds: u64) { + tracing::debug!("waiting..."); + tokio::time::sleep(Duration::from_secs(seconds)).await; + tracing::trace!("done!"); +} + +#[tracing::instrument] +async fn double_sleepy(min: u64, max: u64) { + loop { + for i in min..max { + // woops! + std::thread::sleep(Duration::from_secs(i)); + tokio::time::sleep(Duration::from_secs(max - i)).await; + } + } +} + +#[tracing::instrument] +async fn burn(min: u64, max: u64) { + loop { + for i in min..max { + for _ in 0..i { + tokio::task::yield_now().await; + } + tokio::time::sleep(Duration::from_secs(i - min)).await; + } + } +} + +#[tracing::instrument] +async fn no_yield(seconds: u64) { + loop { + let handle = tokio::task::Builder::new() + .name("greedy") + .spawn(async move { + std::thread::sleep(Duration::from_secs(seconds)); + }) + .expect("Couldn't spawn greedy task"); + + _ = handle.await; + } +} + +#[tracing::instrument] +async fn spawn_blocking(seconds: u64) { + loop { + let seconds = seconds; + _ = tokio::task::spawn_blocking(move || { + std::thread::sleep(Duration::from_secs(seconds)); + }) + .await; + } +} diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs index 58c5d9535..67d391d84 100644 --- a/console-subscriber/src/builder.rs +++ b/console-subscriber/src/builder.rs @@ -56,6 +56,10 @@ pub struct Builder { /// Any scheduled times exceeding this duration will be clamped to this /// value. Higher values will result in more memory usage. pub(super) scheduled_duration_max: Duration, + + #[cfg(feature = "grpc-web")] + /// Whether to enable gRPC web support. + enable_grpc_web: bool, } impl Default for Builder { @@ -71,6 +75,8 @@ impl Default for Builder { recording_path: None, filter_env_var: "RUST_LOG".to_string(), self_trace: false, + #[cfg(feature = "grpc-web")] + enable_grpc_web: false, } } } @@ -268,6 +274,17 @@ impl Builder { Self { self_trace, ..self } } + #[cfg(feature = "grpc-web")] + /// Sets whether to enable gRPC web support. + /// + /// By default, gRPC web support is disabled. + pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self { + Self { + enable_grpc_web, + ..self + } + } + /// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task. pub fn build(self) -> (ConsoleLayer, Server) { ConsoleLayer::build(self) @@ -481,6 +498,7 @@ impl Builder { } let self_trace = self.self_trace; + let enable_grpc_web = self.enable_grpc_web; let (layer, server) = self.build(); let filter = @@ -501,8 +519,21 @@ impl Builder { .enable_time() .build() .expect("console subscriber runtime initialization failed"); - runtime.block_on(async move { + #[cfg(feature = "grpc-web")] + if enable_grpc_web { + server + .serve_with_grpc_web() + .await + .expect("console subscriber server failed") + } else { + server + .serve() + .await + .expect("console subscriber server failed") + } + + #[cfg(not(feature = "grpc-web"))] server .serve() .await diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index fcd9f8457..31b11447e 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -925,6 +925,38 @@ impl Server { self.serve_with(tonic::transport::Server::builder()).await } + #[cfg(feature = "grpc-web")] + /// Starts the gRPC service with the default gRPC settings and gRPC-Web + /// support. + pub async fn serve_with_grpc_web( + self, + ) -> Result<(), Box> { + let builder = tonic::transport::Server::builder(); + let addr = self.addr.clone(); + let ServerParts { + instrument_server, + aggregator, + } = self.into_parts(); + let aggregate = spawn_named(aggregator.run(), "console::aggregate"); + let router = builder + .accept_http1(true) + .add_service(tonic_web::enable(instrument_server)); + let res = match addr { + ServerAddr::Tcp(addr) => { + let serve = router.serve(addr); + spawn_named(serve, "console::serve").await + } + #[cfg(unix)] + ServerAddr::Unix(path) => { + let incoming = UnixListener::bind(path)?; + let serve = router.serve_with_incoming(UnixListenerStream::new(incoming)); + spawn_named(serve, "console::serve").await + } + }; + aggregate.abort(); + res?.map_err(Into::into) + } + /// Starts the gRPC service with the given [`tonic`] gRPC transport server /// `builder`. /// @@ -937,8 +969,7 @@ impl Server { /// [`tonic`]: https://docs.rs/tonic/ pub async fn serve_with( self, - #[cfg(feature = "web")] builder: tonic::transport::Server, - #[cfg(not(feature = "web"))] mut builder: tonic::transport::Server, + mut builder: tonic::transport::Server, ) -> Result<(), Box> { let addr = self.addr.clone(); let ServerParts { @@ -946,11 +977,6 @@ impl Server { aggregator, } = self.into_parts(); let aggregate = spawn_named(aggregator.run(), "console::aggregate"); - #[cfg(feature = "web")] - let router = builder - .accept_http1(true) - .add_service(tonic_web::enable(instrument_server)); - #[cfg(not(feature = "web"))] let router = builder.add_service(instrument_server); let res = match addr { ServerAddr::Tcp(addr) => {