Skip to content

Commit

Permalink
feat(rust): Add processor metrics for Rust consumer (#4737)
Browse files Browse the repository at this point in the history
Adds metrics for `arroyo.consumer.poll.time`, `arroyo.consumer.paused.time` and `arroyo.consumer.processing.time`.
  • Loading branch information
lynnagara authored Sep 19, 2023
1 parent 66d1288 commit be7faea
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 20 deletions.
58 changes: 58 additions & 0 deletions rust_snuba/rust_arroyo/src/processing/metrics_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::utils::metrics::{get_metrics, Metrics};
use core::fmt::Debug;
use std::collections::BTreeMap;
use std::mem;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub struct MetricsBuffer {
metrics: Arc<Mutex<Box<dyn Metrics>>>,
timers: BTreeMap<String, Duration>,
last_flush: Instant,
}

impl MetricsBuffer {
// A pretty shitty metrics buffer that only handles timing metrics
// and flushes them every second. Needs to be flush()-ed on shutdown
// Doesn't support tags
// Basically the same as https://github.com/getsentry/arroyo/blob/83f5f54e59892ad0b62946ef35d2daec3b561b10/arroyo/processing/processor.py#L80-L112
// We may want to replace this with the statsdproxy aggregation step.
pub fn new() -> Self {
Self {
metrics: get_metrics(),
timers: BTreeMap::new(),
last_flush: Instant::now(),
}
}

pub fn incr_timing(&mut self, metric: &str, duration: Duration) {
if let Some(value) = self.timers.get_mut(metric) {
*value += duration;
} else {
self.timers.insert(metric.to_string(), duration);
}
self.throttled_record();
}

pub fn flush(&mut self) {
let timers = mem::take(&mut self.timers);
for (metric, duration) in timers {
self.metrics
.timing(&metric, duration.as_millis() as u64, None);
}
self.last_flush = Instant::now();
}

fn throttled_record(&mut self) {
if self.last_flush.elapsed() > Duration::from_secs(1) {
self.flush();
}
}
}

impl Default for MetricsBuffer {
fn default() -> Self {
Self::new()
}
}
30 changes: 22 additions & 8 deletions rust_snuba/rust_arroyo/src/processing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod metrics_buffer;
pub mod strategies;

use crate::backends::{AssignmentCallbacks, Consumer};
Expand Down Expand Up @@ -57,7 +58,7 @@ impl<TPayload: 'static + Clone> AssignmentCallbacks for Callbacks<TPayload> {
stg.strategy = Some(stg.processing_factory.create());
}
fn on_revoke(&mut self, _: Vec<Partition>) {
let metrics = get_metrics();
let mut metrics = get_metrics();
let start = Instant::now();

let mut stg = self.strategies.lock().unwrap();
Expand All @@ -77,6 +78,8 @@ impl<TPayload: 'static + Clone> AssignmentCallbacks for Callbacks<TPayload> {
start.elapsed().as_millis() as u64,
None,
);

// TODO: Figure out how to flush the metrics buffer from the recovation callback.
}
}

Expand All @@ -96,6 +99,7 @@ pub struct StreamProcessor<'a, TPayload: Clone> {
message: Option<Message<TPayload>>,
processor_handle: ProcessorHandle,
paused_timestamp: Option<Instant>,
metrics_buffer: metrics_buffer::MetricsBuffer,
}

impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {
Expand All @@ -116,6 +120,7 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {
shutdown_requested: Arc::new(AtomicBool::new(false)),
},
paused_timestamp: None,
metrics_buffer: metrics_buffer::MetricsBuffer::new(),
}
}

Expand All @@ -132,23 +137,23 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {
// If a message was carried over from the previous run, the consumer
// should be paused and not returning any messages on ``poll``.
let res = self.consumer.poll(Some(Duration::ZERO)).unwrap();

match res {
None => {}
Some(_) => return Err(RunError::InvalidState),
}
} else {
// Otherwise, we need to try fetch a new message from the consumer,
// even if there is no active assignment and/or processing strategy.
let msg = self.consumer.poll(Some(Duration::from_secs(1)));
let poll_start = Instant::now();
//TODO: Support errors properly
match msg {
Ok(None) => {
self.message = None;
}
Ok(Some(inner)) => {
self.message = Some(Message {
match self.consumer.poll(Some(Duration::from_secs(1))) {
Ok(msg) => {
self.message = msg.map(|inner| Message {
inner_message: InnerMessage::BrokerMessage(inner),
});
self.metrics_buffer
.incr_timing("arroyo.consumer.poll.time", poll_start.elapsed());
}
Err(e) => {
log::error!("poll error: {}", e);
Expand All @@ -175,7 +180,12 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {

let msg = self.message.take();
if let Some(msg_s) = msg {
let processing_start = Instant::now();
let ret = strategy.submit(msg_s);
self.metrics_buffer.incr_timing(
"arroyo.consumer.processing.time",
processing_start.elapsed(),
);
match ret {
Ok(()) => {}
Err(_) => {
Expand All @@ -197,6 +207,10 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {
let res = self.consumer.resume(partitions);
match res {
Ok(()) => {
self.metrics_buffer.incr_timing(
"arroyo.consumer.paused.time",
self.paused_timestamp.unwrap().elapsed(),
);
self.paused_timestamp = None;
}
Err(_) => return Err(RunError::PauseError),
Expand Down
12 changes: 6 additions & 6 deletions rust_snuba/rust_arroyo/src/utils/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ pub static METRICS: OnceLock<Arc<Mutex<Box<dyn Metrics>>>> = OnceLock::new();
pub trait Metrics: Debug + Send + Sync {
fn increment(&self, key: &str, value: i64, tags: Option<HashMap<&str, &str>>);

fn gauge(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>);
fn gauge(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>);

fn timing(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>);
fn timing(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>);
}

impl Metrics for Arc<Mutex<Box<dyn Metrics>>> {
fn increment(&self, key: &str, value: i64, tags: Option<HashMap<&str, &str>>) {
self.as_ref().lock().unwrap().increment(key, value, tags)
}
fn gauge(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
fn gauge(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
self.as_ref().lock().unwrap().gauge(key, value, tags)
}
fn timing(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
fn timing(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
self.as_ref().lock().unwrap().timing(key, value, tags)
}
}
Expand All @@ -30,9 +30,9 @@ struct Noop {}
impl Metrics for Noop {
fn increment(&self, _key: &str, _value: i64, _tags: Option<HashMap<&str, &str>>) {}

fn gauge(&self, _key: &str, _value: u64, _tags: Option<HashMap<&str, &str>>) {}
fn gauge(&mut self, _key: &str, _value: u64, _tags: Option<HashMap<&str, &str>>) {}

fn timing(&self, _key: &str, _value: u64, _tags: Option<HashMap<&str, &str>>) {}
fn timing(&mut self, _key: &str, _value: u64, _tags: Option<HashMap<&str, &str>>) {}
}

pub fn configure_metrics(metrics: Box<dyn Metrics>) {
Expand Down
7 changes: 4 additions & 3 deletions rust_snuba/src/metrics/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ impl ArroyoMetrics for StatsDBackend {
}
}

fn gauge(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
fn gauge(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
if let Err(e) = self.send_with_tags(self.client.gauge_with_tags(key, value), tags) {
log::debug!("Error sending metric: {}", e);
}
}

fn timing(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
fn timing(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
if let Err(e) = self.send_with_tags(self.client.time_with_tags(key, value), tags) {
log::debug!("Error sending metric: {}", e);
}
Expand All @@ -71,7 +71,8 @@ mod tests {

#[test]
fn statsd_metric_backend() {
let backend = StatsDBackend::new("0.0.0.0", 8125, "test", HashMap::from([("env", "prod")]));
let mut backend =
StatsDBackend::new("0.0.0.0", 8125, "test", HashMap::from([("env", "prod")]));

backend.increment("a", 1, Some(HashMap::from([("tag1", "value1")])));
backend.gauge("b", 20, Some(HashMap::from([("tag2", "value2")])));
Expand Down
6 changes: 3 additions & 3 deletions rust_snuba/src/metrics/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl ArroyoMetrics for TestingMetricsBackend {
self.metric_calls.lock().unwrap().push(res);
}

fn gauge(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
fn gauge(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
let builder = self.client.gauge_with_tags(key, value);

let res = self
Expand All @@ -68,7 +68,7 @@ impl ArroyoMetrics for TestingMetricsBackend {
self.metric_calls.lock().unwrap().push(res);
}

fn timing(&self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
fn timing(&mut self, key: &str, value: u64, tags: Option<HashMap<&str, &str>>) {
let builder = self.client.time_with_tags(key, value);

let res = self
Expand All @@ -88,7 +88,7 @@ mod tests {

#[test]
fn testing_metrics_backend() {
let backend = TestingMetricsBackend::new("snuba.rust-consumer");
let mut backend = TestingMetricsBackend::new("snuba.rust-consumer");

backend.increment("a", 1, Some(HashMap::from([("tag1", "value1")])));
backend.gauge("b", 20, Some(HashMap::from([("tag2", "value2")])));
Expand Down

0 comments on commit be7faea

Please sign in to comment.