Skip to content

Commit

Permalink
Revert "Revert "Revert "Revert "feat: Experiment using statsdproxy fo…
Browse files Browse the repository at this point in the history
…r aggregation (#… (#5064)""

This reverts commit b291670.
  • Loading branch information
untitaker committed Jan 2, 2024
1 parent b291670 commit 5540255
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 8 deletions.
71 changes: 71 additions & 0 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
thiserror = "1.0"
tokio = { version = "1.19.2", features = ["full"] }
statsdproxy = { version = "0.1.0", features = ["cadence-adapter"], git = "https://github.com/getsentry/statsdproxy", default-features = false }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
uuid = "1.5.0"
Expand Down
26 changes: 18 additions & 8 deletions rust_snuba/src/metrics/statsd.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use cadence::prelude::*;
use cadence::{BufferedUdpMetricSink, MetricBuilder, MetricError, QueuingMetricSink, StatsdClient};
use cadence::{MetricBuilder, MetricError, StatsdClient};
use rust_arroyo::utils::metrics::Metrics as ArroyoMetrics;
use statsdproxy::cadence::StatsdProxyMetricSink;
use statsdproxy::config::AggregateMetricsConfig;
use statsdproxy::middleware::aggregate::AggregateMetrics;
use statsdproxy::middleware::Upstream;
use std::collections::HashMap;
use std::net::UdpSocket;

#[derive(Debug)]
pub struct StatsDBackend {
Expand All @@ -11,13 +14,20 @@ pub struct StatsDBackend {

impl StatsDBackend {
pub fn new(host: &str, port: u16, prefix: &str, global_tags: HashMap<&str, &str>) -> Self {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_nonblocking(true).unwrap();
let sink_addr = (host, port);
let buffered = BufferedUdpMetricSink::from(sink_addr, socket).unwrap();
let queuing_sink = QueuingMetricSink::from(buffered);
let upstream_addr = format!("{}:{}", host, port);
let aggregator_sink = StatsdProxyMetricSink::new(move || {
let upstream = Upstream::new(upstream_addr.clone()).unwrap();
let config = AggregateMetricsConfig {
aggregate_counters: true,
flush_offset: 0,
flush_interval: 1,
aggregate_gauges: true,
max_map_size: None,
};
AggregateMetrics::new(config, upstream)
});

let mut client_builder = StatsdClient::builder(prefix, queuing_sink);
let mut client_builder = StatsdClient::builder(prefix, aggregator_sink);
for (k, v) in global_tags {
client_builder = client_builder.with_tag(k, v);
}
Expand Down

0 comments on commit 5540255

Please sign in to comment.