Skip to content

Commit

Permalink
perf(metrics): Use kafka header optimization (#5756)
Browse files Browse the repository at this point in the history
* Use kafka headers

* Minor fixes

* Remove unnecessary changes

* Refactor to Metric type enum

* Add tests
  • Loading branch information
nikhars committed Apr 11, 2024
1 parent 26fdbb7 commit bf29fb5
Showing 1 changed file with 109 additions and 6 deletions.
115 changes: 109 additions & 6 deletions rust_snuba/src/processors/generic_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
KafkaMessageMetadata, ProcessorConfig,
};

use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::types::{Headers, KafkaPayload};
use rust_arroyo::{counter, timer};

use super::utils::enforce_retention;
Expand Down Expand Up @@ -304,12 +304,54 @@ where
}
}

// MetricTypeHeader specifies what type of metric was sent on the message
// as per the kafka headers. It is possible to get data with heades missing
// altogether or the specific header key with which to determine the metric
// type to be missing. Hence there is an Unknown variant
#[derive(Debug, Default, PartialEq, Clone)]
enum MetricTypeHeader {
#[default]
Unknown,
Counter,
Set,
Distribution,
Gauge,
}

impl MetricTypeHeader {
fn from_kafka_header(header: Option<&Headers>) -> Self {
if let Some(headers) = header {
if let Some(header_value) = headers.get("metric_type") {
match header_value {
b"c" => MetricTypeHeader::Counter,
b"s" => MetricTypeHeader::Set,
b"d" => MetricTypeHeader::Distribution,
b"g" => MetricTypeHeader::Gauge,
_ => MetricTypeHeader::Unknown,
}
} else {
// metric_type header not found
MetricTypeHeader::Unknown
}
} else {
// No headers on message
MetricTypeHeader::Unknown
}
}
}

pub fn process_counter_message(
payload: KafkaPayload,
_metadata: KafkaMessageMetadata,
config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
process_message::<CountersRawRow>(payload, config)
let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers());
match metric_type_header {
MetricTypeHeader::Counter | MetricTypeHeader::Unknown => {
process_message::<CountersRawRow>(payload, config)
}
_ => Ok(InsertBatch::skip()),
}
}

/// The raw row that is written to clickhouse for sets.
Expand Down Expand Up @@ -380,7 +422,13 @@ pub fn process_set_message(
_metadata: KafkaMessageMetadata,
config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
process_message::<SetsRawRow>(payload, config)
let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers());
match metric_type_header {
MetricTypeHeader::Set | MetricTypeHeader::Unknown => {
process_message::<SetsRawRow>(payload, config)
}
_ => Ok(InsertBatch::skip()),
}
}

#[derive(Debug, Serialize, Default)]
Expand Down Expand Up @@ -457,7 +505,13 @@ pub fn process_distribution_message(
_metadata: KafkaMessageMetadata,
config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
process_message::<DistributionsRawRow>(payload, config)
let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers());
match metric_type_header {
MetricTypeHeader::Distribution | MetricTypeHeader::Unknown => {
process_message::<DistributionsRawRow>(payload, config)
}
_ => Ok(InsertBatch::skip()),
}
}

#[derive(Debug, Serialize, Default)]
Expand Down Expand Up @@ -553,7 +607,13 @@ pub fn process_gauge_message(
_metadata: KafkaMessageMetadata,
config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
process_message::<GaugesRawRow>(payload, config)
let metric_type_header = MetricTypeHeader::from_kafka_header(payload.headers());
match metric_type_header {
MetricTypeHeader::Gauge | MetricTypeHeader::Unknown => {
process_message::<GaugesRawRow>(payload, config)
}
_ => Ok(InsertBatch::skip()),
}
}

#[cfg(test)]
Expand Down Expand Up @@ -844,7 +904,7 @@ mod tests {
-> std::result::Result<crate::types::InsertBatch, anyhow::Error>),
DUMMY_SET_MESSAGE,
);
assert_eq!(result.unwrap(), InsertBatch::default());
assert_eq!(result.unwrap(), InsertBatch::skip());
}

#[test]
Expand Down Expand Up @@ -1288,4 +1348,47 @@ mod tests {
}
);
}

#[test]
fn test_metric_type_header() {
assert_eq!(
MetricTypeHeader::from_kafka_header(None),
MetricTypeHeader::Unknown
);
assert_eq!(
MetricTypeHeader::from_kafka_header(Some(&Headers::new())),
MetricTypeHeader::Unknown
);
assert_eq!(
MetricTypeHeader::from_kafka_header(Some(
&Headers::new().insert("key", Some(b"value".to_vec()))
)),
MetricTypeHeader::Unknown
);

assert_eq!(
MetricTypeHeader::from_kafka_header(Some(
&Headers::new().insert("metric_type", Some(b"c".to_vec()))
)),
MetricTypeHeader::Counter
);
assert_eq!(
MetricTypeHeader::from_kafka_header(Some(
&Headers::new().insert("metric_type", Some(b"s".to_vec()))
)),
MetricTypeHeader::Set
);
assert_eq!(
MetricTypeHeader::from_kafka_header(Some(
&Headers::new().insert("metric_type", Some(b"d".to_vec()))
)),
MetricTypeHeader::Distribution
);
assert_eq!(
MetricTypeHeader::from_kafka_header(Some(
&Headers::new().insert("metric_type", Some(b"g".to_vec()))
)),
MetricTypeHeader::Gauge
);
}
}

0 comments on commit bf29fb5

Please sign in to comment.