Skip to content

Commit

Permalink
feat: add new API to support grpc-web
Browse files Browse the repository at this point in the history
Signed-off-by: hi-rustin <rustin.liu@gmail.com>
  • Loading branch information
Rustin170506 committed Dec 3, 2023
1 parent 96c65bd commit 545750c
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 2 deletions.
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ keywords = [
default = ["env-filter"]
parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]
env-filter = ["tracing-subscriber/env-filter"]
grpc-web = ["tonic-web"]

[dependencies]

Expand All @@ -53,6 +54,9 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
crossbeam-channel = "0.5"

# Only for the web feature:
tonic-web = { version = "0.10.2", optional = true }

[dev-dependencies]
tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] }
tower = { version = "0.4", default-features = false }
Expand All @@ -61,3 +65,7 @@ futures = "0.3"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[[example]]
name = "grpc_web"
required-features = ["grpc-web"]
165 changes: 165 additions & 0 deletions console-subscriber/examples/grpc_web.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
//! Example of using the console subscriber with tonic-web.
//! This example requires the `grpc-web` feature to be enabled.
//! Run with:
//! ```sh
//! cargo run --example grpc_web --features grpc-web
//! ```
use std::time::Duration;

use console_subscriber::ConsoleLayer;

static HELP: &str = r#"
Example console-instrumented app
USAGE:
app [OPTIONS]
OPTIONS:
-h, help prints this message
blocks Includes a (misbehaving) blocking task
burn Includes a (misbehaving) task that spins CPU with self-wakes
coma Includes a (misbehaving) task that forgets to register a waker
noyield Includes a (misbehaving) task that spawns tasks that never yield
"#;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
ConsoleLayer::builder()
.with_default_env()
.enable_grpc_web(true)
.init();
// spawn optional extras from CLI args
// skip first which is command name
for opt in std::env::args().skip(1) {
match &*opt {
"blocks" => {
tokio::task::Builder::new()
.name("blocks")
.spawn(double_sleepy(1, 10))
.unwrap();
}
"coma" => {
tokio::task::Builder::new()
.name("coma")
.spawn(std::future::pending::<()>())
.unwrap();
}
"burn" => {
tokio::task::Builder::new()
.name("burn")
.spawn(burn(1, 10))
.unwrap();
}
"noyield" => {
tokio::task::Builder::new()
.name("noyield")
.spawn(no_yield(20))
.unwrap();
}
"blocking" => {
tokio::task::Builder::new()
.name("spawns_blocking")
.spawn(spawn_blocking(5))
.unwrap();
}
"help" | "-h" => {
eprintln!("{}", HELP);
return Ok(());
}
wat => {
return Err(
format!("unknown option: {:?}, run with '-h' to see options", wat).into(),
)
}
}
}

let task1 = tokio::task::Builder::new()
.name("task1")
.spawn(spawn_tasks(1, 10))
.unwrap();
let task2 = tokio::task::Builder::new()
.name("task2")
.spawn(spawn_tasks(10, 30))
.unwrap();

let result = tokio::try_join! {
task1,
task2,
};
result?;

Ok(())
}

#[tracing::instrument]
async fn spawn_tasks(min: u64, max: u64) {
loop {
for i in min..max {
tracing::trace!(i, "spawning wait task");
tokio::task::Builder::new()
.name("wait")
.spawn(wait(i))
.unwrap();

let sleep = Duration::from_secs(max) - Duration::from_secs(i);
tracing::trace!(?sleep, "sleeping...");
tokio::time::sleep(sleep).await;
}
}
}

#[tracing::instrument]
async fn wait(seconds: u64) {
tracing::debug!("waiting...");
tokio::time::sleep(Duration::from_secs(seconds)).await;
tracing::trace!("done!");
}

#[tracing::instrument]
async fn double_sleepy(min: u64, max: u64) {
loop {
for i in min..max {
// woops!
std::thread::sleep(Duration::from_secs(i));
tokio::time::sleep(Duration::from_secs(max - i)).await;
}
}
}

#[tracing::instrument]
async fn burn(min: u64, max: u64) {
loop {
for i in min..max {
for _ in 0..i {
tokio::task::yield_now().await;
}
tokio::time::sleep(Duration::from_secs(i - min)).await;
}
}
}

#[tracing::instrument]
async fn no_yield(seconds: u64) {
loop {
let handle = tokio::task::Builder::new()
.name("greedy")
.spawn(async move {
std::thread::sleep(Duration::from_secs(seconds));
})
.expect("Couldn't spawn greedy task");

_ = handle.await;
}
}

#[tracing::instrument]
async fn spawn_blocking(seconds: u64) {
loop {
let seconds = seconds;
_ = tokio::task::spawn_blocking(move || {
std::thread::sleep(Duration::from_secs(seconds));
})
.await;
}
}
34 changes: 33 additions & 1 deletion console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub struct Builder {
/// Any scheduled times exceeding this duration will be clamped to this
/// value. Higher values will result in more memory usage.
pub(super) scheduled_duration_max: Duration,

#[cfg(feature = "grpc-web")]
/// Whether to enable gRPC web support.
enable_grpc_web: bool,
}

impl Default for Builder {
Expand All @@ -71,6 +75,8 @@ impl Default for Builder {
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
#[cfg(feature = "grpc-web")]
enable_grpc_web: false,
}
}
}
Expand Down Expand Up @@ -268,6 +274,17 @@ impl Builder {
Self { self_trace, ..self }
}

#[cfg(feature = "grpc-web")]
/// Sets whether to enable gRPC web support.
///
/// By default, gRPC web support is disabled.
pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self {
Self {
enable_grpc_web,
..self
}
}

/// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task.
pub fn build(self) -> (ConsoleLayer, Server) {
ConsoleLayer::build(self)
Expand Down Expand Up @@ -481,6 +498,8 @@ impl Builder {
}

let self_trace = self.self_trace;
#[cfg(feature = "grpc-web")]
let enable_grpc_web = self.enable_grpc_web;

let (layer, server) = self.build();
let filter =
Expand All @@ -501,8 +520,21 @@ impl Builder {
.enable_time()
.build()
.expect("console subscriber runtime initialization failed");

runtime.block_on(async move {
#[cfg(feature = "grpc-web")]
if enable_grpc_web {
server
.serve_with_grpc_web()
.await
.expect("console subscriber server failed")
} else {
server
.serve()
.await
.expect("console subscriber server failed")
}

#[cfg(not(feature = "grpc-web"))]
server
.serve()
.await
Expand Down
34 changes: 33 additions & 1 deletion console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,39 @@ impl Server {
/// ```
/// [`serve_with`]: Server::serve_with
pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
self.serve_with(tonic::transport::Server::default()).await
self.serve_with(tonic::transport::Server::builder()).await
}

#[cfg(feature = "grpc-web")]
/// Starts the gRPC service with the default gRPC settings and gRPC-Web
/// support.
pub async fn serve_with_grpc_web(
self,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let builder = tonic::transport::Server::builder();
let addr = self.addr.clone();
let ServerParts {
instrument_server,
aggregator,
} = self.into_parts();
let aggregate = spawn_named(aggregator.run(), "console::aggregate");
let router = builder
.accept_http1(true)
.add_service(tonic_web::enable(instrument_server));
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
spawn_named(serve, "console::serve").await
}
#[cfg(unix)]
ServerAddr::Unix(path) => {
let incoming = UnixListener::bind(path)?;
let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
spawn_named(serve, "console::serve").await
}
};
aggregate.abort();
res?.map_err(Into::into)
}

/// Starts the gRPC service with the given [`tonic`] gRPC transport server
Expand Down

0 comments on commit 545750c

Please sign in to comment.