diff --git a/rust_snuba/src/processors/eap_spans.rs b/rust_snuba/src/processors/eap_spans.rs new file mode 100644 index 0000000000..c9f82178a8 --- /dev/null +++ b/rust_snuba/src/processors/eap_spans.rs @@ -0,0 +1,617 @@ +use anyhow::Context; +use chrono::DateTime; +use serde::Serialize; +use std::collections::HashMap; +use uuid::Uuid; + +use rust_arroyo::backends::kafka::types::KafkaPayload; + +use crate::config::ProcessorConfig; +use crate::processors::spans::FromSpanMessage; +use crate::processors::utils::enforce_retention; +use crate::types::{InsertBatch, KafkaMessageMetadata}; + +pub fn process_message( + payload: KafkaPayload, + _metadata: KafkaMessageMetadata, + config: &ProcessorConfig, +) -> anyhow::Result { + let payload_bytes = payload.payload().context("Expected payload")?; + let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?; + + let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0); + let mut span: EAPSpan = msg.try_into()?; + + span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config)); + + InsertBatch::from_rows([span], origin_timestamp) +} + +#[derive(Debug, Default, Serialize)] +struct EAPSpan { + // the span object for the new "events analytics platform" + organization_id: u64, + project_id: u64, + trace_id: Uuid, + span_id: u64, + #[serde(default)] + parent_span_id: u64, + segment_id: u64, //aka transaction ID + segment_name: String, //aka transaction name + is_segment: bool, //aka "is transaction" + _sort_timestamp: u32, + start_timestamp: u64, + end_timestamp: u64, + duration_ms: u32, + exclusive_time_ms: f64, + retention_days: Option, + name: String, //aka description + + sampling_factor: f64, + sampling_weight: f64, + sign: u8, //1 for additions, -1 for deletions - for this worker it should be 1 + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_0: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_1: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_2: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_3: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_4: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_5: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_6: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_7: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_8: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_9: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_10: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_11: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_12: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_13: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_14: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_15: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_16: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_17: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_18: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_19: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_20: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_21: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_22: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_23: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_24: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_25: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_26: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_27: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_28: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_29: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_30: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_31: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_32: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_33: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_34: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_35: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_36: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_37: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_38: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_39: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_40: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_41: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_42: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_43: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_44: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_45: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_46: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_47: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_48: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_str_49: HashMap, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_0: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_1: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_2: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_3: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_4: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_5: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_6: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_7: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_8: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_9: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_10: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_11: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_12: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_13: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_14: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_15: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_16: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_17: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_18: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_19: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_20: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_21: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_22: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_23: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_24: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_25: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_26: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_27: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_28: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_29: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_30: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_31: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_32: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_33: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_34: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_35: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_36: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_37: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_38: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_39: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_40: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_41: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_42: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_43: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_44: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_45: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_46: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_47: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_48: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_num_49: HashMap, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_0: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_1: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_2: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_3: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_4: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_5: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_6: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_7: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_8: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_9: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_10: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_11: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_12: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_13: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_14: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_15: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_16: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_17: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_18: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_19: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_20: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_21: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_22: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_23: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_24: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_25: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_26: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_27: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_28: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_29: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_30: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_31: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_32: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_33: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_34: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_35: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_36: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_37: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_38: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_39: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_40: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_41: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_42: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_43: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_44: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_45: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_46: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_47: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_48: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + attr_bool_49: HashMap, +} + +fn half_md5(input: &[u8]) -> u64 { + let full_hash = md5::compute(input).0; + u64::from_be_bytes(full_hash[0..8].try_into().unwrap()) +} + +impl From for EAPSpan { + fn from(from: FromSpanMessage) -> EAPSpan { + let sentry_tags = from.sentry_tags.unwrap_or_default(); + let tags = from.tags.unwrap_or_default(); + let measurements = from.measurements.unwrap_or_default(); + + let mut res = Self { + organization_id: from.organization_id, + project_id: from.project_id, + trace_id: from.trace_id, + span_id: u64::from_str_radix(&from.span_id, 16).unwrap_or_default(), + parent_span_id: from + .parent_span_id + .map_or(0, |s| u64::from_str_radix(&s, 16).unwrap_or(0)), + segment_id: from + .segment_id + .map_or(0, |s| u64::from_str_radix(&s, 16).unwrap_or(0)), + segment_name: sentry_tags.get("transaction").cloned().unwrap_or_default(), + is_segment: from.is_segment, + _sort_timestamp: (from.start_timestamp_ms / 1000) as u32, + start_timestamp: from.start_timestamp_ms, + end_timestamp: from.start_timestamp_ms + from.duration_ms as u64, + duration_ms: from.duration_ms, + exclusive_time_ms: from.exclusive_time_ms, + retention_days: from.retention_days, + name: from.description.unwrap_or_default(), + + sampling_weight: 1., + sampling_factor: 1., + sign: 1, + + ..Default::default() + }; + + { + let mut attr_str_buckets = [ + &mut res.attr_str_0, + &mut res.attr_str_1, + &mut res.attr_str_2, + &mut res.attr_str_3, + &mut res.attr_str_4, + &mut res.attr_str_5, + &mut res.attr_str_6, + &mut res.attr_str_7, + &mut res.attr_str_8, + &mut res.attr_str_9, + &mut res.attr_str_10, + &mut res.attr_str_11, + &mut res.attr_str_12, + &mut res.attr_str_13, + &mut res.attr_str_14, + &mut res.attr_str_15, + &mut res.attr_str_16, + &mut res.attr_str_17, + &mut res.attr_str_18, + &mut res.attr_str_19, + &mut res.attr_str_20, + &mut res.attr_str_21, + &mut res.attr_str_22, + &mut res.attr_str_23, + &mut res.attr_str_24, + &mut res.attr_str_25, + &mut res.attr_str_26, + &mut res.attr_str_27, + &mut res.attr_str_28, + &mut res.attr_str_29, + &mut res.attr_str_30, + &mut res.attr_str_31, + &mut res.attr_str_32, + &mut res.attr_str_33, + &mut res.attr_str_34, + &mut res.attr_str_35, + &mut res.attr_str_36, + &mut res.attr_str_37, + &mut res.attr_str_38, + &mut res.attr_str_39, + &mut res.attr_str_40, + &mut res.attr_str_41, + &mut res.attr_str_42, + &mut res.attr_str_43, + &mut res.attr_str_44, + &mut res.attr_str_45, + &mut res.attr_str_46, + &mut res.attr_str_47, + &mut res.attr_str_48, + &mut res.attr_str_49, + ]; + + sentry_tags.iter().chain(tags.iter()).for_each(|(k, v)| { + attr_str_buckets[(half_md5(k.as_bytes()) as usize) % attr_str_buckets.len()] + .insert(k.clone(), v.clone()); + }); + } + + { + let mut attr_num_buckets = [ + &mut res.attr_num_0, + &mut res.attr_num_1, + &mut res.attr_num_2, + &mut res.attr_num_3, + &mut res.attr_num_4, + &mut res.attr_num_5, + &mut res.attr_num_6, + &mut res.attr_num_7, + &mut res.attr_num_8, + &mut res.attr_num_9, + &mut res.attr_num_10, + &mut res.attr_num_11, + &mut res.attr_num_12, + &mut res.attr_num_13, + &mut res.attr_num_14, + &mut res.attr_num_15, + &mut res.attr_num_16, + &mut res.attr_num_17, + &mut res.attr_num_18, + &mut res.attr_num_19, + &mut res.attr_num_20, + &mut res.attr_num_21, + &mut res.attr_num_22, + &mut res.attr_num_23, + &mut res.attr_num_24, + &mut res.attr_num_25, + &mut res.attr_num_26, + &mut res.attr_num_27, + &mut res.attr_num_28, + &mut res.attr_num_29, + &mut res.attr_num_30, + &mut res.attr_num_31, + &mut res.attr_num_32, + &mut res.attr_num_33, + &mut res.attr_num_34, + &mut res.attr_num_35, + &mut res.attr_num_36, + &mut res.attr_num_37, + &mut res.attr_num_38, + &mut res.attr_num_39, + &mut res.attr_num_40, + &mut res.attr_num_41, + &mut res.attr_num_42, + &mut res.attr_num_43, + &mut res.attr_num_44, + &mut res.attr_num_45, + &mut res.attr_num_46, + &mut res.attr_num_47, + &mut res.attr_num_48, + &mut res.attr_num_49, + ]; + + measurements.iter().for_each(|(k, v)| { + attr_num_buckets[(half_md5(k.as_bytes()) as usize) % attr_num_buckets.len()] + .insert(k.clone(), v.value); + }); + } + + res + } +} + +#[cfg(test)] +mod tests { + use std::time::SystemTime; + + use super::*; + + const SPAN_KAFKA_MESSAGE: &str = r#" +{ + "description": "/api/0/relays/projectconfigs/", + "duration_ms": 152, + "event_id": "d826225de75d42d6b2f01b957d51f18f", + "exclusive_time_ms": 0.228, + "is_segment": true, + "data": { + "sentry.environment": "development", + "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "thread.name": "uWSGIWorker1Core0", + "thread.id": "8522009600", + "sentry.segment.name": "/api/0/relays/projectconfigs/", + "sentry.sdk.name": "sentry.python.django", + "sentry.sdk.version": "2.7.0" + }, + "measurements": { + "num_of_spans": { + "value": 50.0 + } + }, + "organization_id": 1, + "origin": "auto.http.django", + "project_id": 1, + "received": 1721319572.877828, + "retention_days": 90, + "segment_id": "8873a98879faf06d", + "sentry_tags": { + "category": "http", + "environment": "development", + "op": "http.server", + "platform": "python", + "release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "sdk.name": "sentry.python.django", + "sdk.version": "2.7.0", + "status": "ok", + "status_code": "200", + "thread.id": "8522009600", + "thread.name": "uWSGIWorker1Core0", + "trace.status": "ok", + "transaction": "/api/0/relays/projectconfigs/", + "transaction.method": "POST", + "transaction.op": "http.server", + "user": "ip:127.0.0.1" + }, + "span_id": "8873a98879faf06d", + "tags": { + "http.status_code": "200", + "relay_endpoint_version": "3", + "relay_id": "88888888-4444-4444-8444-cccccccccccc", + "relay_no_cache": "False", + "relay_protocol_version": "3", + "relay_use_post_or_schedule": "True", + "relay_use_post_or_schedule_rejected": "version", + "server_name": "D23CXQ4GK2.local", + "spans_over_limit": "False" + }, + "trace_id": "d099bf9ad5a143cf8f83a98081d0ed3b", + "start_timestamp_ms": 1721319572616, + "start_timestamp_precise": 1721319572.616648, + "end_timestamp_precise": 1721319572.768806 +} + "#; + + #[test] + fn test_half_md5() { + //select halfMD5('test') == 688887797400064883 + assert_eq!(half_md5("test".as_bytes()), 688887797400064883) + } + + #[test] + fn test_valid_span() { + let payload = KafkaPayload::new(None, None, Some(SPAN_KAFKA_MESSAGE.as_bytes().to_vec())); + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + process_message(payload, meta, &ProcessorConfig::default()) + .expect("The message should be processed"); + } + + #[test] + fn test_serialization() { + let msg: FromSpanMessage = serde_json::from_slice(SPAN_KAFKA_MESSAGE.as_bytes()).unwrap(); + let span: EAPSpan = msg.try_into().unwrap(); + insta::with_settings!({sort_maps => true}, { + insta::assert_json_snapshot!(span) + }); + } +} diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs index ce0e30328a..919da00cb5 100644 --- a/rust_snuba/src/processors/mod.rs +++ b/rust_snuba/src/processors/mod.rs @@ -1,3 +1,4 @@ +mod eap_spans; mod errors; mod functions; mod generic_metrics; @@ -55,6 +56,7 @@ define_processing_functions! { ("QuerylogProcessor", "snuba-queries", ProcessingFunctionType::ProcessingFunction(querylog::process_message)), ("ReplaysProcessor", "ingest-replay-events", ProcessingFunctionType::ProcessingFunction(replays::process_message)), ("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)), + ("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)), ("MetricsSummariesMessageProcessor", "snuba-metrics-summaries", ProcessingFunctionType::ProcessingFunction(metrics_summaries::process_message)), ("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)), ("GenericCountersMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_counter_message)), diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap new file mode 100644 index 0000000000..16cc974b51 --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap @@ -0,0 +1,90 @@ +--- +source: src/processors/eap_spans.rs +expression: span +--- +{ + "organization_id": 1, + "project_id": 1, + "trace_id": "d099bf9a-d5a1-43cf-8f83-a98081d0ed3b", + "span_id": 9832388815107059821, + "parent_span_id": 0, + "segment_id": 9832388815107059821, + "segment_name": "/api/0/relays/projectconfigs/", + "is_segment": true, + "_sort_timestamp": 1721319572, + "start_timestamp": 1721319572616, + "end_timestamp": 1721319572768, + "duration_ms": 152, + "exclusive_time_ms": 0.228, + "retention_days": 90, + "name": "/api/0/relays/projectconfigs/", + "sampling_factor": 1.0, + "sampling_weight": 1.0, + "sign": 1, + "attr_str_3": { + "server_name": "D23CXQ4GK2.local" + }, + "attr_str_7": { + "relay_use_post_or_schedule_rejected": "version" + }, + "attr_str_10": { + "transaction": "/api/0/relays/projectconfigs/" + }, + "attr_str_11": { + "thread.id": "8522009600" + }, + "attr_str_12": { + "relay_use_post_or_schedule": "True" + }, + "attr_str_17": { + "release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" + }, + "attr_str_19": { + "environment": "development", + "platform": "python" + }, + "attr_str_21": { + "relay_endpoint_version": "3" + }, + "attr_str_23": { + "status_code": "200" + }, + "attr_str_24": { + "http.status_code": "200" + }, + "attr_str_25": { + "user": "ip:127.0.0.1" + }, + "attr_str_30": { + "sdk.version": "2.7.0", + "trace.status": "ok" + }, + "attr_str_31": { + "category": "http", + "thread.name": "uWSGIWorker1Core0" + }, + "attr_str_33": { + "transaction.method": "POST" + }, + "attr_str_34": { + "relay_id": "88888888-4444-4444-8444-cccccccccccc", + "spans_over_limit": "False", + "status": "ok" + }, + "attr_str_38": { + "relay_no_cache": "False", + "sdk.name": "sentry.python.django" + }, + "attr_str_40": { + "op": "http.server" + }, + "attr_str_48": { + "transaction.op": "http.server" + }, + "attr_str_49": { + "relay_protocol_version": "3" + }, + "attr_num_17": { + "num_of_spans": 50.0 + } +} diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap new file mode 100644 index 0000000000..5b719ba494 --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-spans-EAPSpansMessageProcessor-snuba-spans__1__basic_span.json.snap @@ -0,0 +1,73 @@ +--- +source: src/processors/mod.rs +description: "{\n \"event_id\": \"dcc403b73ef548648188bbfa6012e9dc\",\n \"organization_id\": 69,\n \"project_id\": 1,\n \"trace_id\": \"deadbeefdeadbeefdeadbeefdeadbeef\",\n \"span_id\": \"deadbeefdeadbeef\",\n \"parent_span_id\": \"deadbeefdeadbeef\",\n \"segment_id\": \"deadbeefdeadbeef\",\n \"duration_ms\": 1000,\n \"exclusive_time_ms\": 1000,\n \"is_segment\": false,\n \"profile_id\": \"deadbeefdeadbeefdeadbeefdeadbeef\",\n \"received\": 1715868485.381,\n \"retention_days\": 90,\n \"start_timestamp_ms\": 1715868485371,\n \"start_timestamp_precise\": 1715868485.370551,\n \"end_timestamp_precise\": 1715868486.370551,\n \"tags\": {\n \"tag1\": \"value1\",\n \"tag2\": \"123\",\n \"tag3\": \"True\"\n },\n \"sentry_tags\": {\n \"http.method\": \"GET\",\n \"action\": \"GET\",\n \"domain\": \"targetdomain.tld:targetport\",\n \"module\": \"http\",\n \"group\": \"deadbeefdeadbeef\",\n \"status\": \"ok\",\n \"system\": \"python\",\n \"status_code\": \"200\",\n \"transaction\": \"/organizations/:orgId/issues/\",\n \"transaction.op\": \"navigation\",\n \"op\": \"http.client\",\n \"transaction.method\": \"GET\"\n },\n \"measurements\": {\n \"http.response_content_length\": {\n \"value\": 100.0,\n \"unit\": \"byte\"\n }\n },\n \"_metrics_summary\": {\n \"c:sentry.events.outcomes@none\": [\n {\n \"count\": 1,\n \"max\": 1.0,\n \"min\": 1.0,\n \"sum\": 1.0,\n \"tags\": {\n \"category\": \"error\",\n \"environment\": \"unknown\",\n \"event_type\": \"error\",\n \"outcome\": \"accepted\",\n \"release\": \"backend@2af74c237fbd61489a1ccc46650f4f85befaf8b8\",\n \"topic\": \"outcomes-billing\",\n \"transaction\": \"sentry.tasks.store.save_event\"\n }\n }\n ],\n \"c:sentry.events.post_save.normalize.errors@none\": [\n {\n \"count\": 1,\n \"max\": 0.0,\n \"min\": 0.0,\n \"sum\": 0.0,\n \"tags\": {\n \"environment\": \"unknown\",\n \"event_type\": \"error\",\n \"from_relay\": \"False\",\n \"release\": \"backend@2af74c237fbd61489a1ccc46650f4f85befaf8b8\",\n \"transaction\": \"sentry.tasks.store.save_event\"\n }\n }\n ]\n }\n}\n" +expression: snapshot_payload +--- +[ + { + "_sort_timestamp": 1715868485, + "attr_num_12": { + "http.response_content_length": 100.0 + }, + "attr_str_1": { + "tag1": "value1", + "tag3": "True" + }, + "attr_str_10": { + "transaction": "/organizations/:orgId/issues/" + }, + "attr_str_12": { + "system": "python" + }, + "attr_str_16": { + "domain": "targetdomain.tld:targetport" + }, + "attr_str_17": { + "tag2": "123" + }, + "attr_str_21": { + "http.method": "GET" + }, + "attr_str_23": { + "status_code": "200" + }, + "attr_str_30": { + "group": "deadbeefdeadbeef" + }, + "attr_str_32": { + "module": "http" + }, + "attr_str_33": { + "transaction.method": "GET" + }, + "attr_str_34": { + "status": "ok" + }, + "attr_str_40": { + "op": "http.client" + }, + "attr_str_48": { + "transaction.op": "navigation" + }, + "attr_str_7": { + "action": "GET" + }, + "duration_ms": 1000, + "end_timestamp": 1715868486371, + "exclusive_time_ms": 1000.0, + "is_segment": false, + "name": "", + "organization_id": 69, + "parent_span_id": 16045690984833335023, + "project_id": 1, + "retention_days": 90, + "sampling_factor": 1.0, + "sampling_weight": 1.0, + "segment_id": 16045690984833335023, + "segment_name": "/organizations/:orgId/issues/", + "sign": 1, + "span_id": 16045690984833335023, + "start_timestamp": 1715868485371, + "trace_id": "deadbeef-dead-beef-dead-beefdeadbeef" + } +] diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__snuba-spans-.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__snuba-spans-.snap new file mode 100644 index 0000000000..5f85557705 --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__snuba-spans-.snap @@ -0,0 +1,18 @@ +--- +source: src/processors/mod.rs +expression: diff +--- +[ + Change { + path: "", + change: RequiredAdd { + property: "end_timestamp_precise", + }, + }, + Change { + path: "", + change: RequiredAdd { + property: "organization_id", + }, + }, +] diff --git a/rust_snuba/src/processors/spans.rs b/rust_snuba/src/processors/spans.rs index f74ab3462c..1f41fafbc8 100644 --- a/rust_snuba/src/processors/spans.rs +++ b/rust_snuba/src/processors/spans.rs @@ -32,33 +32,33 @@ pub fn process_message( } #[derive(Debug, Default, Deserialize, JsonSchema)] -struct FromSpanMessage { - description: Option, - duration_ms: u32, - #[serde(alias = "end_timestamp_micro")] - end_timestamp_precise: Option, - event_id: Option, - exclusive_time_ms: f64, - is_segment: bool, - measurements: Option>, - parent_span_id: Option, - profile_id: Option, - project_id: u64, - received: f64, - retention_days: Option, - segment_id: Option, - sentry_tags: Option>, - span_id: String, +pub(crate) struct FromSpanMessage { + pub(crate) description: Option, + pub(crate) duration_ms: u32, + pub(crate) end_timestamp_precise: f64, + pub(crate) event_id: Option, + pub(crate) exclusive_time_ms: f64, + pub(crate) is_segment: bool, + pub(crate) measurements: Option>, + pub(crate) parent_span_id: Option, + pub(crate) profile_id: Option, + pub(crate) organization_id: u64, + pub(crate) project_id: u64, + pub(crate) received: f64, + pub(crate) retention_days: Option, + pub(crate) segment_id: Option, + pub(crate) sentry_tags: Option>, + pub(crate) span_id: String, #[serde(alias = "start_timestamp_micro")] - start_timestamp_precise: Option, - start_timestamp_ms: u64, - tags: Option>, - trace_id: Uuid, + pub(crate) start_timestamp_precise: Option, + pub(crate) start_timestamp_ms: u64, + pub(crate) tags: Option>, + pub(crate) trace_id: Uuid, } #[derive(Debug, Default, Deserialize, JsonSchema)] -struct FromMeasurementValue { - value: f64, +pub(crate) struct FromMeasurementValue { + pub(crate) value: f64, } #[derive(Debug, Default, Serialize)] @@ -153,7 +153,7 @@ impl TryFrom for Span { duration: from.duration_ms, end_ms: (end_timestamp_ms % 1000) as u16, end_timestamp: end_timestamp_ms / 1000, - end_timestamp_precise: (from.end_timestamp_precise.unwrap_or_default() * 1e6) as u64, + end_timestamp_precise: (from.end_timestamp_precise * 1e6) as u64, exclusive_time: from.exclusive_time_ms, group, is_segment: if from.is_segment { 1 } else { 0 }, @@ -344,6 +344,7 @@ mod tests { parent_span_id: Option, profile_id: Option, project_id: Option, + organization_id: Option, received: Option, retention_days: Option, segment_id: Option, @@ -366,6 +367,7 @@ mod tests { parent_span_id: Some("deadbeefdeadbeef".into()), profile_id: Some(Uuid::new_v4()), project_id: Some(1), + organization_id: Some(1), retention_days: Some(90), received: Some(1691105878.720), segment_id: Some("deadbeefdeadbeef".into()), diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index 9264129baf..ac8fc73c74 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -107,6 +107,17 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: *COMMON_RUST_CONSUMER_DEV_OPTIONS, ], ), + ( + "eap-spans-consumer", + [ + "snuba", + "rust-consumer", + "--storage=eap_spans", + "--consumer-group=eap_spans_group", + "--use-rust-processor", + *COMMON_RUST_CONSUMER_DEV_OPTIONS, + ], + ), ] if settings.SEPARATE_SCHEDULER_EXECUTOR_SUBSCRIPTIONS_DEV: diff --git a/snuba/datasets/processors/spans_v2_processor.py b/snuba/datasets/processors/spans_v2_processor.py new file mode 100644 index 0000000000..8350bbb486 --- /dev/null +++ b/snuba/datasets/processors/spans_v2_processor.py @@ -0,0 +1,6 @@ +from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor + + +class EAPSpansMessageProcessor(RustCompatProcessor): + def __init__(self) -> None: + super().__init__("EAPSpansMessageProcessor") diff --git a/tests/datasets/test_spans_payloads.py b/tests/datasets/test_spans_payloads.py index bd6d100aae..7578dc9aa6 100644 --- a/tests/datasets/test_spans_payloads.py +++ b/tests/datasets/test_spans_payloads.py @@ -30,6 +30,7 @@ "exclusive_time_ms": 1234567890123, "is_segment": True, "project_id": project_id, + "organization_id": 1, "received": received, "retention_days": 90, "segment_id": "1234567890123456", diff --git a/tests/datasets/test_spans_processor.py b/tests/datasets/test_spans_processor.py index 8b4a4b054d..1e1e1da964 100644 --- a/tests/datasets/test_spans_processor.py +++ b/tests/datasets/test_spans_processor.py @@ -52,6 +52,7 @@ def serialize(self) -> SpanEvent: "is_segment": False, "parent_span_id": self.parent_span_id, "project_id": self.project_id, + "organization_id": 1, "received": self.received, "retention_days": self.retention_days, "segment_id": self.segment_id, diff --git a/tests/test_spans_api.py b/tests/test_spans_api.py index 3c243b3b28..c6b2f08fc9 100644 --- a/tests/test_spans_api.py +++ b/tests/test_spans_api.py @@ -83,6 +83,7 @@ def generate_fizzbuzz_events(self) -> None: .process_message( { "project_id": p, + "organization_id": 1, "event_id": uuid.uuid4().hex, "deleted": 0, "is_segment": False,