From bf29fb550dfc15b0ed409bc6dd1240980b9fb9d5 Mon Sep 17 00:00:00 2001 From: Nikhar Saxena <84807402+nikhars@users.noreply.github.com> Date: Thu, 11 Apr 2024 11:55:07 -0700 Subject: [PATCH] perf(metrics): Use kafka header optimization (#5756) * Use kafka headers * Minor fixes * Remove unnecessary changes * Refactor to Metric type enum * Add tests --- rust_snuba/src/processors/generic_metrics.rs | 115 ++++++++++++++++++- 1 file changed, 109 insertions(+), 6 deletions(-) diff --git a/rust_snuba/src/processors/generic_metrics.rs b/rust_snuba/src/processors/generic_metrics.rs index df61f8d324..522fbb9bbd 100644 --- a/rust_snuba/src/processors/generic_metrics.rs +++ b/rust_snuba/src/processors/generic_metrics.rs @@ -13,7 +13,7 @@ use crate::{ KafkaMessageMetadata, ProcessorConfig, }; -use rust_arroyo::backends::kafka::types::KafkaPayload; +use rust_arroyo::backends::kafka::types::{Headers, KafkaPayload}; use rust_arroyo::{counter, timer}; use super::utils::enforce_retention; @@ -304,12 +304,54 @@ where } } +// MetricTypeHeader specifies what type of metric was sent on the message +// as per the kafka headers. It is possible to get data with headers missing +// altogether or the specific header key with which to determine the metric +// type to be missing. 
Hence there is an Unknown variant +#[derive(Debug, Default, PartialEq, Clone)] +enum MetricTypeHeader { + #[default] + Unknown, + Counter, + Set, + Distribution, + Gauge, +} + +impl MetricTypeHeader { + fn from_kafka_header(header: Option<&Headers>) -> Self { + if let Some(headers) = header { + if let Some(header_value) = headers.get("metric_type") { + match header_value { + b"c" => MetricTypeHeader::Counter, + b"s" => MetricTypeHeader::Set, + b"d" => MetricTypeHeader::Distribution, + b"g" => MetricTypeHeader::Gauge, + _ => MetricTypeHeader::Unknown, + } + } else { + // metric_type header not found + MetricTypeHeader::Unknown + } + } else { + // No headers on message + MetricTypeHeader::Unknown + } + } +} + pub fn process_counter_message( payload: KafkaPayload, _metadata: KafkaMessageMetadata, config: &ProcessorConfig, ) -> anyhow::Result { - process_message::(payload, config) + let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers()); + match metric_type_header { + MetricTypeHeader::Counter | MetricTypeHeader::Unknown => { + process_message::(payload, config) + } + _ => Ok(InsertBatch::skip()), + } } /// The raw row that is written to clickhouse for sets. 
@@ -380,7 +422,13 @@ pub fn process_set_message( _metadata: KafkaMessageMetadata, config: &ProcessorConfig, ) -> anyhow::Result { - process_message::(payload, config) + let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers()); + match metric_type_header { + MetricTypeHeader::Set | MetricTypeHeader::Unknown => { + process_message::(payload, config) + } + _ => Ok(InsertBatch::skip()), + } } #[derive(Debug, Serialize, Default)] @@ -457,7 +505,13 @@ pub fn process_distribution_message( _metadata: KafkaMessageMetadata, config: &ProcessorConfig, ) -> anyhow::Result { - process_message::(payload, config) + let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers()); + match metric_type_header { + MetricTypeHeader::Distribution | MetricTypeHeader::Unknown => { + process_message::(payload, config) + } + _ => Ok(InsertBatch::skip()), + } } #[derive(Debug, Serialize, Default)] @@ -553,7 +607,13 @@ pub fn process_gauge_message( _metadata: KafkaMessageMetadata, config: &ProcessorConfig, ) -> anyhow::Result { - process_message::(payload, config) + let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers()); + match metric_type_header { + MetricTypeHeader::Gauge | MetricTypeHeader::Unknown => { + process_message::(payload, config) + } + _ => Ok(InsertBatch::skip()), + } } #[cfg(test)] @@ -844,7 +904,7 @@ mod tests { -> std::result::Result), DUMMY_SET_MESSAGE, ); - assert_eq!(result.unwrap(), InsertBatch::default()); + assert_eq!(result.unwrap(), InsertBatch::skip()); } #[test] @@ -1288,4 +1348,47 @@ mod tests { } ); } + + #[test] + fn test_metric_type_header() { + assert_eq!( + MetricTypeHeader::from_kafka_header(None), + MetricTypeHeader::Unknown + ); + assert_eq!( + MetricTypeHeader::from_kafka_header(Some(&Headers::new())), + MetricTypeHeader::Unknown + ); + assert_eq!( + MetricTypeHeader::from_kafka_header(Some( + &Headers::new().insert("key", Some(b"value".to_vec())) + )), + MetricTypeHeader::Unknown + ); + + 
assert_eq!( + MetricTypeHeader::from_kafka_header(Some( + &Headers::new().insert("metric_type", Some(b"c".to_vec())) + )), + MetricTypeHeader::Counter + ); + assert_eq!( + MetricTypeHeader::from_kafka_header(Some( + &Headers::new().insert("metric_type", Some(b"s".to_vec())) + )), + MetricTypeHeader::Set + ); + assert_eq!( + MetricTypeHeader::from_kafka_header(Some( + &Headers::new().insert("metric_type", Some(b"d".to_vec())) + )), + MetricTypeHeader::Distribution + ); + assert_eq!( + MetricTypeHeader::from_kafka_header(Some( + &Headers::new().insert("metric_type", Some(b"g".to_vec())) + )), + MetricTypeHeader::Gauge + ); + } }