diff --git a/rust_snuba/src/processors/replays.rs b/rust_snuba/src/processors/replays.rs index aef25621a3..56436c3f23 100644 --- a/rust_snuba/src/processors/replays.rs +++ b/rust_snuba/src/processors/replays.rs @@ -13,36 +13,49 @@ pub fn process_message( _config: &ProcessorConfig, ) -> anyhow::Result { let payload_bytes = payload.payload().context("Expected payload")?; + let replay_row = deserialize_message(payload_bytes, metadata.partition, metadata.offset)?; + InsertBatch::from_rows(replay_row) +} - let replay_message: ReplayMessage = serde_json::from_slice(payload_bytes)?; +pub fn deserialize_message( + payload: &[u8], + partition: u16, + offset: u64, +) -> anyhow::Result> { + let replay_message: ReplayMessage = serde_json::from_slice(payload)?; let replay_payload = serde_json::from_slice(&replay_message.payload)?; match replay_payload { - ReplayPayload::ClickEvent(event) => { - InsertBatch::from_rows(event.clicks.into_iter().map(|click| ReplayRow { + ReplayPayload::ClickEvent(event) => Ok(event + .clicks + .into_iter() + .map(|click| ReplayRow { click_alt: click.alt, click_aria_label: click.aria_label, click_class: click.class, + click_component_name: click.component_name, click_id: click.id, click_is_dead: click.is_dead, click_is_rage: click.is_rage, click_node_id: click.node_id, - click_component_name: click.component_name, click_role: click.role, click_tag: click.tag, click_testid: click.testid, click_text: click.text, click_title: click.title, + error_sample_rate: -1.0, event_hash: click.event_hash, - offset: metadata.offset, - partition: metadata.partition, + offset, + partition, + platform: "javascript".to_string(), project_id: replay_message.project_id, replay_id: replay_message.replay_id, retention_days: replay_message.retention_days, + session_sample_rate: -1.0, timestamp: click.timestamp as u32, ..Default::default() - })) - } + }) + .collect()), ReplayPayload::Event(event) => { let event_hash = match (event.event_hash, event.segment_id) { (None, None) 
=> Uuid::new_v4(), @@ -52,12 +65,6 @@ pub fn process_message( (Some(event_hash), _) => event_hash, }; - let (ip_address_v4, ip_address_v6) = match event.user.ip_address { - None => (None, None), - Some(IpAddr::V4(ip)) => (Some(ip), None), - Some(IpAddr::V6(ip)) => (None, Some(ip)), - }; - // Tags normalization and title extraction. let mut title = None; let mut tags_key = Vec::with_capacity(event.tags.len()); @@ -71,60 +78,71 @@ pub fn process_message( tags_value.push(tag.1); } - // Compute user value. - let user = if !event.user.user_id.is_empty() { - event.user.user_id.clone() - } else if !event.user.username.is_empty() { - event.user.username.clone() - } else if !event.user.email.is_empty() { - event.user.email.clone() - } else if let Some(ip) = event.user.ip_address { - ip.to_string() - } else { - String::new() + // Unwrap the ip-address string. + let ip_address_string = event.user.ip_address.unwrap_or_default(); + let (ip_address_v4, ip_address_v6) = match ip_address_string.parse::() { + Err(_) => (None, None), + Ok(IpAddr::V4(ipv4)) => (Some(ipv4), None), + Ok(IpAddr::V6(ipv6)) => (None, Some(ipv6)), }; + // Handle user-id field. + let user_id = match &event.user.user_id { + Some(UserId::String(v)) => Some(v.clone()), + Some(UserId::Number(v)) => Some(v.to_string()), + None => None, + }; + + // Handle user field. + let user = user_id + .clone() + .or(event.user.username.clone()) + .or(event.user.email.clone()) + .or(ip_address_v4.as_ref().map(|v| v.to_string())) + .or(ip_address_v6.as_ref().map(|v| v.to_string())) + .unwrap_or_default(); + // Sample-rate normalization. Null values are inserted as a -1.0 sentinel value. 
let error_sample_rate = event.contexts.replay.error_sample_rate.unwrap_or(-1.0); let session_sample_rate = event.contexts.replay.session_sample_rate.unwrap_or(-1.0); - let replay_row = ReplayRow { - browser_name: event.contexts.browser.name, - browser_version: event.contexts.browser.version, - device_brand: event.contexts.device.brand, - device_family: event.contexts.device.family, - device_model: event.contexts.device.model, - device_name: event.contexts.device.name, - dist: event.dist, - environment: event.environment, - error_ids: event.error_ids, + Ok(vec![ReplayRow { + browser_name: event.contexts.browser.name.unwrap_or_default(), + browser_version: event.contexts.browser.version.unwrap_or_default(), + device_brand: event.contexts.device.brand.unwrap_or_default(), + device_family: event.contexts.device.family.unwrap_or_default(), + device_model: event.contexts.device.model.unwrap_or_default(), + device_name: event.contexts.device.name.unwrap_or_default(), + dist: event.dist.unwrap_or_default(), + environment: event.environment.unwrap_or_default(), + error_ids: event.error_ids.unwrap_or_default(), error_sample_rate, session_sample_rate, event_hash, - is_archived: event.is_archived.into(), + is_archived: event.is_archived.unwrap_or_default().into(), ip_address_v4, ip_address_v6, - offset: metadata.offset, - os_name: event.contexts.os.name, - os_version: event.contexts.os.version, - partition: metadata.partition, - platform: event.platform, + offset, + os_name: event.contexts.os.name.unwrap_or_default(), + os_version: event.contexts.os.version.unwrap_or_default(), + partition, + platform: event.platform.unwrap_or("javascript".to_string()), project_id: replay_message.project_id, - release: event.release, + release: event.release.unwrap_or_default(), replay_id: event.replay_id, replay_start_timestamp: event.replay_start_timestamp.map(|v| v as u32), - replay_type: event.replay_type, + replay_type: event.replay_type.unwrap_or_default(), retention_days: 
replay_message.retention_days, - sdk_name: event.sdk.name, - sdk_version: event.sdk.version, + sdk_name: event.sdk.name.unwrap_or_default(), + sdk_version: event.sdk.version.unwrap_or_default(), segment_id: event.segment_id, timestamp: event.timestamp as u32, - trace_ids: event.trace_ids, - urls: event.urls, + trace_ids: event.trace_ids.unwrap_or_default(), + urls: event.urls.unwrap_or_default(), user, - user_email: event.user.email, - user_id: event.user.user_id, - user_name: event.user.username, + user_email: event.user.email.unwrap_or_default(), + user_id: user_id.unwrap_or_default(), + user_name: event.user.username.unwrap_or_default(), title, tags_key, tags_value: tags_value @@ -132,29 +150,26 @@ pub fn process_message( .map(|s| s.unwrap_or_default()) .collect(), ..Default::default() - }; - - InsertBatch::from_rows([replay_row]) - } - ReplayPayload::EventLinkEvent(event) => { - let replay_row = ReplayRow { - debug_id: event.debug_id, - error_id: event.error_id, - event_hash: event.event_hash, - fatal_id: event.fatal_id, - info_id: event.info_id, - offset: metadata.offset, - partition: metadata.partition, - project_id: replay_message.project_id, - replay_id: replay_message.replay_id, - retention_days: replay_message.retention_days, - timestamp: event.timestamp as u32, - warning_id: event.warning_id, - ..Default::default() - }; - - InsertBatch::from_rows([replay_row]) + }]) } + ReplayPayload::EventLinkEvent(event) => Ok(vec![ReplayRow { + debug_id: event.debug_id, + error_id: event.error_id, + error_sample_rate: -1.0, + event_hash: event.event_hash, + fatal_id: event.fatal_id, + info_id: event.info_id, + offset, + partition, + platform: "javascript".to_string(), + project_id: replay_message.project_id, + replay_id: replay_message.replay_id, + retention_days: replay_message.retention_days, + session_sample_rate: -1.0, + timestamp: event.timestamp as u32, + warning_id: event.warning_id, + ..Default::default() + }]), } } @@ -206,39 +221,47 @@ struct 
ReplayClickEventClick { // Replay Event +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum UserId { + String(String), + Number(u64), +} + #[derive(Debug, Deserialize)] struct ReplayEvent { replay_id: Uuid, #[serde(default)] contexts: Contexts, #[serde(default)] - dist: String, + dist: Option, #[serde(default)] - environment: String, + environment: Option, #[serde(default)] event_hash: Option, #[serde(default)] - is_archived: bool, - #[serde(default = "default_platform")] - platform: String, + is_archived: Option, #[serde(default)] - release: String, + platform: Option, + #[serde(default)] + release: Option, #[serde(default)] replay_start_timestamp: Option, #[serde(default)] - replay_type: String, + replay_type: Option, #[serde(default)] sdk: Version, + #[serde(default)] segment_id: Option, timestamp: f64, #[serde(default)] - urls: Vec, + urls: Option>, #[serde(default)] user: User, #[serde(default)] - trace_ids: Vec, + trace_ids: Option>, #[serde(default)] - error_ids: Vec, + error_ids: Option>, #[serde(default)] tags: Vec<(String, Option)>, } @@ -258,13 +281,13 @@ struct Contexts { #[derive(Debug, Default, Deserialize)] struct Device { #[serde(default)] - brand: String, + brand: Option, #[serde(default)] - family: String, + family: Option, #[serde(default)] - model: String, + model: Option, #[serde(default)] - name: String, + name: Option, } #[derive(Debug, Default, Deserialize)] @@ -278,25 +301,21 @@ struct ReplayContext { #[derive(Debug, Default, Deserialize)] struct User { #[serde(default)] - username: String, + username: Option, #[serde(default)] - user_id: String, + user_id: Option, #[serde(default)] - email: String, + email: Option, #[serde(default)] - ip_address: Option, + ip_address: Option, } #[derive(Debug, Default, Deserialize)] struct Version { #[serde(default)] - name: String, + name: Option, #[serde(default)] - version: String, -} - -fn default_platform() -> String { - "javascript".to_string() + version: Option, } // ReplayEventLink 
@@ -322,94 +341,66 @@ struct ReplayEventLinkEvent { // their null condition dropped. #[derive(Debug, Default, Serialize)] -struct ReplayRow { - replay_id: Uuid, - #[serde(skip_serializing_if = "uuid::Uuid::is_nil")] +pub struct ReplayRow { + browser_name: String, + browser_version: String, + click_alt: String, + click_aria_label: String, + click_class: Vec, + click_component_name: String, + click_id: String, + click_is_dead: u8, + click_is_rage: u8, + click_node_id: u32, + click_role: String, + click_tag: String, + click_testid: String, + click_text: String, + click_title: String, debug_id: Uuid, - #[serde(skip_serializing_if = "uuid::Uuid::is_nil")] - info_id: Uuid, - #[serde(skip_serializing_if = "uuid::Uuid::is_nil")] - warning_id: Uuid, - #[serde(skip_serializing_if = "uuid::Uuid::is_nil")] + device_brand: String, + device_family: String, + device_model: String, + device_name: String, + dist: String, + environment: String, error_id: Uuid, - #[serde(skip_serializing_if = "uuid::Uuid::is_nil")] - fatal_id: Uuid, - replay_type: String, + error_ids: Vec, error_sample_rate: f64, - session_sample_rate: f64, event_hash: Uuid, - segment_id: Option, - trace_ids: Vec, - title: Option, - urls: Vec, - is_archived: u8, - error_ids: Vec, - project_id: u64, - timestamp: u32, - replay_start_timestamp: Option, - platform: String, - environment: String, - release: String, - dist: String, - #[serde(skip_serializing_if = "Option::is_none")] + fatal_id: Uuid, + info_id: Uuid, ip_address_v4: Option, - #[serde(skip_serializing_if = "Option::is_none")] ip_address_v6: Option, - user: String, - user_id: String, - user_name: String, - user_email: String, + is_archived: u8, + offset: u64, os_name: String, os_version: String, - browser_name: String, - browser_version: String, - device_name: String, - device_brand: String, - device_family: String, - device_model: String, + partition: u16, + platform: String, + project_id: u64, + release: String, + replay_id: Uuid, + replay_start_timestamp: 
Option, + replay_type: String, + retention_days: u16, sdk_name: String, sdk_version: String, + segment_id: Option, + session_sample_rate: f64, #[serde(rename = "tags.key")] tags_key: Vec, #[serde(rename = "tags.value")] tags_value: Vec, - #[serde(skip_serializing_if = "is_u32_zero")] - click_node_id: u32, - #[serde(skip_serializing_if = "String::is_empty")] - click_tag: String, - #[serde(skip_serializing_if = "String::is_empty")] - click_id: String, - #[serde(skip_serializing_if = "Vec::is_empty")] - click_class: Vec, - #[serde(skip_serializing_if = "String::is_empty")] - click_text: String, - #[serde(skip_serializing_if = "String::is_empty")] - click_role: String, - #[serde(skip_serializing_if = "String::is_empty")] - click_alt: String, - #[serde(skip_serializing_if = "String::is_empty")] - click_testid: String, - #[serde(skip_serializing_if = "String::is_empty")] - click_aria_label: String, - #[serde(skip_serializing_if = "String::is_empty")] - click_title: String, - #[serde(skip_serializing_if = "String::is_empty")] - click_component_name: String, - #[serde(skip_serializing_if = "is_u8_zero")] - click_is_dead: u8, - #[serde(skip_serializing_if = "is_u8_zero")] - click_is_rage: u8, - retention_days: u16, - partition: u16, - offset: u64, -} - -fn is_u8_zero(v: &u8) -> bool { - *v == 0 -} - -fn is_u32_zero(v: &u32) -> bool { - *v == 0 + timestamp: u32, + title: Option, + trace_ids: Vec, + urls: Vec, + user_email: String, + user_id: String, + user_name: String, + user: String, + warning_id: Uuid, } #[cfg(test)] @@ -417,7 +408,7 @@ mod tests { use super::*; use chrono::DateTime; use rust_arroyo::backends::kafka::types::KafkaPayload; - use std::time::SystemTime; + use std::{str::FromStr, time::SystemTime}; #[test] fn test_parse_replay_event() { @@ -425,7 +416,7 @@ mod tests { "contexts": { "browser": { "name": "browser", - "verison": "v1" + "version": "v1" }, "device": { "brand": "brand", @@ -435,7 +426,7 @@ mod tests { }, "os": { "name": "os", - "verison": "v1" + 
"version": "v1" }, "replay": { "error_sample_rate": 1, @@ -450,7 +441,7 @@ mod tests { }, "sdk": { "name": "sdk", - "verison": "v1" + "version": "v1" }, "dist": "dist", "environment": "environment", @@ -484,14 +475,206 @@ mod tests { "type": "replay_event" }}"# ); - let payload = KafkaPayload::new(None, None, Some(data.as_bytes().to_vec())); - let meta = KafkaMessageMetadata { - partition: 0, - offset: 1, - timestamp: DateTime::from(SystemTime::now()), - }; - process_message(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + + let rows = deserialize_message(data.as_bytes(), 0, 0).unwrap(); + let replay_row = rows.first().unwrap(); + + // Columns in the critical path. + assert_eq!(&replay_row.browser_name, "browser"); + assert_eq!(&replay_row.browser_version, "v1"); + assert_eq!(&replay_row.device_brand, "brand"); + assert_eq!(&replay_row.device_family, "family"); + assert_eq!(&replay_row.device_model, "model"); + assert_eq!(&replay_row.device_name, "name"); + assert_eq!(&replay_row.dist, "dist"); + assert_eq!(&replay_row.environment, "environment"); + assert_eq!(&replay_row.os_name, "os"); + assert_eq!(&replay_row.os_version, "v1"); + assert_eq!(&replay_row.platform, "platform"); + assert_eq!(&replay_row.release, "release"); + assert_eq!(&replay_row.replay_type, "buffer"); + assert_eq!(&replay_row.sdk_name, "sdk"); + assert_eq!(&replay_row.sdk_version, "v1"); + assert_eq!(&replay_row.user_email, "email"); + assert_eq!(&replay_row.user_id, "user_id"); + assert_eq!(&replay_row.user_name, "username"); + assert_eq!(&replay_row.user, "user_id"); + assert_eq!( + replay_row.error_ids, + vec![Uuid::parse_str("df11e6d952da470386a64340f13151c4").unwrap()] + ); + assert_eq!(replay_row.error_sample_rate, 1.0); + assert_eq!( + replay_row.ip_address_v4, + Some(Ipv4Addr::from_str("127.0.0.1").unwrap()) + ); + assert_eq!(replay_row.ip_address_v6, None); + assert_eq!(replay_row.is_archived, 0); + assert_eq!(replay_row.project_id, 1); + 
assert_eq!(replay_row.replay_start_timestamp, Some(1702659277)); + assert_eq!(replay_row.retention_days, 30); + assert_eq!( + &replay_row.replay_id, + &Uuid::parse_str("048aa04be40243948eb3b57089c519ee").unwrap() + ); + assert_eq!(replay_row.segment_id, Some(0)); + assert_eq!(replay_row.session_sample_rate, 0.5); + assert_eq!(replay_row.title, None); + assert_eq!( + replay_row.trace_ids, + vec![Uuid::parse_str("2cd798d70f9346089026d2014a826629").unwrap()] + ); + assert_eq!(replay_row.urls, vec!["urls"]); + + // Default columns - not providable on this event. + assert_eq!(&replay_row.click_alt, ""); + assert_eq!(&replay_row.click_aria_label, ""); + assert_eq!(&replay_row.click_component_name, ""); + assert_eq!(&replay_row.click_id, ""); + assert_eq!(&replay_row.click_role, ""); + assert_eq!(&replay_row.click_tag, ""); + assert_eq!(&replay_row.click_testid, ""); + assert_eq!(&replay_row.click_text, ""); + assert_eq!(&replay_row.click_title, ""); + assert_eq!(replay_row.click_class, Vec::::new()); + assert_eq!(replay_row.click_is_dead, 0); + assert_eq!(replay_row.click_is_rage, 0); + assert_eq!(replay_row.click_node_id, 0); + assert_eq!(replay_row.debug_id, Uuid::nil()); + assert_eq!(replay_row.error_id, Uuid::nil()); + assert_eq!(replay_row.fatal_id, Uuid::nil()); + assert_eq!(replay_row.info_id, Uuid::nil()); + assert_eq!(replay_row.warning_id, Uuid::nil()); + } + + #[test] + fn test_parse_replay_event_empty_set() { + // Test every field defaults to + let payload = r#"{ + "type": "replay_event", + "replay_id": "048aa04be40243948eb3b57089c519ee", + "replay_type": null, + "segment_id": null, + "event_hash": null, + "tags": [ + ["a", "b"], + ["transaction.name", null] + ], + "urls": null, + "is_archived": null, + "trace_ids": null, + "error_ids": null, + "dist": null, + "platform": null, + "timestamp": 1704763370, + "replay_start_timestamp": null, + "environment": null, + "release": null, + "user": { + "id": null, + "username": null, + "email": null, + "ip_address": 
null + }, + "sdk": { + "name": null, + "version": null + }, + "contexts": { + "replay": { + "error_sample_rate": null, + "session_sample_rate": null + }, + "os": { + "name": null, + "version": null + }, + "browser": { + "name": null, + "version": null + }, + "device": { + "name": null, + "brand": null, + "family": null, + "model": null + } + } + }"#; + let payload_value = payload.as_bytes(); + + let data = format!( + r#"{{ + "payload": {payload_value:?}, + "project_id": 1, + "replay_id": "048aa04be40243948eb3b57089c519ee", + "retention_days": 30, + "segment_id": null, + "start_time": 100, + "type": "replay_event" + }}"# + ); + + let rows = deserialize_message(data.as_bytes(), 0, 0).unwrap(); + let replay_row = rows.first().unwrap(); + + // Columns in the critical path. + assert_eq!(&replay_row.browser_name, ""); + assert_eq!(&replay_row.browser_version, ""); + assert_eq!(&replay_row.device_brand, ""); + assert_eq!(&replay_row.device_family, ""); + assert_eq!(&replay_row.device_model, ""); + assert_eq!(&replay_row.device_name, ""); + assert_eq!(&replay_row.dist, ""); + assert_eq!(&replay_row.environment, ""); + assert_eq!(&replay_row.os_name, ""); + assert_eq!(&replay_row.os_version, ""); + assert_eq!(&replay_row.release, ""); + assert_eq!(&replay_row.replay_type, ""); + assert_eq!(&replay_row.sdk_name, ""); + assert_eq!(&replay_row.sdk_version, ""); + assert_eq!(&replay_row.user_email, ""); + assert_eq!(&replay_row.user_id, ""); + assert_eq!(&replay_row.user_name, ""); + assert_eq!(&replay_row.user, ""); + assert_eq!(replay_row.error_ids, vec![]); + assert_eq!(replay_row.error_sample_rate, -1.0); + assert_eq!(replay_row.ip_address_v4, None); + assert_eq!(replay_row.ip_address_v6, None); + assert_eq!(replay_row.is_archived, 0); + assert_eq!(replay_row.platform, "javascript".to_string()); + assert_eq!(replay_row.project_id, 1); + assert_eq!(replay_row.replay_start_timestamp, None); + assert_eq!(replay_row.retention_days, 30); + assert_eq!( + &replay_row.replay_id, + 
&Uuid::parse_str("048aa04be40243948eb3b57089c519ee").unwrap() + ); + assert_eq!(replay_row.segment_id, None); + assert_eq!(replay_row.session_sample_rate, -1.0); + assert_eq!(replay_row.title, None); + assert_eq!(replay_row.trace_ids, vec![]); + assert_eq!(replay_row.urls, Vec::::new()); + + // Default columns - not providable on this event. + assert_eq!(&replay_row.click_alt, ""); + assert_eq!(&replay_row.click_aria_label, ""); + assert_eq!(&replay_row.click_component_name, ""); + assert_eq!(&replay_row.click_id, ""); + assert_eq!(&replay_row.click_role, ""); + assert_eq!(&replay_row.click_tag, ""); + assert_eq!(&replay_row.click_testid, ""); + assert_eq!(&replay_row.click_text, ""); + assert_eq!(&replay_row.click_title, ""); + assert_eq!(replay_row.click_class, Vec::::new()); + assert_eq!(replay_row.click_is_dead, 0); + assert_eq!(replay_row.click_is_rage, 0); + assert_eq!(replay_row.click_node_id, 0); + assert_eq!(replay_row.debug_id, Uuid::nil()); + assert_eq!(replay_row.error_id, Uuid::nil()); + assert_eq!(replay_row.fatal_id, Uuid::nil()); + assert_eq!(replay_row.info_id, Uuid::nil()); + assert_eq!(replay_row.warning_id, Uuid::nil()); } #[test] @@ -503,12 +686,12 @@ mod tests { "alt": "Alternate", "aria_label": "Aria-label", "class": ["hello", "world"], + "component_name": "SignUpButton", "event_hash": "b4370ef8d1994e96b5bc719b72afbf49", "id": "id", "is_dead": 0, "is_rage": 1, "node_id": 320, - "component_name": "SignUpButton", "role": "button", "tag": "div", "testid": "", @@ -530,14 +713,67 @@ mod tests { "type": "replay_event" }}"# ); - let payload = KafkaPayload::new(None, None, Some(data.as_bytes().to_vec())); - let meta = KafkaMessageMetadata { - partition: 0, - offset: 1, - timestamp: DateTime::from(SystemTime::now()), - }; - process_message(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + + let rows = deserialize_message(data.as_bytes(), 0, 0).unwrap(); + let replay_row = rows.first().unwrap(); + + // Columns 
in the critical path. + assert_eq!(&replay_row.click_alt, "Alternate"); + assert_eq!(&replay_row.click_aria_label, "Aria-label"); + assert_eq!(&replay_row.click_component_name, "SignUpButton"); + assert_eq!(&replay_row.click_id, "id"); + assert_eq!(&replay_row.click_role, "button"); + assert_eq!(&replay_row.click_tag, "div"); + assert_eq!(&replay_row.click_testid, ""); + assert_eq!(&replay_row.click_text, "Submit"); + assert_eq!(&replay_row.click_title, "title"); + assert_eq!(replay_row.click_class, ["hello", "world"]); + assert_eq!(replay_row.click_is_dead, 0); + assert_eq!(replay_row.click_is_rage, 1); + assert_eq!(replay_row.click_node_id, 320); + assert_eq!(replay_row.project_id, 1); + assert_eq!( + &replay_row.replay_id, + &Uuid::parse_str("048aa04be40243948eb3b57089c519ee").unwrap() + ); + assert_eq!(replay_row.retention_days, 30); + assert_eq!(replay_row.segment_id, None); + + // Default columns - not providable on this event. + assert_eq!(&replay_row.browser_name, ""); + assert_eq!(&replay_row.browser_version, ""); + assert_eq!(&replay_row.device_brand, ""); + assert_eq!(&replay_row.device_family, ""); + assert_eq!(&replay_row.device_model, ""); + assert_eq!(&replay_row.device_name, ""); + assert_eq!(&replay_row.dist, ""); + assert_eq!(&replay_row.environment, ""); + assert_eq!(&replay_row.os_name, ""); + assert_eq!(&replay_row.os_version, ""); + assert_eq!(&replay_row.release, ""); + assert_eq!(&replay_row.replay_type, ""); + assert_eq!(&replay_row.sdk_name, ""); + assert_eq!(&replay_row.sdk_version, ""); + assert_eq!(&replay_row.user_email, ""); + assert_eq!(&replay_row.user_id, ""); + assert_eq!(&replay_row.user_name, ""); + assert_eq!(&replay_row.user, ""); + assert_eq!(replay_row.debug_id, Uuid::nil()); + assert_eq!(replay_row.error_id, Uuid::nil()); + assert_eq!(replay_row.error_ids, vec![]); + assert_eq!(replay_row.error_sample_rate, -1.0); + assert_eq!(replay_row.fatal_id, Uuid::nil()); + assert_eq!(replay_row.info_id, Uuid::nil()); + 
assert_eq!(replay_row.ip_address_v4, None); + assert_eq!(replay_row.ip_address_v6, None); + assert_eq!(replay_row.is_archived, 0); + assert_eq!(replay_row.platform, "javascript".to_string()); + assert_eq!(replay_row.replay_start_timestamp, None); + assert_eq!(replay_row.session_sample_rate, -1.0); + assert_eq!(replay_row.title, None); + assert_eq!(replay_row.trace_ids, vec![]); + assert_eq!(replay_row.urls, Vec::::new()); + assert_eq!(replay_row.warning_id, Uuid::nil()); } #[test] @@ -562,14 +798,75 @@ mod tests { "type": "replay_event" }}"# ); - let payload = KafkaPayload::new(None, None, Some(data.as_bytes().to_vec())); - let meta = KafkaMessageMetadata { - partition: 0, - offset: 1, - timestamp: DateTime::from(SystemTime::now()), - }; - process_message(payload, meta, &ProcessorConfig::default()) - .expect("The message should be processed"); + + let rows = deserialize_message(data.as_bytes(), 0, 0).unwrap(); + let replay_row = rows.first().unwrap(); + + // Columns in the critical path. + assert_eq!( + replay_row.debug_id, + Uuid::parse_str("5d51f0eb1d1244e7a8312d8a248cd987").unwrap() + ); + assert_eq!(replay_row.info_id, Uuid::nil()); + assert_eq!(replay_row.error_id, Uuid::nil()); + assert_eq!(replay_row.fatal_id, Uuid::nil()); + assert_eq!(replay_row.warning_id, Uuid::nil()); + assert_eq!( + replay_row.event_hash, + Uuid::parse_str("b4370ef8d1994e96b5bc719b72afbf49").unwrap() + ); + assert_eq!( + replay_row.replay_id, + Uuid::parse_str("048aa04be40243948eb3b57089c519ee").unwrap() + ); + assert_eq!(replay_row.project_id, 1); + assert_eq!(replay_row.retention_days, 30); + assert_eq!(replay_row.segment_id, None); + assert_eq!(replay_row.timestamp, 1702659277); + + // Default columns - not providable on this event. 
+ assert_eq!(&replay_row.browser_name, ""); + assert_eq!(&replay_row.browser_version, ""); + assert_eq!(&replay_row.click_alt, ""); + assert_eq!(&replay_row.click_aria_label, ""); + assert_eq!(&replay_row.click_component_name, ""); + assert_eq!(&replay_row.click_id, ""); + assert_eq!(&replay_row.click_role, ""); + assert_eq!(&replay_row.click_tag, ""); + assert_eq!(&replay_row.click_testid, ""); + assert_eq!(&replay_row.click_text, ""); + assert_eq!(&replay_row.click_title, ""); + assert_eq!(&replay_row.device_brand, ""); + assert_eq!(&replay_row.device_family, ""); + assert_eq!(&replay_row.device_model, ""); + assert_eq!(&replay_row.device_name, ""); + assert_eq!(&replay_row.dist, ""); + assert_eq!(&replay_row.environment, ""); + assert_eq!(&replay_row.os_name, ""); + assert_eq!(&replay_row.os_version, ""); + assert_eq!(&replay_row.release, ""); + assert_eq!(&replay_row.replay_type, ""); + assert_eq!(&replay_row.sdk_name, ""); + assert_eq!(&replay_row.sdk_version, ""); + assert_eq!(&replay_row.user_email, ""); + assert_eq!(&replay_row.user_id, ""); + assert_eq!(&replay_row.user_name, ""); + assert_eq!(&replay_row.user, ""); + assert_eq!(replay_row.click_class, Vec::::new()); + assert_eq!(replay_row.click_is_dead, 0); + assert_eq!(replay_row.click_is_rage, 0); + assert_eq!(replay_row.click_node_id, 0); + assert_eq!(replay_row.error_ids, vec![]); + assert_eq!(replay_row.error_sample_rate, -1.0); + assert_eq!(replay_row.ip_address_v4, None); + assert_eq!(replay_row.ip_address_v6, None); + assert_eq!(replay_row.is_archived, 0); + assert_eq!(replay_row.platform, "javascript".to_string()); + assert_eq!(replay_row.replay_start_timestamp, None); + assert_eq!(replay_row.session_sample_rate, -1.0); + assert_eq!(replay_row.title, None); + assert_eq!(replay_row.trace_ids, vec![]); + assert_eq!(replay_row.urls, Vec::::new()); } #[test] @@ -593,6 +890,92 @@ mod tests { "type": "replay_event" }}"# ); + + let rows = deserialize_message(data.as_bytes(), 0, 0).unwrap(); + let 
replay_row = rows.first().unwrap(); + + assert_eq!(replay_row.is_archived, 1); + assert_eq!(replay_row.project_id, 1); + assert_eq!(replay_row.retention_days, 30); + assert_eq!(replay_row.segment_id, None); + assert_eq!(replay_row.timestamp, 1702659277); + assert_eq!( + replay_row.replay_id, + Uuid::parse_str("048aa04be40243948eb3b57089c519ee").unwrap() + ); + + assert_eq!(&replay_row.browser_name, ""); + assert_eq!(&replay_row.browser_version, ""); + assert_eq!(&replay_row.click_alt, ""); + assert_eq!(&replay_row.click_aria_label, ""); + assert_eq!(&replay_row.click_component_name, ""); + assert_eq!(&replay_row.click_id, ""); + assert_eq!(&replay_row.click_role, ""); + assert_eq!(&replay_row.click_tag, ""); + assert_eq!(&replay_row.click_testid, ""); + assert_eq!(&replay_row.click_text, ""); + assert_eq!(&replay_row.click_title, ""); + assert_eq!(&replay_row.device_brand, ""); + assert_eq!(&replay_row.device_family, ""); + assert_eq!(&replay_row.device_model, ""); + assert_eq!(&replay_row.device_name, ""); + assert_eq!(&replay_row.dist, ""); + assert_eq!(&replay_row.environment, ""); + assert_eq!(&replay_row.os_name, ""); + assert_eq!(&replay_row.os_version, ""); + assert_eq!(&replay_row.release, ""); + assert_eq!(&replay_row.replay_type, ""); + assert_eq!(&replay_row.sdk_name, ""); + assert_eq!(&replay_row.sdk_version, ""); + assert_eq!(&replay_row.user_email, ""); + assert_eq!(&replay_row.user_id, ""); + assert_eq!(&replay_row.user_name, ""); + assert_eq!(&replay_row.user, ""); + assert_eq!(replay_row.click_class, Vec::::new()); + assert_eq!(replay_row.click_is_dead, 0); + assert_eq!(replay_row.click_is_rage, 0); + assert_eq!(replay_row.click_node_id, 0); + assert_eq!(replay_row.debug_id, Uuid::nil()); + assert_eq!(replay_row.error_id, Uuid::nil()); + assert_eq!(replay_row.error_ids, vec![]); + assert_eq!(replay_row.error_sample_rate, -1.0); + assert_eq!(replay_row.fatal_id, Uuid::nil()); + assert_eq!(replay_row.info_id, Uuid::nil()); + 
assert_eq!(replay_row.ip_address_v4, None); + assert_eq!(replay_row.ip_address_v6, None); + assert_eq!(replay_row.platform, "javascript".to_string()); + assert_eq!(replay_row.replay_start_timestamp, None); + assert_eq!(replay_row.session_sample_rate, -1.0); + assert_eq!(replay_row.title, None); + assert_eq!(replay_row.trace_ids, vec![]); + assert_eq!(replay_row.urls, Vec::::new()); + assert_eq!(replay_row.warning_id, Uuid::nil()); + } + + #[test] + fn test_e2e() { + // We're just testing the consumer interface works. The payload is irrelevant so + // long as it doesn't fail. + let payload = r#"{ + "is_archived": true, + "timestamp": 1702659277, + "type": "replay_event", + "replay_id": "048aa04be40243948eb3b57089c519ee" + }"#; + let payload_value = payload.as_bytes(); + + let data = format!( + r#"{{ + "payload": {payload_value:?}, + "project_id": 1, + "replay_id": "048aa04be40243948eb3b57089c519ee", + "retention_days": 30, + "segment_id": null, + "start_time": 100, + "type": "replay_event" + }}"# + ); + let payload = KafkaPayload::new(None, None, Some(data.as_bytes().to_vec())); let meta = KafkaMessageMetadata { partition: 0, diff --git a/snuba/datasets/processors/replays_processor.py b/snuba/datasets/processors/replays_processor.py index 03d8520db6..9990af8850 100644 --- a/snuba/datasets/processors/replays_processor.py +++ b/snuba/datasets/processors/replays_processor.py @@ -73,14 +73,16 @@ def _process_base_replay_event_values( str, maybe(to_string, replay_event.get("environment")) ) processed["dist"] = default(str, maybe(to_string, replay_event.get("dist"))) - processed["platform"] = default(str, maybe(to_string, replay_event["platform"])) + processed["platform"] = default( + str, maybe(to_string, replay_event.get("platform", "javascript")) + ) processed["replay_type"] = default( str, maybe( to_enum(["buffer", "session", "error"]), replay_event.get("replay_type") ), ) - processed["is_archived"] = default(int, replay_event.get("is_archived")) + 
processed["is_archived"] = int(default(int, replay_event.get("is_archived"))) def _process_tags( self, processed: MutableMapping[str, Any], replay_event: ReplayEventDict @@ -105,6 +107,8 @@ def _add_user_column( processed["user"] = user_data[field] return + processed["user"] = "" + def _process_user( self, processed: MutableMapping[str, Any], replay_event: ReplayEventDict ) -> None: @@ -188,7 +192,7 @@ def _process_event_hash( ) -> None: event_hash = replay_event.get("event_hash") if event_hash is None: - event_hash = segment_id_to_event_hash(replay_event["segment_id"]) + event_hash = segment_id_to_event_hash(replay_event.get("segment_id", None)) processed["event_hash"] = str(uuid.UUID(event_hash)) @@ -249,15 +253,15 @@ def process_replay_actions( ), "replay_id": to_uuid(payload["replay_id"]), "segment_id": None, - "event_hash": click["event_hash"], + "event_hash": str(uuid.UUID(click["event_hash"])), # Default values for non-nullable columns. "trace_ids": [], "error_ids": [], "urls": [], "platform": "javascript", - "user": None, - "sdk_name": None, - "sdk_version": None, + "user": "", + "sdk_name": "", + "sdk_version": "", # Kafka columns. 
"retention_days": processed["retention_days"], "partition": metadata.partition, @@ -303,7 +307,7 @@ def process_replay_event_link( "project_id": processed["project_id"], "replay_id": to_uuid(payload["replay_id"]), "segment_id": None, - "event_hash": payload["event_hash"], + "event_hash": str(uuid.UUID(payload["event_hash"])), "timestamp": raise_on_null( "timestamp", maybe(to_datetime, payload["timestamp"]) ), diff --git a/tests/consumers/test_message_processors.py b/tests/consumers/test_message_processors.py index 566022f056..84639b5dc9 100644 --- a/tests/consumers/test_message_processors.py +++ b/tests/consumers/test_message_processors.py @@ -12,18 +12,11 @@ from snuba.consumers.types import KafkaMessageMetadata from snuba.datasets.processors import DatasetMessageProcessor from snuba.datasets.processors.outcomes_processor import OutcomesProcessor +from snuba.datasets.processors.replays_processor import ReplaysProcessor from snuba.processor import InsertBatch -@pytest.mark.parametrize( - "topic,processor", - [ - # TODO(colton): See why the replays tests are failing with the latest - # examples from sentry-kafka-schemas. 
- # ("ingest-replay-events", ReplaysProcessor), - ("outcomes", OutcomesProcessor), - ], -) +@pytest.mark.parametrize("topic,processor", [("outcomes", OutcomesProcessor)]) def test_message_processors( topic: str, processor: Type[DatasetMessageProcessor] ) -> None: @@ -69,3 +62,68 @@ def test_message_processors( for line in rust_processed_message.rstrip(b"\n").split(b"\n") if line ] == python_processed_message.rows + + +def test_replays_message_processor() -> None: + """Tests the output of the Replay Python and Rust message processors is the same.""" + processor = ReplaysProcessor + topic = "ingest-replay-events" + + for ex in sentry_kafka_schemas.iter_examples(topic): + data_json = ex.load() + data_json["start_time"] = int(time.time()) + + data_bytes = json.dumps(data_json).encode("utf-8") + + processor_name = processor.__qualname__ + partition = 0 + offset = 1 + millis_since_epoch = int(time.time() * 1000) + + rust_processed_message = bytes( + rust_snuba.process_message( # type: ignore + processor_name, data_bytes, partition, offset, millis_since_epoch + ) + ) + python_processed_message = processor().process_message( + data_json, + KafkaMessageMetadata( + offset=offset, + partition=partition, + timestamp=datetime.utcfromtimestamp(millis_since_epoch / 1000), + ), + ) + + assert isinstance(python_processed_message, InsertBatch) + + for line in rust_processed_message.rstrip(b"\n").split(b"\n"): + if not line: + continue + + parsed_rust_message = json.loads(line) + parsed_python_message = python_processed_message.rows[0] + + # timestamp is sometimes in different formats so we'll coerce. + ts1 = parsed_rust_message.pop("timestamp", None) + ts2 = parsed_python_message.pop("timestamp", None) # type: ignore + if isinstance(ts2, datetime): + ts2 = int(ts2.timestamp()) + assert ts1 == ts2 + + # replay_start_timestamp is sometimes in different formats so we'll coerce. 
+ sts1 = parsed_rust_message.pop("replay_start_timestamp", None) + sts2 = parsed_python_message.pop("replay_start_timestamp", None) # type: ignore + if isinstance(sts2, datetime): + sts2 = int(sts2.timestamp()) + assert sts1 == sts2 + + # event_hash is generated by the consumer and not always consistent if + # no segment_id is present so we'll coerce. + parsed_rust_message.pop("event_hash", None) + parsed_python_message.pop("event_hash", None) # type: ignore + + # The python message is a subset of the rust message which contains the complete + # row definition. This is due to a defect in the python processor. We take the + # rust message and overlay the python message. This fills in the gaps of the python + # message. + assert parsed_rust_message | parsed_python_message == parsed_rust_message diff --git a/tests/consumers/test_schemas.py b/tests/consumers/test_schemas.py index 2fa7570504..42098c87ae 100644 --- a/tests/consumers/test_schemas.py +++ b/tests/consumers/test_schemas.py @@ -42,8 +42,6 @@ def _generate_tests() -> Iterator[Case]: try: for example in sentry_kafka_schemas.iter_examples(topic.value): - if "replay" in str(example.path): - continue yield Case( example=example, processor=processor, diff --git a/tests/datasets/test_replays_processor.py b/tests/datasets/test_replays_processor.py index 318cd88943..d61c3bf48c 100644 --- a/tests/datasets/test_replays_processor.py +++ b/tests/datasets/test_replays_processor.py @@ -762,15 +762,15 @@ def test_replay_actions(self) -> None: assert row["project_id"] == 1 assert row["timestamp"] == now assert row["replay_id"] == str(uuid.UUID("bb570198b8f04f8bbe87077668530da7")) - assert row["event_hash"] == "df3c3aa2daae465e89f1169e49139827" + assert row["event_hash"] == str(uuid.UUID("df3c3aa2daae465e89f1169e49139827")) assert row["segment_id"] is None assert row["trace_ids"] == [] assert row["error_ids"] == [] assert row["urls"] == [] assert row["platform"] == "javascript" - assert row["user"] is None - assert 
row["sdk_name"] is None - assert row["sdk_version"] is None + assert row["user"] == "" + assert row["sdk_name"] == "" + assert row["sdk_version"] == "" assert row["retention_days"] == 30 assert row["partition"] == 0 assert row["offset"] == 0 @@ -842,9 +842,8 @@ def test_replay_event_links( assert "timestamp" in row assert row[severity + "_id"] == str(uuid.UUID(event_id)) assert row["replay_id"] == str(uuid.UUID(message["replay_id"])) - assert ( - row["event_hash"] - == md5((message["replay_id"] + event_id).encode("utf-8")).hexdigest() + assert row["event_hash"] == str( + uuid.UUID(md5((message["replay_id"] + event_id).encode("utf-8")).hexdigest()) ) assert row["segment_id"] is None assert row["retention_days"] == 30 diff --git a/tests/test_replays_api.py b/tests/test_replays_api.py index f275013a36..c14049064e 100644 --- a/tests/test_replays_api.py +++ b/tests/test_replays_api.py @@ -107,7 +107,7 @@ def test_sdk_user_title_nullability(self) -> None: assert data["data"] == [ { "title": None, - "user": None, + "user": "", "sdk_name": "", "sdk_version": "", }