Skip to content

Commit

Permalink
Send events to Snowflake in the format they're expected by Amplitude (#…
Browse files Browse the repository at this point in the history
…20765)

This will allow us to use the events table directly in Amplitude, which
lets us use the newer event ingestion flow that detects changes to the
table. Otherwise we'll need a transformation.

I think Amplitude's API is probably a pretty good example to follow for
the raw event schema, even if we don't end up using their product. They
also recommend a "Noun Verbed" format for naming events, so I think we
should go with this. This will help us be consistent and encourage the
author of events to think more clearly about what event they're
reporting.

cc @ConradIrwin 

Release Notes:

- N/A
  • Loading branch information
nathansobo authored Nov 16, 2024
1 parent 97e9137 commit f9990b4
Showing 1 changed file with 182 additions and 96 deletions.
278 changes: 182 additions & 96 deletions crates/collab/src/api/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use chrono::Duration;
use rpc::ExtensionMetadata;
use semantic_version::SemanticVersion;
use serde::{Deserialize, Serialize, Serializer};
use serde_json::json;
use sha2::{Digest, Sha256};
use std::sync::{Arc, OnceLock};
use telemetry_events::{
Expand Down Expand Up @@ -1392,111 +1393,196 @@ fn for_snowflake(
body: EventRequestBody,
first_event_at: chrono::DateTime<chrono::Utc>,
) -> impl Iterator<Item = SnowflakeRow> {
body.events.into_iter().map(move |event| SnowflakeRow {
event: match &event.event {
Event::Editor(editor_event) => format!("editor_{}", editor_event.operation),
Event::InlineCompletion(inline_completion_event) => format!(
"inline_completion_{}",
if inline_completion_event.suggestion_accepted {
"accept "
} else {
"discard"
}
body.events.into_iter().map(move |event| {
let timestamp =
first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
let (event_type, mut event_properties) = match &event.event {
Event::Editor(e) => (
match e.operation.as_str() {
"open" => "Editor Opened".to_string(),
"save" => "Editor Saved".to_string(),
_ => format!("Unknown Editor Event: {}", e.operation),
},
serde_json::to_value(e).unwrap(),
),
Event::Call(call_event) => format!("call_{}", call_event.operation.replace(" ", "_")),
Event::Assistant(assistant_event) => {
Event::InlineCompletion(e) => (
format!(
"assistant_{}",
match assistant_event.phase {
telemetry_events::AssistantPhase::Response => "response",
telemetry_events::AssistantPhase::Invoked => "invoke",
telemetry_events::AssistantPhase::Accepted => "accept",
telemetry_events::AssistantPhase::Rejected => "reject",
"Inline Completion {}",
if e.suggestion_accepted {
"Accepted"
} else {
"Discarded"
}
)
),
serde_json::to_value(e).unwrap(),
),
Event::Call(e) => {
let event_type = match e.operation.trim() {
"unshare project" => "Project Unshared".to_string(),
"open channel notes" => "Channel Notes Opened".to_string(),
"share project" => "Project Shared".to_string(),
"join channel" => "Channel Joined".to_string(),
"hang up" => "Call Ended".to_string(),
"accept incoming" => "Incoming Call Accepted".to_string(),
"invite" => "Participant Invited".to_string(),
"disable microphone" => "Microphone Disabled".to_string(),
"enable microphone" => "Microphone Enabled".to_string(),
"enable screen share" => "Screen Share Enabled".to_string(),
"disable screen share" => "Screen Share Disabled".to_string(),
"decline incoming" => "Incoming Call Declined".to_string(),
"enable camera" => "Camera Enabled".to_string(),
"disable camera" => "Camera Disabled".to_string(),
_ => format!("Unknown Call Event: {}", e.operation),
};

(event_type, serde_json::to_value(e).unwrap())
}
Event::Cpu(_) => "system_cpu".to_string(),
Event::Memory(_) => "system_memory".to_string(),
Event::App(app_event) => app_event.operation.replace(" ", "_"),
Event::Setting(_) => "setting_change".to_string(),
Event::Extension(_) => "extension_load".to_string(),
Event::Edit(_) => "edit".to_string(),
Event::Action(_) => "command_palette_action".to_string(),
Event::Repl(_) => "repl".to_string(),
},
system_id: body.system_id.clone(),
timestamp: first_event_at + Duration::milliseconds(event.milliseconds_since_first_event),
data: SnowflakeData {
installation_id: body.installation_id.clone(),
session_id: body.session_id.clone(),
metrics_id: body.metrics_id.clone(),
is_staff: body.is_staff,
app_version: body.app_version.clone(),
os_name: body.os_name.clone(),
os_version: body.os_version.clone(),
architecture: body.architecture.clone(),
release_channel: body.release_channel.clone(),
signed_in: event.signed_in,
editor_event: match &event.event {
Event::Editor(editor_event) => Some(editor_event.clone()),
_ => None,
},
inline_completion_event: match &event.event {
Event::InlineCompletion(inline_completion_event) => {
Some(inline_completion_event.clone())
}
_ => None,
},
call_event: match &event.event {
Event::Call(call_event) => Some(call_event.clone()),
_ => None,
},
assistant_event: match &event.event {
Event::Assistant(assistant_event) => Some(assistant_event.clone()),
_ => None,
},
cpu_event: match &event.event {
Event::Cpu(cpu_event) => Some(cpu_event.clone()),
_ => None,
},
memory_event: match &event.event {
Event::Memory(memory_event) => Some(memory_event.clone()),
_ => None,
},
app_event: match &event.event {
Event::App(app_event) => Some(app_event.clone()),
_ => None,
},
setting_event: match &event.event {
Event::Setting(setting_event) => Some(setting_event.clone()),
_ => None,
},
extension_event: match &event.event {
Event::Extension(extension_event) => Some(extension_event.clone()),
_ => None,
},
edit_event: match &event.event {
Event::Edit(edit_event) => Some(edit_event.clone()),
_ => None,
},
repl_event: match &event.event {
Event::Repl(repl_event) => Some(repl_event.clone()),
_ => None,
},
action_event: match event.event {
Event::Action(action_event) => Some(action_event.clone()),
_ => None,
},
},
Event::Assistant(e) => (
match e.phase {
telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(),
telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
telemetry_events::AssistantPhase::Accepted => {
"Assistant Response Accepted".to_string()
}
telemetry_events::AssistantPhase::Rejected => {
"Assistant Response Rejected".to_string()
}
},
serde_json::to_value(e).unwrap(),
),
Event::Cpu(e) => (
"System CPU Sampled".to_string(),
serde_json::to_value(e).unwrap(),
),
Event::Memory(e) => (
"System Memory Sampled".to_string(),
serde_json::to_value(e).unwrap(),
),
Event::App(e) => {
let mut properties = json!({});
let event_type = match e.operation.trim() {
"extensions: install extension" => "Extension Installed".to_string(),
"open" => "App Opened".to_string(),
"project search: open" => "Project Search Opened".to_string(),
"first open" => {
properties["is_first_open"] = json!(true);
"App First Opened".to_string()
}
"extensions: uninstall extension" => "Extension Uninstalled".to_string(),
"welcome page: close" => "Welcome Page Closed".to_string(),
"open project" => {
properties["is_first_time"] = json!(false);
"Project Opened".to_string()
}
"welcome page: install cli" => "CLI Installed".to_string(),
"project diagnostics: open" => "Project Diagnostics Opened".to_string(),
"extensions page: open" => "Extensions Page Opened".to_string(),
"welcome page: change theme" => "Welcome Theme Changed".to_string(),
"welcome page: toggle metric telemetry" => {
properties["enabled"] = json!(false);
"Welcome Telemetry Toggled".to_string()
}
"welcome page: change keymap" => "Keymap Changed".to_string(),
"welcome page: toggle vim" => {
properties["enabled"] = json!(false);
"Welcome Vim Mode Toggled".to_string()
}
"welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
"welcome page: toggle diagnostic telemetry" => {
"Welcome Telemetry Toggled".to_string()
}
"welcome page: open" => "Welcome Page Opened".to_string(),
"close" => "App Closed".to_string(),
"markdown preview: open" => "Markdown Preview Opened".to_string(),
"welcome page: open extensions" => "Extensions Page Opened".to_string(),
"open node project" | "open pnpm project" | "open yarn project" => {
properties["project_type"] = json!("node");
properties["is_first_time"] = json!(false);
"Project Opened".to_string()
}
"repl sessions: open" => "REPL Session Started".to_string(),
"welcome page: toggle helix" => {
properties["enabled"] = json!(false);
"Helix Mode Toggled".to_string()
}
"welcome page: edit settings" => {
properties["changed_settings"] = json!([]);
"Settings Edited".to_string()
}
"welcome page: view docs" => "Documentation Viewed".to_string(),
"open ssh project" => {
properties["is_first_time"] = json!(false);
"SSH Project Opened".to_string()
}
"create ssh server" => "SSH Server Created".to_string(),
"create ssh project" => "SSH Project Created".to_string(),
"first open for release channel" => {
properties["is_first_for_channel"] = json!(true);
"App First Opened For Release Channel".to_string()
}
_ => format!("Unknown App Event: {}", e.operation),
};
(event_type, properties)
}
Event::Setting(e) => (
"Settings Changed".to_string(),
serde_json::to_value(e).unwrap(),
),
Event::Extension(e) => (
"Extension Loaded".to_string(),
serde_json::to_value(e).unwrap(),
),
Event::Edit(e) => (
"Editor Edited".to_string(),
serde_json::to_value(e).unwrap(),
),
Event::Action(e) => (
"Action Invoked".to_string(),
serde_json::to_value(e).unwrap(),
),
Event::Repl(e) => (
"Kernel Status Changed".to_string(),
serde_json::to_value(e).unwrap(),
),
};

if let serde_json::Value::Object(ref mut map) = event_properties {
map.insert("app_version".to_string(), body.app_version.clone().into());
map.insert("os_name".to_string(), body.os_name.clone().into());
map.insert("os_version".to_string(), body.os_version.clone().into());
map.insert("architecture".to_string(), body.architecture.clone().into());
map.insert(
"release_channel".to_string(),
body.release_channel.clone().into(),
);
map.insert("signed_in".to_string(), event.signed_in.into());
}

let user_properties = Some(serde_json::json!({
"is_staff": body.is_staff,
}));

SnowflakeRow {
time: timestamp,
user_id: body.metrics_id.clone(),
device_id: body.system_id.clone(),
event_type,
event_properties,
user_properties,
insert_id: Some(Uuid::new_v4().to_string()),
}
})
}

#[derive(Serialize, Deserialize)]
struct SnowflakeRow {
pub event: String,
pub system_id: Option<String>,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub data: SnowflakeData,
pub time: chrono::DateTime<chrono::Utc>,
pub user_id: Option<String>,
pub device_id: Option<String>,
pub event_type: String,
pub event_properties: serde_json::Value,
pub user_properties: Option<serde_json::Value>,
pub insert_id: Option<String>,
}

#[derive(Serialize, Deserialize)]
Expand Down

0 comments on commit f9990b4

Please sign in to comment.