From 41502531396106b551a9dde2d3a83ddb0beac774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Fri, 16 Feb 2024 21:33:40 +0800 Subject: [PATCH] feat(subscriber): support grpc-web and add `grpc-web` feature (#498) ## Description This pull request adds support for `grpc-web` to `console-subscriber`. Once you enable this feature by calling the `enable_grpc_web` function, you can connect the console-subscriber gRPC server using a browser client. ## Explanation of Changes 1. Added a new feature called `grpc-web` which requires the `tonic-web` crate as a dependency. 2. A new API named `serve_with_grpc_web` has been introduced. It appears to be similar to the `serve_with` API. However, if we were to use the same API with `serve_with`, it would result in a bound issue. We attempted to combine `serve_with_grpc_web` and `serve_with`, but it would create a very complex trait bound for the function. Therefore, we decided to introduce a new API to address this problem. 4. Added a new example named `grpc_web` to show how to use the `into_parts` API to customize the CORS layer. Ref #497 Signed-off-by: hi-rustin Co-authored-by: Hayden Stainsby --- Cargo.lock | 47 +++++++++ console-subscriber/Cargo.toml | 11 +- console-subscriber/examples/grpc_web.rs | 122 +++++++++++++++++++++++ console-subscriber/src/builder.rs | 40 +++++++- console-subscriber/src/lib.rs | 127 ++++++++++++++++++++++++ 5 files changed, 345 insertions(+), 2 deletions(-) create mode 100644 console-subscriber/examples/grpc_web.rs diff --git a/Cargo.lock b/Cargo.lock index 0b1341c7d..a92522293 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,6 +327,7 @@ dependencies = [ "futures", "futures-task", "hdrhistogram", + "http", "humantime", "parking_lot", "prost", @@ -337,7 +338,9 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tonic-web", "tower", + "tower-http", "tracing", "tracing-core", "tracing-subscriber", @@ -755,6 +758,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "httparse" version = "1.8.0" @@ -1879,6 +1888,26 @@ dependencies = [ "syn 2.0.33", ] +[[package]] +name = "tonic-web" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fddb2a37b247e6adcb9f239f4e5cefdcc5ed526141a416b943929f13aea2cce" +dependencies = [ + "base64 0.21.0", + "bytes", + "http", + "http-body", + "hyper", + "pin-project", + "tokio-stream", + "tonic", + "tower-http", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -1899,6 +1928,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.4.0", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.2" diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index 7bdf8c8ef..6e7e48cd4 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -28,9 +28,9 @@ keywords = [ default = ["env-filter"] parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"] env-filter = ["tracing-subscriber/env-filter"] +grpc-web = ["tonic-web"] [dependencies] - crossbeam-utils = "0.8.7" tokio = { version = "^1.21", features = ["sync", "time", "macros", "tracing"] } tokio-stream = { version = "0.1", features = ["net"] } @@ -54,11 +54,20 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" crossbeam-channel = "0.5" +# Only for the web feature: +tonic-web = { version = "0.10.2", optional = true } + [dev-dependencies] tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] } tower = { version = "0.4", default-features = false } futures = "0.3" +http = "0.2" +tower-http = { version = "0.4", features = ["cors"] } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] + +[[example]] +name = "grpc_web" +required-features = ["grpc-web"] diff --git a/console-subscriber/examples/grpc_web.rs b/console-subscriber/examples/grpc_web.rs new file mode 100644 index 000000000..db2820d47 --- /dev/null +++ b/console-subscriber/examples/grpc_web.rs @@ -0,0 +1,122 @@ +//! Example of using the console subscriber with tonic-web. +//! This example requires the `grpc-web` feature to be enabled. +//! Run with: +//! ```sh +//! cargo run --example grpc_web --features grpc-web +//! ``` +use std::{thread, time::Duration}; + +use console_subscriber::{ConsoleLayer, ServerParts}; +use http::header::HeaderName; +use tonic_web::GrpcWebLayer; +use tower_http::cors::{AllowOrigin, CorsLayer}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); +const DEFAULT_EXPOSED_HEADERS: [&str; 3] = + ["grpc-status", "grpc-message", "grpc-status-details-bin"]; +const DEFAULT_ALLOW_HEADERS: [&str; 5] = [ + "x-grpc-web", + "content-type", + "x-user-agent", + "grpc-timeout", + "user-agent", +]; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let (console_layer, server) = ConsoleLayer::builder().with_default_env().build(); + thread::Builder::new() + .name("subscriber".into()) + .spawn(move || { + // Do not trace anything in this thread. + let _subscriber_guard = + tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default()); + // Custom CORS configuration. + let cors = CorsLayer::new() + .allow_origin(AllowOrigin::mirror_request()) + .allow_credentials(true) + .max_age(DEFAULT_MAX_AGE) + .expose_headers( + DEFAULT_EXPOSED_HEADERS + .iter() + .cloned() + .map(HeaderName::from_static) + .collect::>(), + ) + .allow_headers( + DEFAULT_ALLOW_HEADERS + .iter() + .cloned() + .map(HeaderName::from_static) + .collect::>(), + ); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("console subscriber runtime initialization failed"); + runtime.block_on(async move { + let ServerParts { + instrument_server, + aggregator, + .. + } = server.into_parts(); + tokio::spawn(aggregator.run()); + let router = tonic::transport::Server::builder() + // Accept gRPC-Web requests and enable CORS. + .accept_http1(true) + .layer(cors) + .layer(GrpcWebLayer::new()) + .add_service(instrument_server); + let serve = router.serve(std::net::SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), + // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead. + 9999, + )); + serve.await.expect("console subscriber server failed"); + }); + }) + .expect("console subscriber could not spawn thread"); + tracing_subscriber::registry().with(console_layer).init(); + + let task1 = tokio::task::Builder::new() + .name("task1") + .spawn(spawn_tasks(1, 10)) + .unwrap(); + let task2 = tokio::task::Builder::new() + .name("task2") + .spawn(spawn_tasks(10, 30)) + .unwrap(); + + let result = tokio::try_join! { + task1, + task2, + }; + result?; + + Ok(()) +} + +#[tracing::instrument] +async fn spawn_tasks(min: u64, max: u64) { + loop { + for i in min..max { + tracing::trace!(i, "spawning wait task"); + tokio::task::Builder::new() + .name("wait") + .spawn(wait(i)) + .unwrap(); + + let sleep = Duration::from_secs(max) - Duration::from_secs(i); + tracing::trace!(?sleep, "sleeping..."); + tokio::time::sleep(sleep).await; + } + } +} + +#[tracing::instrument] +async fn wait(seconds: u64) { + tracing::debug!("waiting..."); + tokio::time::sleep(Duration::from_secs(seconds)).await; + tracing::trace!("done!"); +} diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs index 58c5d9535..85406c8ec 100644 --- a/console-subscriber/src/builder.rs +++ b/console-subscriber/src/builder.rs @@ -56,6 +56,10 @@ pub struct Builder { /// Any scheduled times exceeding this duration will be clamped to this /// value. Higher values will result in more memory usage. pub(super) scheduled_duration_max: Duration, + + /// Whether to enable the grpc-web support. + #[cfg(feature = "grpc-web")] + enable_grpc_web: bool, } impl Default for Builder { @@ -71,6 +75,8 @@ impl Default for Builder { recording_path: None, filter_env_var: "RUST_LOG".to_string(), self_trace: false, + #[cfg(feature = "grpc-web")] + enable_grpc_web: false, } } } @@ -268,6 +274,28 @@ impl Builder { Self { self_trace, ..self } } + /// Sets whether to enable the grpc-web support. + /// + /// By default, this is `false`. If enabled, the console subscriber will + /// serve the gRPC-Web protocol in addition to the standard gRPC protocol. + /// This is useful for serving the console subscriber to web clients. + /// Please be aware that the current default server port is set to 6669. + /// However, certain browsers may restrict this port due to security reasons. + /// If you encounter issues with this, consider changing the port to an + /// alternative one that is not commonly blocked by browsers. + /// + /// [`serve_with_grpc_web`] is used to provide more advanced configuration + /// for the gRPC-Web server. + /// + /// [`serve_with_grpc_web`]: crate::Server::serve_with_grpc_web + #[cfg(feature = "grpc-web")] + pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self { + Self { + enable_grpc_web, + ..self + } + } + /// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task. pub fn build(self) -> (ConsoleLayer, Server) { ConsoleLayer::build(self) @@ -481,6 +509,8 @@ impl Builder { } let self_trace = self.self_trace; + #[cfg(feature = "grpc-web")] + let enable_grpc_web = self.enable_grpc_web; let (layer, server) = self.build(); let filter = @@ -501,8 +531,16 @@ impl Builder { .enable_time() .build() .expect("console subscriber runtime initialization failed"); - runtime.block_on(async move { + #[cfg(feature = "grpc-web")] + if enable_grpc_web { + server + .serve_with_grpc_web(tonic::transport::Server::builder()) + .await + .expect("console subscriber server failed"); + return; + } + server .serve() .await diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index fde18af8d..0e45ed6f9 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -962,6 +962,133 @@ impl Server { res?.map_err(Into::into) } + /// Starts the gRPC service with the default gRPC settings and gRPC-Web + /// support. + /// + /// # Examples + /// + /// To serve the instrument server with gRPC-Web support with the default + /// settings: + /// + /// ```rust + /// # async fn docs() -> Result<(), Box> { + /// # let (_, server) = console_subscriber::ConsoleLayer::new(); + /// server.serve_with_grpc_web(tonic::transport::Server::default()).await + /// # } + /// ``` + /// + /// To serve the instrument server with gRPC-Web support and a custom CORS configuration, use the + /// following code: + /// + /// ```rust + /// # use std::{thread, time::Duration}; + /// # + /// use console_subscriber::{ConsoleLayer, ServerParts}; + /// use tonic_web::GrpcWebLayer; + /// use tower_web::cors::{CorsLayer, AllowOrigin}; + /// use http::header::HeaderName; + /// # use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + /// # const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); + /// # const DEFAULT_EXPOSED_HEADERS: [&str; 3] = + /// # ["grpc-status", "grpc-message", "grpc-status-details-bin"]; + /// # const DEFAULT_ALLOW_HEADERS: [&str; 5] = [ + /// # "x-grpc-web", + /// # "content-type", + /// # "x-user-agent", + /// # "grpc-timeout", + /// # "user-agent", + /// # ]; + /// + /// let (console_layer, server) = ConsoleLayer::builder().with_default_env().build(); + /// # thread::Builder::new() + /// # .name("subscriber".into()) + /// # .spawn(move || { + /// // Customize the CORS configuration. + /// let cors = CorsLayer::new() + /// .allow_origin(AllowOrigin::mirror_request()) + /// .allow_credentials(true) + /// .max_age(DEFAULT_MAX_AGE) + /// .expose_headers( + /// DEFAULT_EXPOSED_HEADERS + /// .iter() + /// .cloned() + /// .map(HeaderName::from_static) + /// .collect::>(), + /// ) + /// .allow_headers( + /// DEFAULT_ALLOW_HEADERS + /// .iter() + /// .cloned() + /// .map(HeaderName::from_static) + /// .collect::>(), + /// ); + /// # let runtime = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("console subscriber runtime initialization failed"); + /// # runtime.block_on(async move { + /// + /// let ServerParts { + /// instrument_server, + /// aggregator, + /// .. + /// } = server.into_parts(); + /// tokio::spawn(aggregator.run()); + /// + /// // Serve the instrument server with gRPC-Web support and the CORS configuration. + /// let router = tonic::transport::Server::builder() + /// .accept_http1(true) + /// .layer(cors) + /// .layer(GrpcWebLayer::new()) + /// .add_service(instrument_server); + /// let serve = router.serve(std::net::SocketAddr::new( + /// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), + /// // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead. + /// 9999, + /// )); + /// + /// // Finally, spawn the server. + /// serve.await.expect("console subscriber server failed"); + /// # }); + /// # }) + /// # .expect("console subscriber could not spawn thread"); + /// # tracing_subscriber::registry().with(console_layer).init(); + /// ``` + /// + /// For a comprehensive understanding and complete code example, + /// please refer to the `grpc-web` example in the examples directory. + /// + /// [`Router::serve`]: fn@tonic::transport::server::Router::serve + #[cfg(feature = "grpc-web")] + pub async fn serve_with_grpc_web( + self, + builder: tonic::transport::Server, + ) -> Result<(), Box> { + let addr = self.addr.clone(); + let ServerParts { + instrument_server, + aggregator, + } = self.into_parts(); + let router = builder + .accept_http1(true) + .add_service(tonic_web::enable(instrument_server)); + let aggregate = spawn_named(aggregator.run(), "console::aggregate"); + let res = match addr { + ServerAddr::Tcp(addr) => { + let serve = router.serve(addr); + spawn_named(serve, "console::serve").await + } + #[cfg(unix)] + ServerAddr::Unix(path) => { + let incoming = UnixListener::bind(path)?; + let serve = router.serve_with_incoming(UnixListenerStream::new(incoming)); + spawn_named(serve, "console::serve").await + } + }; + aggregate.abort(); + res?.map_err(Into::into) + } + /// Returns the parts needed to spawn a gRPC server and the aggregator that /// supplies it. ///