diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 3752097c58..baaaae885f 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -302,6 +302,15 @@ dependencies = [ "uuid 1.3.1", ] +[[package]] +name = "dogstatsd" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4609d1443932a4fcee5d2df820ef0597d039fa4154a5c60f798650a5b4f48e14" +dependencies = [ + "chrono", +] + [[package]] name = "encoding_rs" version = "0.8.32" @@ -1355,8 +1364,10 @@ version = "0.1.0" dependencies = [ "anyhow", "ctrlc", + "dogstatsd", "env_logger 0.10.0", "glob", + "lazy_static", "log", "pyo3", "rust_arroyo", diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index e94409e963..781d8c54eb 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -27,3 +27,8 @@ glob = "0.3.1" pyo3 = { version = "0.18.1", features = ["chrono", "extension-module"] } ctrlc = "3.2.5" sentry = "0.31.0" + +# unofficial lib. haven't validated it yet +dogstatsd = "0.8.0" + +lazy_static = "1.4.0" diff --git a/rust_snuba/rust_arroyo/src/utils/metrics.rs b/rust_snuba/rust_arroyo/src/utils/metrics.rs index b92a9acb8c..9aee1197c3 100644 --- a/rust_snuba/rust_arroyo/src/utils/metrics.rs +++ b/rust_snuba/rust_arroyo/src/utils/metrics.rs @@ -8,7 +8,12 @@ use std::collections::HashMap; use std::net::{ToSocketAddrs, UdpSocket}; use std::sync::Arc; -pub trait Metrics { +pub trait MetricsClientTrait: Send + Sync { + + fn should_sample(&self, sample_rate: Option) -> bool { + rand::thread_rng().gen_range(0.0..=1.0) < sample_rate.unwrap_or(1.0) + } + fn counter( &self, key: &str, @@ -39,7 +44,7 @@ pub struct MetricsClient { prefix: String, } -impl Metrics for MetricsClient { +impl MetricsClientTrait for MetricsClient { fn counter( &self, key: &str, @@ -61,7 +66,7 @@ impl Metrics for MetricsClient { match result { Ok(_) => {} Err(_err) => { - println!("Failed to send metric {}: {}", key, _err) + println!("Failed to send metric {key}: {_err}") } } } @@ -86,7 +91,7 @@ impl Metrics for MetricsClient { match result { Ok(_) => {} Err(_err) => { - println!("Failed to send metric {}: {}", key, _err) + println!("Failed to send metric {key}: {_err}") } } } @@ -111,7 +116,7 @@ impl Metrics for MetricsClient { match result { Ok(_) => {} Err(_err) => { - println!("Failed to send metric {}: {}", key, _err) + println!("Failed to send metric {key}: {_err}") } } } @@ -119,9 +124,6 @@ impl Metrics for MetricsClient { } impl MetricsClient { - fn should_sample(&self, sample_rate: Option) -> bool { - rand::thread_rng().gen_range(0.0..=1.0) < sample_rate.unwrap_or(1.0) - } fn send_with_tags<'t, T: cadence::Metric + From>( &self, @@ -135,14 +137,14 @@ impl MetricsClient { match result { Ok(_) => {} Err(_err) => { - println!("Failed to send metric with tags: {}", _err) + println!("Failed to send metric with tags: {_err}") } } } } lazy_static! { - static ref METRICS_CLIENT: RwLock>> = RwLock::new(None); + static ref METRICS_CLIENT: RwLock>> = RwLock::new(None); } const METRICS_MAX_QUEUE_SIZE: usize = 1024; @@ -164,6 +166,9 @@ pub fn init(prefix: &str, host: A) { println!("Emitting metrics with prefix {}", metrics_client.prefix); *METRICS_CLIENT.write() = Some(Arc::new(metrics_client)); } +pub fn configure_metrics(metrics_client: impl MetricsClientTrait + 'static){ + *METRICS_CLIENT.write() = Some(Arc::new(metrics_client)); +} // TODO: Remove cloning METRICS_CLIENT each time this is called using thread local storage. pub fn increment( @@ -211,6 +216,7 @@ mod tests { .clone() .unwrap() .should_sample(Some(0.0)),); + assert!(METRICS_CLIENT .read() .clone() diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index 06306a5a0c..390078057e 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -2,6 +2,7 @@ mod config; mod consumer; mod strategies; mod types; +pub mod utils; use pyo3::prelude::*; diff --git a/rust_snuba/src/utils/metrics/backends/abstract_backend.rs b/rust_snuba/src/utils/metrics/backends/abstract_backend.rs new file mode 100644 index 0000000000..83820f31ad --- /dev/null +++ b/rust_snuba/src/utils/metrics/backends/abstract_backend.rs @@ -0,0 +1,12 @@ +use rust_arroyo::utils::metrics::MetricsClientTrait; + +pub trait MetricsBackend: MetricsClientTrait { + fn events( + &self, + title: &str, + text: &str, + alert_type: &str, + priority: &str, + tags: &[&str], + ); +} diff --git a/rust_snuba/src/utils/metrics/backends/datadog.rs b/rust_snuba/src/utils/metrics/backends/datadog.rs new file mode 100644 index 0000000000..70cdb6e389 --- /dev/null +++ b/rust_snuba/src/utils/metrics/backends/datadog.rs @@ -0,0 +1,130 @@ +use rust_arroyo::utils::metrics::{MetricsClientTrait}; +use dogstatsd; + +use super::abstract_backend::MetricsBackend; +pub struct DatadogMetricsBackend { + client_sd: dogstatsd::Client, + _tags: Vec, +} + +impl MetricsBackend for DatadogMetricsBackend { + fn events( + &self, + title: &str, + text: &str, + _alert_type: &str, + _priority: &str, + tags: &[&str], + ) { + // TODO figure out how to send priority and alert_type + self.client_sd.event(title, text, tags ).unwrap() + } +} + + +impl DatadogMetricsBackend { + pub fn new(host: String, port: u16 , tags:Vec) -> Self { + + let host_port: String = format!("{host}:{port}"); + let options = dogstatsd::OptionsBuilder::new() + .to_addr(host_port).build(); + + let client = dogstatsd::Client::new(options).unwrap(); + DatadogMetricsBackend { + client_sd: client, + _tags: tags + } + } +} + +impl MetricsClientTrait for DatadogMetricsBackend { + fn counter( + &self, + key: &str, + value: Option, + tags: Option>, + sample_rate: Option, + ) { + if !self.should_sample(sample_rate) { + return; + } + + let tags_str: Vec = tags.unwrap().iter().map(|(k, v)| format!("{k}:{v}")).collect(); + + match value { + Some(v) => { + self.client_sd.count(key, v, &tags_str).unwrap(); + } + None => { + self.client_sd.incr(key, tags_str).unwrap(); + } + } + + + } + + fn gauge( + &self, + key: &str, + value: u64, + tags: Option>, + sample_rate: Option, + ) { + if !self.should_sample(sample_rate) { + return; + } + let tags_str: Vec = tags.unwrap().iter().map(|(k, v)| format!("{k}:{v}")).collect(); + self.client_sd.gauge(key, value.to_string(), tags_str).unwrap(); + } + + fn time( + &self, + key: &str, + value: u64, + tags: Option>, + sample_rate: Option, + ) { + if !self.should_sample(sample_rate) { + return; + } + let tags_str: Vec = tags.unwrap().iter().map(|(k, v)| format!("{k}:{v}")).collect(); + self.client_sd.timing(key, value.try_into().unwrap(), tags_str).unwrap(); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use rust_arroyo::utils::metrics::{configure_metrics, MetricsClientTrait, self}; + + use crate::utils::metrics::backends::{datadog::DatadogMetricsBackend}; + + + #[test] + fn test_testing_backend() { + // default client sends metrics to statsd daemon on localhost:8125 + let client = dogstatsd::Client::new(dogstatsd::Options::default()).unwrap(); + let client_tags: Vec = Vec::new(); + let testing_backend = DatadogMetricsBackend { + client_sd: client, + _tags: client_tags + }; + + let mut tags: HashMap<&str, &str> = HashMap::new(); + tags.insert("tag1", "value1"); + tags.insert("tag2", "value2"); + + testing_backend.counter("test_counter", Some(1), Some(tags.clone()), None); + testing_backend.gauge("test_gauge", 1, Some(tags.clone()), None); + testing_backend.time("test_time", 1, Some(tags.clone()), None); + + // check configure_metrics writes to METRICS + configure_metrics(testing_backend); + metrics::time("c", 30, Some(HashMap::from([("tag3", "value3")])), None); + + // test constructor + DatadogMetricsBackend::new("0.0.0.0".to_owned(), 8125, Vec::new()) + .counter("test_counter2", Some(2), Some(tags.clone()), None); + } +} diff --git a/rust_snuba/src/utils/metrics/backends/mod.rs b/rust_snuba/src/utils/metrics/backends/mod.rs new file mode 100644 index 0000000000..4936a59d05 --- /dev/null +++ b/rust_snuba/src/utils/metrics/backends/mod.rs @@ -0,0 +1,3 @@ +pub mod datadog; +pub mod abstract_backend; +pub mod testing; diff --git a/rust_snuba/src/utils/metrics/backends/testing.rs b/rust_snuba/src/utils/metrics/backends/testing.rs new file mode 100644 index 0000000000..3a61d38559 --- /dev/null +++ b/rust_snuba/src/utils/metrics/backends/testing.rs @@ -0,0 +1,127 @@ +use std::{collections::HashMap, sync::{Mutex}}; + +use rust_arroyo::utils::metrics::MetricsClientTrait; + +use super::abstract_backend::MetricsBackend; +use lazy_static::lazy_static; + +lazy_static! { + static ref METRICS: Mutex>> = { + let m = HashMap::new(); + Mutex::new(m) + }; +} + + +pub struct MetricCall { + _value: String, + _tags: Vec, +} + +pub struct TestingMetricsBackend { +} + +impl MetricsBackend for TestingMetricsBackend { + fn events( + &self, + _title: &str, + _text: &str, + _alert_type: &str, + _priority: &str, + _tags: &[&str], + ) { + todo!() + } +} + +impl MetricsClientTrait for TestingMetricsBackend{ + fn counter(&self, name: &str, value: Option, tags: Option>, sample_rate: Option) { + if !self.should_sample(sample_rate) { + return; + } + + let mut tags_vec = Vec::new(); + if let Some(tags) = tags { + for (k, v) in tags { + tags_vec.push(format!("{k}:{v}")); + } + } + let mut metrics_map = METRICS.lock().unwrap(); + let metric = metrics_map.entry(name.to_string()).or_insert(Vec::new()); + metric.push(MetricCall { + _value: value.unwrap().to_string(), + _tags: tags_vec, + }); + } + + fn gauge(&self, name: &str, value: u64, tags: Option>, sample_rate: Option) { + if !self.should_sample(sample_rate) { + return; + } + let mut tags_vec = Vec::new(); + if let Some(tags) = tags { + for (k, v) in tags { + tags_vec.push(format!("{k}:{v}")); + } + } + let mut metrics_map = METRICS.lock().unwrap(); + let metric = metrics_map.entry(name.to_string()).or_insert(Vec::new()); + + metric.push(MetricCall { + _value: value.to_string(), + _tags: tags_vec, + }); + } + + fn time(&self, name: &str, value: u64, tags: Option>, sample_rate: Option) { + if !self.should_sample(sample_rate) { + return; + } + + let mut tags_vec = Vec::new(); + if let Some(tags) = tags { + for (k, v) in tags { + tags_vec.push(format!("{k}:{v}")); + } + } + let mut metrics_map = METRICS.lock().unwrap(); + let metric = metrics_map.entry(name.to_string()).or_insert(Vec::new()); + + metric.push(MetricCall { + _value: value.to_string(), + _tags: tags_vec, + }); + } + +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use rust_arroyo::utils::metrics::{configure_metrics, MetricsClientTrait, self}; + + use crate::utils::metrics::backends::testing::METRICS; + + + #[test] + fn test_testing_backend() { + let testing_backend = super::TestingMetricsBackend {}; + + let mut tags: HashMap<&str, &str> = HashMap::new(); + tags.insert("tag1", "value1"); + tags.insert("tag2", "value2"); + + testing_backend.counter("test_counter", Some(1), Some(tags.clone()), None); + testing_backend.gauge("test_gauge", 1, Some(tags.clone()), None); + testing_backend.time("test_time", 1, Some(tags.clone()), None); + assert!(METRICS.lock().unwrap().contains_key("test_counter")); + assert!(METRICS.lock().unwrap().contains_key("test_gauge")); + assert!(METRICS.lock().unwrap().contains_key("test_time")); + + // check configure_metrics writes to METRICS + configure_metrics(testing_backend); + metrics::time("c", 30, Some(HashMap::from([("tag3", "value3")])), None); + assert!(METRICS.lock().unwrap().contains_key("c")); + } +} diff --git a/rust_snuba/src/utils/metrics/mod.rs b/rust_snuba/src/utils/metrics/mod.rs new file mode 100644 index 0000000000..257edb5a63 --- /dev/null +++ b/rust_snuba/src/utils/metrics/mod.rs @@ -0,0 +1 @@ +pub mod backends; diff --git a/rust_snuba/src/utils/mod.rs b/rust_snuba/src/utils/mod.rs new file mode 100644 index 0000000000..e144883287 --- /dev/null +++ b/rust_snuba/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod metrics;