feat(generic-metrics): Add Base64 decoding to Snuba processors (#5761)
* wip

* add base 64

* add support for base64 decoding

* add tests

* add test

* fix lint

* try with generic decoder

* fix function signature

* add error propagation so that we DLQ

* add sets test

* remove comment

* fix linting

* fix test

* rename into_vec to try_into_vec

* add checks to b64 decoded bytes length

* lint anyhow macro

* update error message

* refactor set value parsing

* bump schema version

* bump in cargo

* add snapshots

---------

Co-authored-by: John Yang <john.yang@sentry.io>
ayirr7 and john-z-yang authored Apr 25, 2024
1 parent 27a539c commit 07efd32
Showing 12 changed files with 372 additions and 12 deletions.
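
In short: the set and distribution processors now accept, alongside the plain JSON array format, a value of the form {"format": "base64", "data": "..."} whose data field is the Base64 encoding of the series' raw little-endian bytes (u32 per element for sets, f64 for distributions), with decode failures surfaced as errors. Below is a condensed, standalone sketch of the decoding scheme, assuming the data-encoding and anyhow crates pinned in the diff; the function name is illustrative, not the processor's actual API.

use anyhow::{anyhow, Result};

// Sketch: decode a Base64-encoded series of little-endian u32s, mirroring
// the scheme implemented in generic_metrics.rs below. Illustrative only.
fn decode_u32_series(data: &str) -> Result<Vec<u32>> {
    let bytes = data_encoding::BASE64.decode(data.as_bytes())?;
    if bytes.len() % 4 != 0 {
        return Err(anyhow!("decoded length {} is not a multiple of 4", bytes.len()));
    }
    Ok(bytes
        .chunks_exact(4)
        // OK to unwrap: chunks_exact always yields 4-byte slices here.
        .map(|chunk| u32::from_le_bytes(chunk.try_into().unwrap()))
        .collect())
}

fn main() -> Result<()> {
    // "AQAAAAcAAAA=" is the set fixture used in the tests below.
    assert_eq!(decode_u32_series("AQAAAAcAAAA=")?, vec![1, 7]);
    Ok(())
}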
2 changes: 1 addition & 1 deletion requirements.txt
@@ -23,7 +23,7 @@ python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.3.4
sentry-arroyo==2.17.1
-sentry-kafka-schemas==0.1.71
+sentry-kafka-schemas==0.1.72
sentry-redis-tools==0.3.0
sentry-relay==0.8.44
sentry-sdk==1.40.5
11 changes: 9 additions & 2 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion rust_snuba/Cargo.toml
@@ -33,7 +33,7 @@ pyo3 = { version = "0.18.1", features = ["chrono"] }
reqwest = { version = "0.11.11", features = ["stream"] }
rust_arroyo = { version = "*", git = "https://github.com/getsentry/arroyo" }
sentry = { version = "0.32.0", features = ["anyhow", "tracing"] }
-sentry-kafka-schemas = "0.1.71"
+sentry-kafka-schemas = "0.1.72"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
thiserror = "1.0"
@@ -50,11 +50,13 @@ json-schema-diff = "0.1.7"
serde_path_to_error = "0.1.15"
hyper = "1.2.0"
tokio-stream = "0.1.15"
data-encoding = "2.5.0"


[patch.crates-io]
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka" }


[dev-dependencies]
criterion = "0.5.1"
httpmock = "0.7.0"
242 changes: 234 additions & 8 deletions rust_snuba/src/processors/generic_metrics.rs
@@ -1,5 +1,5 @@
use adler::Adler32;
-use anyhow::{Context, Error};
+use anyhow::{anyhow, Context, Error};
use chrono::DateTime;
use serde::{
de::value::{MapAccessDeserializer, SeqAccessDeserializer},
@@ -15,6 +15,7 @@ use crate::{

use rust_arroyo::backends::kafka::types::{Headers, KafkaPayload};
use rust_arroyo::{counter, timer};
static BASE64: data_encoding::Encoding = data_encoding::BASE64;

use super::utils::enforce_retention;

@@ -76,7 +77,7 @@ enum MetricValue {
#[serde(rename = "c")]
Counter(f64),
#[serde(rename = "s", deserialize_with = "encoded_series_compat_deserializer")]
-Set(EncodedSeries<u64>),
+Set(EncodedSeries<u32>),
#[serde(rename = "d", deserialize_with = "encoded_series_compat_deserializer")]
Distribution(EncodedSeries<f64>),
#[serde(rename = "g")]
@@ -89,16 +90,61 @@ enum MetricValue {
},
}

trait Decodable<const SIZE: usize>: Copy {
const SIZE: usize = SIZE;

fn decode_bytes(bytes: [u8; SIZE]) -> Self;
}

impl Decodable<4> for u32 {
fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self {
Self::from_le_bytes(bytes)
}
}

impl Decodable<8> for u64 {
fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self {
Self::from_le_bytes(bytes)
}
}

impl Decodable<8> for f64 {
fn decode_bytes(bytes: [u8; Self::SIZE]) -> Self {
Self::from_le_bytes(bytes)
}
}

#[derive(Debug, Deserialize)]
#[serde(tag = "format", rename_all = "lowercase")]
enum EncodedSeries<T> {
Array { data: Vec<T> },
Base64 { data: String },
}

impl<T> EncodedSeries<T> {
-fn into_vec(self) -> Vec<T> {
+fn try_into_vec<const SIZE: usize>(self) -> Result<Vec<T>, anyhow::Error>
+where
+    T: Decodable<SIZE>,
+{
match self {
-EncodedSeries::Array { data } => data,
+EncodedSeries::Array { data } => Ok(data),
EncodedSeries::Base64 { data, .. } => {
let decoded_bytes = BASE64.decode(data.as_bytes())?;
if decoded_bytes.len() % T::SIZE == 0 {
Ok(decoded_bytes
.chunks_exact(T::SIZE)
.map(TryInto::try_into)
.map(Result::unwrap) // OK to unwrap, `chunks_exact` always yields slices of the right length
.map(T::decode_bytes)
.collect())
} else {
Err(anyhow!(
"Decoded Base64 cannot be chunked into {}, got {}",
T::SIZE,
decoded_bytes.len()
))
}
}
}
}
}
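
Returning Result from try_into_vec (instead of the old infallible into_vec) is the error propagation referred to in the "add error propagation so that we DLQ" commit above: a data field that is not valid Base64, or whose decoded length is not a multiple of T::SIZE, now fails processing of the message so it can be routed to the dead-letter queue rather than being silently mis-parsed.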
@@ -363,7 +409,7 @@ pub fn process_counter_message(
struct SetsRawRow {
#[serde(flatten)]
common_fields: CommonMetricFields,
-set_values: Vec<u64>,
+set_values: Vec<u32>,
}

impl Parse for SetsRawRow {
@@ -372,7 +418,7 @@
config: &ProcessorConfig,
) -> anyhow::Result<Option<SetsRawRow>> {
let set_values = match from.value {
-MetricValue::Set(values) => values.into_vec(),
+MetricValue::Set(values) => values.try_into_vec()?,
_ => return Ok(Option::None),
};

@@ -452,11 +498,13 @@ impl Parse for DistributionsRawRow {
from: FromGenericMetricsMessage,
config: &ProcessorConfig,
) -> anyhow::Result<Option<DistributionsRawRow>> {
-let distribution_values = match from.value {
-    MetricValue::Distribution(value) => value.into_vec(),
+let maybe_dist = match from.value {
+    MetricValue::Distribution(value) => value.try_into_vec(),
_ => return Ok(Option::None),
};

let distribution_values = maybe_dist?;

timer!(
"generic_metrics.messages.dists_value_len",
distribution_values.len() as u64
@@ -757,6 +805,184 @@ mod tests {
"aggregation_option": "ten_second"
}"#;

const DUMMY_BASE64_ENCODED_DISTRIBUTION_MESSAGE: &str = r#"{
"version": 2,
"use_case_id": "spans",
"org_id": 1,
"project_id": 3,
"metric_id": 65563,
"timestamp": 1704614940,
"sentry_received_timestamp": 1704614940,
"tags": {"9223372036854776010":"production","9223372036854776017":"healthy","65690":"metric_e2e_spans_dist_v_VUW93LMS"},
"retention_days": 90,
"mapping_meta":{"d":{"65560":"d:spans/duration@second"},"h":{"9223372036854776017":"session.status","9223372036854776010":"environment"},"f":{"65691":"metric_e2e_spans_dist_k_VUW93LMS"}},
"type": "d",
"value": {"format": "base64", "data": "AAAAAAAACEAAAAAAAADwPwAAAAAAAABA"}
}"#;

const DUMMY_BASE64_ENCODED_SET_MESSAGE: &str = r#"{
"version": 2,
"use_case_id": "spans",
"org_id": 1,
"project_id": 3,
"metric_id": 65563,
"timestamp": 1704614940,
"sentry_received_timestamp": 1704614940,
"tags": {"9223372036854776010":"production","9223372036854776017":"healthy","65690":"metric_e2e_spans_dist_v_VUW93LMS"},
"retention_days": 90,
"mapping_meta":{"d":{"65560":"s:spans/duration@second"},"h":{"9223372036854776017":"session.status","9223372036854776010":"environment"},"f":{"65691":"metric_e2e_spans_set_k_VUW93LMS"}},
"type": "s",
"value": {"format": "base64", "data": "AQAAAAcAAAA="}
}"#;

#[test]
fn test_base64_decode_f64() {
assert!(
EncodedSeries::<f64>::Base64 {
data: "AAAAAAAACEAAAAAAAADwPwAAAAAAAABA".to_string(),
}
.try_into_vec()
.ok()
.unwrap()
== vec![3f64, 1f64, 2f64]
)
}

#[test]
fn test_distribution_processor_with_v2_distribution_message() {
let result: Result<InsertBatch, Error> = test_processor_with_payload(
&(process_distribution_message
as fn(
rust_arroyo::backends::kafka::types::KafkaPayload,
crate::types::KafkaMessageMetadata,
&crate::ProcessorConfig,
)
-> std::result::Result<crate::types::InsertBatch, anyhow::Error>),
DUMMY_BASE64_ENCODED_DISTRIBUTION_MESSAGE,
);
let expected_row = DistributionsRawRow {
common_fields: CommonMetricFields {
use_case_id: "spans".to_string(),
org_id: 1,
project_id: 3,
metric_id: 65563,
timestamp: 1704614940,
retention_days: 90,
tags_key: vec![65690, 9223372036854776010, 9223372036854776017],
tags_indexed_value: vec![0; 3],
tags_raw_value: vec![
"metric_e2e_spans_dist_v_VUW93LMS".to_string(),
"production".to_string(),
"healthy".to_string(),
],
metric_type: "distribution".to_string(),
materialization_version: 2,
timeseries_id: 1436359714,
granularities: vec![
GRANULARITY_ONE_MINUTE,
GRANULARITY_ONE_HOUR,
GRANULARITY_ONE_DAY,
],
decasecond_retention_days: None,
min_retention_days: Some(90),
hr_retention_days: None,
day_retention_days: None,
record_meta: Some(1),
},
distribution_values: vec![3f64, 1f64, 2f64],
enable_histogram: None,
};
assert_eq!(
result.unwrap(),
InsertBatch {
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 675)])
})
}
);
}

#[test]
fn test_base64_decode_32() {
assert!(
EncodedSeries::<u32>::Base64 {
data: "AQAAAAcAAAA=".to_string(),
}
.try_into_vec()
.ok()
.unwrap()
== vec![1u32, 7u32]
)
}

#[test]
fn test_base64_decode_32_invalid() {
assert!(EncodedSeries::<u32>::Base64 {
data: "AQAAAAcAAA=".to_string(),
}
.try_into_vec()
.is_err())
}

#[test]
fn test_set_processor_with_v2_set_message() {
let result = test_processor_with_payload(
&(process_set_message
as fn(
rust_arroyo::backends::kafka::types::KafkaPayload,
crate::types::KafkaMessageMetadata,
&crate::ProcessorConfig,
)
-> std::result::Result<crate::types::InsertBatch, anyhow::Error>),
DUMMY_BASE64_ENCODED_SET_MESSAGE,
);
let expected_row = SetsRawRow {
common_fields: CommonMetricFields {
use_case_id: "spans".to_string(),
org_id: 1,
project_id: 3,
metric_id: 65563,
timestamp: 1704614940,
retention_days: 90,
tags_key: vec![65690, 9223372036854776010, 9223372036854776017],
tags_indexed_value: vec![0; 3],
tags_raw_value: vec![
"metric_e2e_spans_dist_v_VUW93LMS".to_string(),
"production".to_string(),
"healthy".to_string(),
],
metric_type: "set".to_string(),
materialization_version: 2,
timeseries_id: 1436359714,
granularities: vec![
GRANULARITY_ONE_MINUTE,
GRANULARITY_ONE_HOUR,
GRANULARITY_ONE_DAY,
],
decasecond_retention_days: None,
min_retention_days: Some(90),
hr_retention_days: None,
day_retention_days: None,
record_meta: Some(1),
},
set_values: vec![1u32, 7u32],
};
assert_eq!(
result.unwrap(),
InsertBatch {
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 654)])
})
}
);
}

#[test]
fn test_shouldnt_killswitch() {
let fake_config = Ok(Some("[custom]".to_string()));
@@ -0,0 +1,6 @@
---
source: src/processors/mod.rs
description: "{\n \"version\": 2,\n \"use_case_id\": \"spans\",\n \"org_id\": 1,\n \"project_id\": 3,\n \"metric_id\": 65563,\n \"timestamp\": 1704614940,\n \"sentry_received_timestamp\": 1704614940,\n \"tags\": {\n \"9223372036854776010\": \"production\",\n \"9223372036854776017\": \"healthy\",\n \"65690\": \"metric_e2e_spans_dist_v_VUW93LMS\"\n },\n \"retention_days\": 90,\n \"mapping_meta\": {\n \"d\": {\n \"65560\": \"d:spans/duration@second\"\n },\n \"h\": {\n \"9223372036854776017\": \"session.status\",\n \"9223372036854776010\": \"environment\"\n },\n \"f\": {\n \"65691\": \"metric_e2e_spans_dist_k_VUW93LMS\"\n }\n },\n \"type\": \"d\",\n \"value\": {\n \"format\": \"base64\",\n \"data\": \"AAAAAAAACEAAAAAAAADwPwAAAAAAAABA\"\n }\n}\n"
expression: snapshot_payload
---
[]
@@ -0,0 +1,6 @@
---
source: src/processors/mod.rs
description: "{\n \"version\": 2,\n \"use_case_id\": \"spans\",\n \"org_id\": 1,\n \"project_id\": 3,\n \"metric_id\": 65562,\n \"timestamp\": 1704614940,\n \"sentry_received_timestamp\": 1704614940,\n \"tags\": {\n \"9223372036854776010\": \"production\",\n \"9223372036854776017\": \"errored\",\n \"65690\": \"metric_e2e_spans_set_v_VUW93LMS\"\n },\n \"retention_days\": 90,\n \"mapping_meta\": {\n \"h\": {\n \"9223372036854776017\": \"session.status\",\n \"9223372036854776010\": \"environment\"\n },\n \"f\": {\n \"65690\": \"metric_e2e_spans_set_k_VUW93LMS\"\n },\n \"d\": {\n \"65562\": \"s:spans/error@none\"\n }\n },\n \"type\": \"s\",\n \"value\": {\n \"format\": \"base64\",\n \"data\": \"AQAAAAcAAAA=\"\n }\n}\n"
expression: snapshot_payload
---
[]
