From f9990b42fa3ef81b53d0b0f42c88836e319758f1 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Sat, 16 Nov 2024 09:58:19 -0700 Subject: [PATCH] Send events to Snowflake in the format they're expected by Amplitude (#20765) This will allow us to use the events table directly in Amplitude, which lets us use the newer event ingestion flow that detects changes to the table. Otherwise we'll need a transformation. I think Amplitude's API is probably a pretty good example to follow for the raw event schema, even if we don't end up using their product. They also recommend a "Noun Verbed" format for naming events, so I think we should go with this. This will help us be consistent and encourage the author of events to think more clearly about what event they're reporting. cc @ConradIrwin Release Notes: - N/A --- crates/collab/src/api/events.rs | 278 +++++++++++++++++++++----------- 1 file changed, 182 insertions(+), 96 deletions(-) diff --git a/crates/collab/src/api/events.rs b/crates/collab/src/api/events.rs index 9d487c80386e2..3b7a3b39003c5 100644 --- a/crates/collab/src/api/events.rs +++ b/crates/collab/src/api/events.rs @@ -15,6 +15,7 @@ use chrono::Duration; use rpc::ExtensionMetadata; use semantic_version::SemanticVersion; use serde::{Deserialize, Serialize, Serializer}; +use serde_json::json; use sha2::{Digest, Sha256}; use std::sync::{Arc, OnceLock}; use telemetry_events::{ @@ -1392,111 +1393,196 @@ fn for_snowflake( body: EventRequestBody, first_event_at: chrono::DateTime, ) -> impl Iterator { - body.events.into_iter().map(move |event| SnowflakeRow { - event: match &event.event { - Event::Editor(editor_event) => format!("editor_{}", editor_event.operation), - Event::InlineCompletion(inline_completion_event) => format!( - "inline_completion_{}", - if inline_completion_event.suggestion_accepted { - "accept " - } else { - "discard" - } + body.events.into_iter().map(move |event| { + let timestamp = + first_event_at + Duration::milliseconds(event.milliseconds_since_first_event); + let (event_type, mut event_properties) = match &event.event { + Event::Editor(e) => ( + match e.operation.as_str() { + "open" => "Editor Opened".to_string(), + "save" => "Editor Saved".to_string(), + _ => format!("Unknown Editor Event: {}", e.operation), + }, + serde_json::to_value(e).unwrap(), ), - Event::Call(call_event) => format!("call_{}", call_event.operation.replace(" ", "_")), - Event::Assistant(assistant_event) => { + Event::InlineCompletion(e) => ( format!( - "assistant_{}", - match assistant_event.phase { - telemetry_events::AssistantPhase::Response => "response", - telemetry_events::AssistantPhase::Invoked => "invoke", - telemetry_events::AssistantPhase::Accepted => "accept", - telemetry_events::AssistantPhase::Rejected => "reject", + "Inline Completion {}", + if e.suggestion_accepted { + "Accepted" + } else { + "Discarded" } - ) + ), + serde_json::to_value(e).unwrap(), + ), + Event::Call(e) => { + let event_type = match e.operation.trim() { + "unshare project" => "Project Unshared".to_string(), + "open channel notes" => "Channel Notes Opened".to_string(), + "share project" => "Project Shared".to_string(), + "join channel" => "Channel Joined".to_string(), + "hang up" => "Call Ended".to_string(), + "accept incoming" => "Incoming Call Accepted".to_string(), + "invite" => "Participant Invited".to_string(), + "disable microphone" => "Microphone Disabled".to_string(), + "enable microphone" => "Microphone Enabled".to_string(), + "enable screen share" => "Screen Share Enabled".to_string(), + "disable screen share" => "Screen Share Disabled".to_string(), + "decline incoming" => "Incoming Call Declined".to_string(), + "enable camera" => "Camera Enabled".to_string(), + "disable camera" => "Camera Disabled".to_string(), + _ => format!("Unknown Call Event: {}", e.operation), + }; + + (event_type, serde_json::to_value(e).unwrap()) } - Event::Cpu(_) => "system_cpu".to_string(), - Event::Memory(_) => "system_memory".to_string(), - Event::App(app_event) => app_event.operation.replace(" ", "_"), - Event::Setting(_) => "setting_change".to_string(), - Event::Extension(_) => "extension_load".to_string(), - Event::Edit(_) => "edit".to_string(), - Event::Action(_) => "command_palette_action".to_string(), - Event::Repl(_) => "repl".to_string(), - }, - system_id: body.system_id.clone(), - timestamp: first_event_at + Duration::milliseconds(event.milliseconds_since_first_event), - data: SnowflakeData { - installation_id: body.installation_id.clone(), - session_id: body.session_id.clone(), - metrics_id: body.metrics_id.clone(), - is_staff: body.is_staff, - app_version: body.app_version.clone(), - os_name: body.os_name.clone(), - os_version: body.os_version.clone(), - architecture: body.architecture.clone(), - release_channel: body.release_channel.clone(), - signed_in: event.signed_in, - editor_event: match &event.event { - Event::Editor(editor_event) => Some(editor_event.clone()), - _ => None, - }, - inline_completion_event: match &event.event { - Event::InlineCompletion(inline_completion_event) => { - Some(inline_completion_event.clone()) - } - _ => None, - }, - call_event: match &event.event { - Event::Call(call_event) => Some(call_event.clone()), - _ => None, - }, - assistant_event: match &event.event { - Event::Assistant(assistant_event) => Some(assistant_event.clone()), - _ => None, - }, - cpu_event: match &event.event { - Event::Cpu(cpu_event) => Some(cpu_event.clone()), - _ => None, - }, - memory_event: match &event.event { - Event::Memory(memory_event) => Some(memory_event.clone()), - _ => None, - }, - app_event: match &event.event { - Event::App(app_event) => Some(app_event.clone()), - _ => None, - }, - setting_event: match &event.event { - Event::Setting(setting_event) => Some(setting_event.clone()), - _ => None, - }, - extension_event: match &event.event { - Event::Extension(extension_event) => Some(extension_event.clone()), - _ => None, - }, - edit_event: match &event.event { - Event::Edit(edit_event) => Some(edit_event.clone()), - _ => None, - }, - repl_event: match &event.event { - Event::Repl(repl_event) => Some(repl_event.clone()), - _ => None, - }, - action_event: match event.event { - Event::Action(action_event) => Some(action_event.clone()), - _ => None, - }, - }, + Event::Assistant(e) => ( + match e.phase { + telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(), + telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(), + telemetry_events::AssistantPhase::Accepted => { + "Assistant Response Accepted".to_string() + } + telemetry_events::AssistantPhase::Rejected => { + "Assistant Response Rejected".to_string() + } + }, + serde_json::to_value(e).unwrap(), + ), + Event::Cpu(e) => ( + "System CPU Sampled".to_string(), + serde_json::to_value(e).unwrap(), + ), + Event::Memory(e) => ( + "System Memory Sampled".to_string(), + serde_json::to_value(e).unwrap(), + ), + Event::App(e) => { + let mut properties = json!({}); + let event_type = match e.operation.trim() { + "extensions: install extension" => "Extension Installed".to_string(), + "open" => "App Opened".to_string(), + "project search: open" => "Project Search Opened".to_string(), + "first open" => { + properties["is_first_open"] = json!(true); + "App First Opened".to_string() + } + "extensions: uninstall extension" => "Extension Uninstalled".to_string(), + "welcome page: close" => "Welcome Page Closed".to_string(), + "open project" => { + properties["is_first_time"] = json!(false); + "Project Opened".to_string() + } + "welcome page: install cli" => "CLI Installed".to_string(), + "project diagnostics: open" => "Project Diagnostics Opened".to_string(), + "extensions page: open" => "Extensions Page Opened".to_string(), + "welcome page: change theme" => "Welcome Theme Changed".to_string(), + "welcome page: toggle metric telemetry" => { + properties["enabled"] = json!(false); + "Welcome Telemetry Toggled".to_string() + } + "welcome page: change keymap" => "Keymap Changed".to_string(), + "welcome page: toggle vim" => { + properties["enabled"] = json!(false); + "Welcome Vim Mode Toggled".to_string() + } + "welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(), + "welcome page: toggle diagnostic telemetry" => { + "Welcome Telemetry Toggled".to_string() + } + "welcome page: open" => "Welcome Page Opened".to_string(), + "close" => "App Closed".to_string(), + "markdown preview: open" => "Markdown Preview Opened".to_string(), + "welcome page: open extensions" => "Extensions Page Opened".to_string(), + "open node project" | "open pnpm project" | "open yarn project" => { + properties["project_type"] = json!("node"); + properties["is_first_time"] = json!(false); + "Project Opened".to_string() + } + "repl sessions: open" => "REPL Session Started".to_string(), + "welcome page: toggle helix" => { + properties["enabled"] = json!(false); + "Helix Mode Toggled".to_string() + } + "welcome page: edit settings" => { + properties["changed_settings"] = json!([]); + "Settings Edited".to_string() + } + "welcome page: view docs" => "Documentation Viewed".to_string(), + "open ssh project" => { + properties["is_first_time"] = json!(false); + "SSH Project Opened".to_string() + } + "create ssh server" => "SSH Server Created".to_string(), + "create ssh project" => "SSH Project Created".to_string(), + "first open for release channel" => { + properties["is_first_for_channel"] = json!(true); + "App First Opened For Release Channel".to_string() + } + _ => format!("Unknown App Event: {}", e.operation), + }; + (event_type, properties) + } + Event::Setting(e) => ( + "Settings Changed".to_string(), + serde_json::to_value(e).unwrap(), + ), + Event::Extension(e) => ( + "Extension Loaded".to_string(), + serde_json::to_value(e).unwrap(), + ), + Event::Edit(e) => ( + "Editor Edited".to_string(), + serde_json::to_value(e).unwrap(), + ), + Event::Action(e) => ( + "Action Invoked".to_string(), + serde_json::to_value(e).unwrap(), + ), + Event::Repl(e) => ( + "Kernel Status Changed".to_string(), + serde_json::to_value(e).unwrap(), + ), + }; + + if let serde_json::Value::Object(ref mut map) = event_properties { + map.insert("app_version".to_string(), body.app_version.clone().into()); + map.insert("os_name".to_string(), body.os_name.clone().into()); + map.insert("os_version".to_string(), body.os_version.clone().into()); + map.insert("architecture".to_string(), body.architecture.clone().into()); + map.insert( + "release_channel".to_string(), + body.release_channel.clone().into(), + ); + map.insert("signed_in".to_string(), event.signed_in.into()); + } + + let user_properties = Some(serde_json::json!({ + "is_staff": body.is_staff, + })); + + SnowflakeRow { + time: timestamp, + user_id: body.metrics_id.clone(), + device_id: body.system_id.clone(), + event_type, + event_properties, + user_properties, + insert_id: Some(Uuid::new_v4().to_string()), + } }) } #[derive(Serialize, Deserialize)] struct SnowflakeRow { - pub event: String, - pub system_id: Option, - pub timestamp: chrono::DateTime, - pub data: SnowflakeData, + pub time: chrono::DateTime, + pub user_id: Option, + pub device_id: Option, + pub event_type: String, + pub event_properties: serde_json::Value, + pub user_properties: Option, + pub insert_id: Option, } #[derive(Serialize, Deserialize)]