diff --git a/Cargo.lock b/Cargo.lock index 849f964ec..434988939 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4504,6 +4504,7 @@ dependencies = [ "clap", "futures", "humantime", + "jemallocator", "sentry", "serde", "serde_json", diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs index 0283f5db3..70e651db5 100644 --- a/crates/symbolicator-service/src/metrics.rs +++ b/crates/symbolicator-service/src/metrics.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use std::net::ToSocketAddrs; use std::sync::OnceLock; -use cadence::{StatsdClient, UdpMetricSink}; +use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient}; static METRICS_CLIENT: OnceLock = OnceLock::new(); @@ -20,8 +20,9 @@ pub fn configure_statsd(prefix: &str, host: A, tags: BTreeMap< } let socket = std::net::UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_nonblocking(true).unwrap(); - let sink = UdpMetricSink::from(&addrs[..], socket).unwrap(); - let mut builder = StatsdClient::builder(prefix, sink); + let udp_sink = BufferedUdpMetricSink::from(&addrs[..], socket).unwrap(); + let queuing_sink = QueuingMetricSink::from(udp_sink); + let mut builder = StatsdClient::builder(prefix, queuing_sink); for (key, value) in tags { builder = builder.with_tag(key, value) } diff --git a/crates/symbolicator-stress/Cargo.toml b/crates/symbolicator-stress/Cargo.toml index e1cf12222..72475e14e 100644 --- a/crates/symbolicator-stress/Cargo.toml +++ b/crates/symbolicator-stress/Cargo.toml @@ -21,3 +21,6 @@ symbolicator-test = { path = "../symbolicator-test" } tempfile = "3.2.0" tokio = { version = "1.24.2", features = ["rt-multi-thread", "macros", "time", "sync"] } tracing-subscriber = "0.3.17" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = { version = "0.5", features = ["unprefixed_malloc_on_supported_platforms"] } diff --git a/crates/symbolicator-stress/src/logging.rs b/crates/symbolicator-stress/src/logging.rs index 71e240586..2a50a2a1c 100644 --- a/crates/symbolicator-stress/src/logging.rs +++ b/crates/symbolicator-stress/src/logging.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use std::env; use std::future::Future; use std::io::Write; -use std::net::{SocketAddr, TcpListener}; +use std::net::{SocketAddr, TcpListener, UdpSocket}; use std::pin::Pin; use symbolicator_service::metrics; @@ -21,8 +21,8 @@ pub struct Config { #[derive(Default)] pub struct Guard { sentry: Option, - pub sentry_server: Option + Send>>>, - // TODO: return the ports / futures of the http / upd servers to use + pub http_sink: Option + Send>>>, + pub udp_sink: Option + Send>>>, } pub fn init(config: Config) -> Guard { @@ -38,7 +38,7 @@ pub fn init(config: Config) -> Guard { listener.set_nonblocking(true).unwrap(); let socket = listener.local_addr().unwrap(); - guard.sentry_server = Some(Box::pin(async move { + guard.http_sink = Some(Box::pin(async move { async fn ok() -> &'static str { "OK" } @@ -86,7 +86,21 @@ pub fn init(config: Config) -> Guard { } if config.metrics { - let host = "TODO"; // create a *real* noop udp server to send metrics to + let addr = SocketAddr::from(([127, 0, 0, 1], 0)); + let listener = UdpSocket::bind(addr).unwrap(); + listener.set_nonblocking(true).unwrap(); + let socket = listener.local_addr().unwrap(); + + guard.udp_sink = Some(Box::pin(async move { + let listener = tokio::net::UdpSocket::from_std(listener).unwrap(); + let mut buf = Vec::with_capacity(1024); + loop { + buf.clear(); + let _len = listener.recv_buf(&mut buf).await.unwrap(); + } + })); + + let host = format!("127.0.0.1:{}", socket.port()); // have some default tags, just to be closer to the real world config let mut tags = BTreeMap::new(); diff --git a/crates/symbolicator-stress/src/main.rs b/crates/symbolicator-stress/src/main.rs index 3bafdca9e..6280a9cfb 100644 --- a/crates/symbolicator-stress/src/main.rs +++ b/crates/symbolicator-stress/src/main.rs @@ -14,6 +14,13 @@ mod workloads; use stresstest::perform_stresstest; use workloads::WorkloadsConfig; +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + /// Command line interface parser. #[derive(Parser)] struct Cli { @@ -47,8 +54,7 @@ fn main() -> Result<()> { backtraces: true, tracing: true, sentry: true, - // TODO: init metrics - ..Default::default() + metrics: true, }); let megs = 1024 * 1024; @@ -57,8 +63,11 @@ fn main() -> Result<()> { .thread_stack_size(8 * megs) .build()?; - if let Some(sentry_server) = logging_guard.sentry_server.take() { - runtime.spawn(sentry_server); + if let Some(http_sink) = logging_guard.http_sink.take() { + runtime.spawn(http_sink); + } + if let Some(udp) = logging_guard.udp_sink.take() { + runtime.spawn(udp); } runtime.block_on(perform_stresstest(service_config, workloads, cli.duration))?; diff --git a/crates/symbolicator-stress/src/stresstest.rs b/crates/symbolicator-stress/src/stresstest.rs index ea4e35083..d04189042 100644 --- a/crates/symbolicator-stress/src/stresstest.rs +++ b/crates/symbolicator-stress/src/stresstest.rs @@ -126,7 +126,7 @@ pub async fn perform_stresstest( let (concurrency, ops) = task.unwrap(); let ops_ps = ops as f32 / duration.as_secs() as f32; - println!("Workload {i} (concurrency: {concurrency}): {ops} operations, {ops_ps} ops/s"); + println!("Workload {i} (concurrency: {concurrency}): {ops} operations, {ops_ps:.2} ops/s"); } Ok(())