Skip to content

Commit

Permalink
ref(rust): Add a few metrics missing from snuba dashboard (#5102)
Browse files Browse the repository at this point in the history
* implement batch_time metric

* add insertions.batch_write_ms metric

* implement schema validation and invalid message metrics
  • Loading branch information
untitaker authored Nov 27, 2023
1 parent b07852e commit 591219c
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 28 deletions.
57 changes: 31 additions & 26 deletions rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::processing::strategies::{
SubmitError,
};
use crate::types::{Message, Partition};
use crate::utils::metrics::{get_metrics, BoxMetrics};
use std::collections::BTreeMap;
use std::mem;
use std::sync::Arc;
Expand Down Expand Up @@ -50,6 +51,7 @@ pub struct Reduce<T, TResult> {
batch_state: BatchState<T, TResult>,
message_carried_over: Option<Message<TResult>>,
commit_request_carried_over: Option<CommitRequest>,
metrics: BoxMetrics,
}

impl<T: Send + Sync, TResult: Clone + Send + Sync> ProcessingStrategy<T> for Reduce<T, TResult> {
Expand Down Expand Up @@ -130,6 +132,7 @@ impl<T, TResult: Clone> Reduce<T, TResult> {
batch_state,
message_carried_over: None,
commit_request_carried_over: None,
metrics: get_metrics(),
}
}

Expand All @@ -153,38 +156,40 @@ impl<T, TResult: Clone> Reduce<T, TResult> {
return Ok(());
}

let batch_time = self.batch_state.batch_start_time.elapsed().ok();
let batch_complete = self.batch_state.message_count >= self.max_batch_size
|| self
.batch_state
.batch_start_time
.elapsed()
.unwrap_or_default()
> self.max_batch_time;

if batch_complete || force {
let batch_state = mem::replace(
&mut self.batch_state,
BatchState::new(self.initial_value.clone(), self.accumulator.clone()),
|| batch_time.unwrap_or_default() > self.max_batch_time;

if !batch_complete && !force {
return Ok(());
}

if let Some(batch_time) = batch_time {
self.metrics.timing(
"arroyo.strategies.reduce.batch_time",
batch_time.as_secs(),
None,
);
}

let next_message =
Message::new_any_message(batch_state.value.unwrap(), batch_state.offsets);
let batch_state = mem::replace(
&mut self.batch_state,
BatchState::new(self.initial_value.clone(), self.accumulator.clone()),
);

match self.next_step.submit(next_message) {
Err(SubmitError::MessageRejected(MessageRejected {
message: transformed_message,
})) => {
self.message_carried_over = Some(transformed_message);
return Ok(());
}
Err(SubmitError::InvalidMessage(invalid_message)) => {
return Err(invalid_message);
}
Ok(_) => return Ok(()),
let next_message =
Message::new_any_message(batch_state.value.unwrap(), batch_state.offsets);

match self.next_step.submit(next_message) {
Err(SubmitError::MessageRejected(MessageRejected {
message: transformed_message,
})) => {
self.message_carried_over = Some(transformed_message);
Ok(())
}
Err(SubmitError::InvalidMessage(invalid_message)) => Err(invalid_message),
Ok(_) => Ok(()),
}

Ok(())
}
}

Expand Down
3 changes: 3 additions & 0 deletions rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use rust_arroyo::processing::strategies::run_task_in_threads::{
use rust_arroyo::processing::strategies::InvalidMessage;
use rust_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory};
use rust_arroyo::types::{BrokerMessage, InnerMessage, Message};
use rust_arroyo::utils::metrics::get_metrics;
use std::collections::BTreeMap;
use std::sync::Arc;

Expand Down Expand Up @@ -95,6 +96,8 @@ impl TaskRunner<KafkaPayload, BytesInsertBatch> for MessageProcessor {
// however, as Sentry captures `error` logs as errors by default,
// we would double-log this error here:
tracing::error!(%error, "Failed processing message");
let metrics = get_metrics();
metrics.increment("invalid_message", 1, None);
sentry::with_scope(
|_scope| {
// FIXME(swatinem): we already moved `broker_message.payload`
Expand Down
19 changes: 17 additions & 2 deletions rust_snuba/src/strategies/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};

use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION};
use reqwest::{Client, Response};
Expand Down Expand Up @@ -46,14 +46,29 @@ impl TaskRunner<BytesInsertBatch, BytesInsertBatch> for ClickhouseWriter {
}

tracing::debug!("performing write");
let write_start = SystemTime::now();

let response = client
.send(insert_batch.encoded_rows().to_vec())
.await
.unwrap();

tracing::debug!(?response);
tracing::info!("Inserted {} rows", insert_batch.len());

let write_finish = SystemTime::now();

if let Ok(elapsed) = write_finish.duration_since(write_start) {
metrics.timing(
"insertions.batch_write_ms",
elapsed.as_millis() as u64,
None,
);
}
metrics.increment(
"insertions.batch_write_msgs",
insert_batch.len() as i64,
None,
);
insert_batch.record_message_latency(&metrics);

Ok(message)
Expand Down
4 changes: 4 additions & 0 deletions rust_snuba/src/strategies/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use rust_arroyo::processing::strategies::{
CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy, SubmitError,
};
use rust_arroyo::types::{BrokerMessage, InnerMessage, Message, Partition};
use rust_arroyo::utils::metrics::{get_metrics, BoxMetrics};

use std::collections::BTreeMap;
use std::collections::VecDeque;
Expand Down Expand Up @@ -55,6 +56,7 @@ pub struct PythonTransformStep {
message_carried_over: Option<Message<BytesInsertBatch>>,
processing_pool: Option<procspawn::Pool>,
max_queue_depth: usize,
metrics: BoxMetrics,
}

impl PythonTransformStep {
Expand Down Expand Up @@ -97,6 +99,7 @@ impl PythonTransformStep {
message_carried_over: None,
processing_pool,
max_queue_depth: max_queue_depth.unwrap_or(processes),
metrics: get_metrics(),
})
}

Expand Down Expand Up @@ -159,6 +162,7 @@ impl PythonTransformStep {
}
}
Err(error) => {
self.metrics.increment("invalid_message", 1, None);
tracing::error!(error, "Invalid message");
}
}
Expand Down
6 changes: 6 additions & 0 deletions rust_snuba/src/strategies/validate_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use rust_arroyo::processing::strategies::{
CommitRequest, InvalidMessage, ProcessingStrategy, SubmitError,
};
use rust_arroyo::types::{InnerMessage, Message};
use rust_arroyo::utils::metrics::{get_metrics, BoxMetrics};
use sentry_kafka_schemas;

pub struct SchemaValidator {
schema: Option<Arc<sentry_kafka_schemas::Schema>>,
enforce_schema: bool,
metrics: BoxMetrics,
}

impl SchemaValidator {
Expand All @@ -34,6 +36,7 @@ impl SchemaValidator {
SchemaValidator {
schema,
enforce_schema,
metrics: get_metrics(),
}
}
}
Expand All @@ -44,6 +47,7 @@ impl TaskRunner<KafkaPayload, KafkaPayload> for SchemaValidator {
return Box::pin(async move { Ok(message) });
};
let enforce_schema = self.enforce_schema;
let metrics = self.metrics.clone();

Box::pin(async move {
// FIXME: this will panic when the payload is empty
Expand All @@ -54,6 +58,8 @@ impl TaskRunner<KafkaPayload, KafkaPayload> for SchemaValidator {
};

tracing::error!(%error, "Validation error");
metrics.increment("schema_validation.failed", 1, None);

if !enforce_schema {
return Ok(message);
};
Expand Down

0 comments on commit 591219c

Please sign in to comment.