From ea5131ce0a9d753e81969d6f55efb30882eb5c7c Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Tue, 19 Nov 2024 12:52:00 -0700 Subject: [PATCH] Country Code To Snowflake (#20875) Release Notes: - N/A --------- Co-authored-by: Nathan Sobo --- crates/collab/src/api/events.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/collab/src/api/events.rs b/crates/collab/src/api/events.rs index 80f477df3006f..57ac43ca56d6b 100644 --- a/crates/collab/src/api/events.rs +++ b/crates/collab/src/api/events.rs @@ -418,7 +418,7 @@ pub async fn post_events( if let Some(kinesis_client) = app.kinesis_client.clone() { if let Some(stream) = app.config.kinesis_stream.clone() { let mut request = kinesis_client.put_records().stream_name(stream); - for row in for_snowflake(request_body.clone(), first_event_at) { + for row in for_snowflake(request_body.clone(), first_event_at, country_code.clone()) { if let Some(data) = serde_json::to_vec(&row).log_err() { request = request.records( aws_sdk_kinesis::types::PutRecordsRequestEntry::builder() @@ -1381,6 +1381,7 @@ pub fn calculate_json_checksum(app: Arc, json: &impl AsRef<[u8]>) -> O fn for_snowflake( body: EventRequestBody, first_event_at: chrono::DateTime, + country_code: Option, ) -> impl Iterator { body.events.into_iter().flat_map(move |event| { let timestamp = @@ -1553,6 +1554,9 @@ fn for_snowflake( body.release_channel.clone().into(), ); map.insert("signed_in".to_string(), event.signed_in.into()); + if let Some(country_code) = country_code.as_ref() { + map.insert("country_code".to_string(), country_code.clone().into()); + } } let user_properties = Some(serde_json::json!({