From 5540255b480bbdbf21753606ab025341e01d9f87 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Jan 2024 16:50:52 +0100 Subject: [PATCH] =?UTF-8?q?Revert=20"Revert=20"Revert=20"Revert=20"feat:?= =?UTF-8?q?=20Experiment=20using=20statsdproxy=20for=20aggregation=20(#?= =?UTF-8?q?=E2=80=A6=20(#5064)""?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit b291670df473e5dbc30a31d6a2aab8cbf769aa2d. --- rust_snuba/Cargo.lock | 71 ++++++++++++++++++++++++++++++++ rust_snuba/Cargo.toml | 1 + rust_snuba/src/metrics/statsd.rs | 26 ++++++++---- 3 files changed, 90 insertions(+), 8 deletions(-) diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 0240d372c3..d5f01766c3 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -329,6 +329,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -425,6 +434,19 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "env_logger" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -741,6 +763,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -861,6 +889,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is-terminal" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" +dependencies = [ + "hermit-abi", + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "iso8601" version = "0.6.1" @@ -1742,6 +1781,7 @@ dependencies = [ "sentry-kafka-schemas", "serde", "serde_json", + "statsdproxy", "thiserror", "tokio", "tracing", @@ -2120,6 +2160,19 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "statsdproxy" +version = "0.1.0" +source = "git+https://github.com/getsentry/statsdproxy#ccdd4ac90f1a3f24c75666a1313a5d006089fd88" +dependencies = [ + "anyhow", + "cadence", + "crc32fast", + "env_logger", + "log", + "thread_local", +] + [[package]] name = "strsim" version = "0.10.0" @@ -2188,6 +2241,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "termcolor" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff1bc3d3f05aff0403e8ac0d92ced918ec05b666a43f83297ccef5bea8a3d449" +dependencies = [ + "winapi-util", +] + [[package]] name = "terminal_size" version = "0.3.0" @@ -2699,6 +2761,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 8bb33b6f36..02fc08ecc1 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -40,6 +40,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } thiserror = "1.0" tokio = { version = "1.19.2", features = ["full"] } +statsdproxy = { version = "0.1.0", features = ["cadence-adapter"], git = "https://github.com/getsentry/statsdproxy", default-features = false } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } uuid = "1.5.0" diff --git a/rust_snuba/src/metrics/statsd.rs b/rust_snuba/src/metrics/statsd.rs index b93e68bdf3..6efd166d46 100644 --- a/rust_snuba/src/metrics/statsd.rs +++ b/rust_snuba/src/metrics/statsd.rs @@ -1,8 +1,11 @@ use cadence::prelude::*; -use cadence::{BufferedUdpMetricSink, MetricBuilder, MetricError, QueuingMetricSink, StatsdClient}; +use cadence::{MetricBuilder, MetricError, StatsdClient}; use rust_arroyo::utils::metrics::Metrics as ArroyoMetrics; +use statsdproxy::cadence::StatsdProxyMetricSink; +use statsdproxy::config::AggregateMetricsConfig; +use statsdproxy::middleware::aggregate::AggregateMetrics; +use statsdproxy::middleware::Upstream; use std::collections::HashMap; -use std::net::UdpSocket; #[derive(Debug)] pub struct StatsDBackend { @@ -11,13 +14,20 @@ pub struct StatsDBackend { impl StatsDBackend { pub fn new(host: &str, port: u16, prefix: &str, global_tags: HashMap<&str, &str>) -> Self { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.set_nonblocking(true).unwrap(); - let sink_addr = (host, port); - let buffered = BufferedUdpMetricSink::from(sink_addr, socket).unwrap(); - let queuing_sink = QueuingMetricSink::from(buffered); + let upstream_addr = format!("{}:{}", host, port); + let aggregator_sink = StatsdProxyMetricSink::new(move || { + let upstream = Upstream::new(upstream_addr.clone()).unwrap(); + let config = AggregateMetricsConfig { + aggregate_counters: true, + flush_offset: 0, + flush_interval: 1, + aggregate_gauges: true, + max_map_size: None, + }; + AggregateMetrics::new(config, upstream) + }); - let mut client_builder = StatsdClient::builder(prefix, queuing_sink); + let mut client_builder = StatsdClient::builder(prefix, aggregator_sink); for (k, v) in global_tags { client_builder = client_builder.with_tag(k, v); }