Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): add some metrics backends #4063

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,8 @@ glob = "0.3.1"
pyo3 = { version = "0.18.1", features = ["chrono", "extension-module"] }
ctrlc = "3.2.5"
sentry = "0.31.0"

# unofficial lib. haven't validated it yet
dogstatsd = "0.8.0"

lazy_static = "1.4.0"
26 changes: 16 additions & 10 deletions rust_snuba/rust_arroyo/src/utils/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ use std::collections::HashMap;
use std::net::{ToSocketAddrs, UdpSocket};
use std::sync::Arc;

pub trait Metrics {
pub trait MetricsClientTrait: Send + Sync {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not idiomatic to name traits as Trait


fn should_sample(&self, sample_rate: Option<f64>) -> bool {
rand::thread_rng().gen_range(0.0..=1.0) < sample_rate.unwrap_or(1.0)
}

fn counter(
&self,
key: &str,
Expand Down Expand Up @@ -39,7 +44,7 @@ pub struct MetricsClient {
prefix: String,
}

impl Metrics for MetricsClient {
impl MetricsClientTrait for MetricsClient {
fn counter(
&self,
key: &str,
Expand All @@ -61,7 +66,7 @@ impl Metrics for MetricsClient {
match result {
Ok(_) => {}
Err(_err) => {
println!("Failed to send metric {}: {}", key, _err)
println!("Failed to send metric {key}: {_err}")
}
}
}
Expand All @@ -86,7 +91,7 @@ impl Metrics for MetricsClient {
match result {
Ok(_) => {}
Err(_err) => {
println!("Failed to send metric {}: {}", key, _err)
println!("Failed to send metric {key}: {_err}")
}
}
}
Expand All @@ -111,17 +116,14 @@ impl Metrics for MetricsClient {
match result {
Ok(_) => {}
Err(_err) => {
println!("Failed to send metric {}: {}", key, _err)
println!("Failed to send metric {key}: {_err}")
}
}
}
}
}

impl MetricsClient {
fn should_sample(&self, sample_rate: Option<f64>) -> bool {
rand::thread_rng().gen_range(0.0..=1.0) < sample_rate.unwrap_or(1.0)
}

fn send_with_tags<'t, T: cadence::Metric + From<String>>(
&self,
Expand All @@ -135,14 +137,14 @@ impl MetricsClient {
match result {
Ok(_) => {}
Err(_err) => {
println!("Failed to send metric with tags: {}", _err)
println!("Failed to send metric with tags: {_err}")
}
}
}
}

lazy_static! {
static ref METRICS_CLIENT: RwLock<Option<Arc<MetricsClient>>> = RwLock::new(None);
static ref METRICS_CLIENT: RwLock<Option<Arc<dyn MetricsClientTrait>>> = RwLock::new(None);
}

const METRICS_MAX_QUEUE_SIZE: usize = 1024;
Expand All @@ -164,6 +166,9 @@ pub fn init<A: ToSocketAddrs>(prefix: &str, host: A) {
println!("Emitting metrics with prefix {}", metrics_client.prefix);
*METRICS_CLIENT.write() = Some(Arc::new(metrics_client));
}
pub fn configure_metrics(metrics_client: impl MetricsClientTrait + 'static){
*METRICS_CLIENT.write() = Some(Arc::new(metrics_client));
}

// TODO: Remove cloning METRICS_CLIENT each time this is called using thread local storage.
pub fn increment(
Expand Down Expand Up @@ -211,6 +216,7 @@ mod tests {
.clone()
.unwrap()
.should_sample(Some(0.0)),);

assert!(METRICS_CLIENT
.read()
.clone()
Expand Down
1 change: 1 addition & 0 deletions rust_snuba/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod config;
mod consumer;
mod strategies;
mod types;
pub mod utils;

use pyo3::prelude::*;

Expand Down
12 changes: 12 additions & 0 deletions rust_snuba/src/utils/metrics/backends/abstract_backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use rust_arroyo::utils::metrics::MetricsClientTrait;

pub trait MetricsBackend: MetricsClientTrait {
fn events(
&self,
title: &str,
text: &str,
alert_type: &str,
priority: &str,
tags: &[&str],
);
}
130 changes: 130 additions & 0 deletions rust_snuba/src/utils/metrics/backends/datadog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use rust_arroyo::utils::metrics::{MetricsClientTrait};
use dogstatsd;

use super::abstract_backend::MetricsBackend;
pub struct DatadogMetricsBackend {
client_sd: dogstatsd::Client,
_tags: Vec<String>,
}

impl MetricsBackend for DatadogMetricsBackend {
fn events(
&self,
title: &str,
text: &str,
_alert_type: &str,
_priority: &str,
tags: &[&str],
) {
// TODO figure out how to send priority and alert_type
self.client_sd.event(title, text, tags ).unwrap()
}
}


impl DatadogMetricsBackend {
pub fn new(host: String, port: u16 , tags:Vec<String>) -> Self {

let host_port: String = format!("{host}:{port}");
let options = dogstatsd::OptionsBuilder::new()
.to_addr(host_port).build();

let client = dogstatsd::Client::new(options).unwrap();
DatadogMetricsBackend {
client_sd: client,
_tags: tags
}
}
}

impl MetricsClientTrait for DatadogMetricsBackend {
fn counter(
&self,
key: &str,
value: Option<i64>,
tags: Option<std::collections::HashMap<&str, &str>>,
sample_rate: Option<f64>,
) {
if !self.should_sample(sample_rate) {
return;
}

let tags_str: Vec<String> = tags.unwrap().iter().map(|(k, v)| format!("{k}:{v}")).collect();

match value {
Some(v) => {
self.client_sd.count(key, v, &tags_str).unwrap();
}
None => {
self.client_sd.incr(key, tags_str).unwrap();
}
}


}

fn gauge(
&self,
key: &str,
value: u64,
tags: Option<std::collections::HashMap<&str, &str>>,
sample_rate: Option<f64>,
) {
if !self.should_sample(sample_rate) {
return;
}
let tags_str: Vec<String> = tags.unwrap().iter().map(|(k, v)| format!("{k}:{v}")).collect();
self.client_sd.gauge(key, value.to_string(), tags_str).unwrap();
}

fn time(
&self,
key: &str,
value: u64,
tags: Option<std::collections::HashMap<&str, &str>>,
sample_rate: Option<f64>,
) {
if !self.should_sample(sample_rate) {
return;
}
let tags_str: Vec<String> = tags.unwrap().iter().map(|(k, v)| format!("{k}:{v}")).collect();
self.client_sd.timing(key, value.try_into().unwrap(), tags_str).unwrap();
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use rust_arroyo::utils::metrics::{configure_metrics, MetricsClientTrait, self};

use crate::utils::metrics::backends::{datadog::DatadogMetricsBackend};


#[test]
fn test_testing_backend() {
// default client sends metrics to statsd daemon on localhost:8125
let client = dogstatsd::Client::new(dogstatsd::Options::default()).unwrap();
let client_tags: Vec<String> = Vec::new();
let testing_backend = DatadogMetricsBackend {
client_sd: client,
_tags: client_tags
};

let mut tags: HashMap<&str, &str> = HashMap::new();
tags.insert("tag1", "value1");
tags.insert("tag2", "value2");

testing_backend.counter("test_counter", Some(1), Some(tags.clone()), None);
testing_backend.gauge("test_gauge", 1, Some(tags.clone()), None);
testing_backend.time("test_time", 1, Some(tags.clone()), None);

// check configure_metrics writes to METRICS
configure_metrics(testing_backend);
metrics::time("c", 30, Some(HashMap::from([("tag3", "value3")])), None);

// test constructor
DatadogMetricsBackend::new("0.0.0.0".to_owned(), 8125, Vec::new())
.counter("test_counter2", Some(2), Some(tags.clone()), None);
}
}
3 changes: 3 additions & 0 deletions rust_snuba/src/utils/metrics/backends/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod datadog;
pub mod abstract_backend;
pub mod testing;
Loading
Loading