From 6ff69faf370c50ca21f2956da281beee5b5f9555 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 15 Nov 2024 12:58:00 -0700 Subject: [PATCH] Start to send data to Snowflake too (#20698) This PR adds support for sending telemetry events to AWS Kinesis. In our AWS account we now have three new things: * The [Kinesis data stream](https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/zed-telemetry/monitoring) that we will actually write to. * A [Firehose for Axiom](https://us-east-1.console.aws.amazon.com/firehose/home?region=us-east-1#/details/telemetry-to-axiom/monitoring) that sends events from that stream to Axiom for ad-hoc queries over recent data. * A [Firehose for Snowflake](https://us-east-1.console.aws.amazon.com/firehose/home?region=us-east-1#/details/telemetry-to-snowflake/monitoring) that sends events from that stream to Snowflake for long-term retention. This Firehose also backs up data into an S3 bucket in case we want to change how the system works in the future. In a follow-up PR, we'll add support for ad-hoc telemetry events; and slowly move away from the current Clickhouse defined schemas; though we won't move off click house until we have what we need in Snowflake. Co-Authored-By: Nathan Release Notes: - N/A --- Cargo.lock | 59 ++++-- crates/collab/Cargo.toml | 1 + crates/collab/k8s/collab.template.yml | 25 +++ crates/collab/src/api/events.rs | 196 +++++++++++++++++- crates/collab/src/lib.rs | 43 ++++ crates/collab/src/tests/test_server.rs | 5 + .../telemetry_events/src/telemetry_events.rs | 4 +- 7 files changed, 305 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b9ce3748964e..9feb65c45814c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1099,9 +1099,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.2" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2424565416eef55906f9f8cece2072b6b6a76075e3ff81483ebe938a89a4c05f" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -1123,6 +1123,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "aws-sdk-kinesis" +version = "1.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad48026d3d53881146469b36358d633f1b8c9ad6eb3033f348600f981f2f449b" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes 1.7.2", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-s3" version = "1.47.0" @@ -1227,9 +1249,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.3" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -1288,9 +1310,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" +checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90" dependencies = [ "aws-smithy-types", "bytes 1.7.2", @@ -1299,9 +1321,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.10" +version = "0.60.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01dbcb6e2588fd64cfb6d7529661b06466419e4c54ed1c62d6510d2d0350a728" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -1339,9 +1361,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.1" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1366,9 +1388,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1383,9 +1405,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.4" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "273dcdfd762fae3e1650b8024624e7cd50e484e37abdab73a7a706188ad34543" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" dependencies = [ "base64-simd", "bytes 1.7.2", @@ -1591,7 +1613,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "lazycell", "proc-macro2", @@ -2560,6 +2582,7 @@ dependencies = [ "async-tungstenite", "audio", "aws-config", + "aws-sdk-kinesis", "aws-sdk-s3", "axum", "axum-extra", @@ -5644,7 +5667,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -6553,7 +6576,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -14228,7 +14251,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index c5282689f45a5..a69eb53740f5b 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -24,6 +24,7 @@ async-stripe.workspace = true async-tungstenite.workspace = true aws-config = { version = "1.1.5" } aws-sdk-s3 = { version = "1.15.0" } +aws-sdk-kinesis = "1.51.0" axum = { version = "0.6", features = ["json", "headers", "ws"] } axum-extra = { version = "0.4", features = ["erased-json"] } base64.workspace = true diff --git a/crates/collab/k8s/collab.template.yml b/crates/collab/k8s/collab.template.yml index a28c685f5e8cb..fb5d4ed6eccf6 100644 --- a/crates/collab/k8s/collab.template.yml +++ b/crates/collab/k8s/collab.template.yml @@ -174,6 +174,31 @@ spec: secretKeyRef: name: blob-store key: bucket + - name: KINESIS_ACCESS_KEY + valueFrom: + secretKeyRef: + name: kinesis + key: access_key + - name: KINESIS_SECRET_KEY + valueFrom: + secretKeyRef: + name: kinesis + key: secret_key + - name: KINESIS_STREAM + valueFrom: + secretKeyRef: + name: kinesis + key: stream + - name: KINESIS_REGION + valueFrom: + secretKeyRef: + name: kinesis + key: region + - name: BLOB_STORE_BUCKET + valueFrom: + secretKeyRef: + name: blob-store + key: bucket - name: CLICKHOUSE_URL valueFrom: secretKeyRef: diff --git a/crates/collab/src/api/events.rs b/crates/collab/src/api/events.rs index c1ab3d7939a07..2357d03babc39 100644 --- a/crates/collab/src/api/events.rs +++ b/crates/collab/src/api/events.rs @@ -11,9 +11,10 @@ use axum::{ routing::post, Extension, Router, TypedHeader, }; +use chrono::Duration; use rpc::ExtensionMetadata; use semantic_version::SemanticVersion; -use serde::{Serialize, Serializer}; +use serde::{Deserialize, Serialize, Serializer}; use sha2::{Digest, Sha256}; use std::sync::{Arc, OnceLock}; use telemetry_events::{ @@ -21,6 +22,7 @@ use telemetry_events::{ EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, Panic, ReplEvent, SettingEvent, }; +use util::ResultExt; use uuid::Uuid; const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports"; @@ -388,13 +390,6 @@ pub async fn post_events( country_code_header: Option>, body: Bytes, ) -> Result<()> { - let Some(clickhouse_client) = app.clickhouse_client.clone() else { - Err(Error::http( - StatusCode::NOT_IMPLEMENTED, - "not supported".into(), - ))? - }; - let Some(expected) = calculate_json_checksum(app.clone(), &body) else { return Err(Error::http( StatusCode::INTERNAL_SERVER_ERROR, @@ -416,6 +411,35 @@ pub async fn post_events( }; let country_code = country_code_header.map(|h| h.to_string()); + let first_event_at = chrono::Utc::now() + - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event); + + if let Some(kinesis_client) = app.kinesis_client.clone() { + if let Some(stream) = app.config.kinesis_stream.clone() { + let mut request = kinesis_client.put_records().stream_name(stream); + for row in for_snowflake(request_body.clone(), first_event_at) { + if let Some(data) = serde_json::to_vec(&row).log_err() { + println!("{}", String::from_utf8_lossy(&data)); + request = request.records( + aws_sdk_kinesis::types::PutRecordsRequestEntry::builder() + .partition_key(request_body.system_id.clone().unwrap_or_default()) + .data(data.into()) + .build() + .unwrap(), + ); + } + } + request.send().await.log_err(); + } + }; + + let Some(clickhouse_client) = app.clickhouse_client.clone() else { + Err(Error::http( + StatusCode::NOT_IMPLEMENTED, + "not supported".into(), + ))? + }; + let first_event_at = chrono::Utc::now() - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event); @@ -1364,3 +1388,159 @@ pub fn calculate_json_checksum(app: Arc, json: &impl AsRef<[u8]>) -> O summer.update(checksum_seed); Some(summer.finalize().into_iter().collect()) } + +fn for_snowflake( + body: EventRequestBody, + first_event_at: chrono::DateTime, +) -> impl Iterator { + body.events.into_iter().map(move |event| SnowflakeRow { + event: match &event.event { + Event::Editor(editor_event) => format!("editor_{}", editor_event.operation), + Event::InlineCompletion(inline_completion_event) => format!( + "inline_completion_{}", + if inline_completion_event.suggestion_accepted { + "accept " + } else { + "discard" + } + ), + Event::Call(call_event) => format!("call_{}", call_event.operation.replace(" ", "_")), + Event::Assistant(assistant_event) => { + format!( + "assistant_{}", + match assistant_event.phase { + telemetry_events::AssistantPhase::Response => "response", + telemetry_events::AssistantPhase::Invoked => "invoke", + telemetry_events::AssistantPhase::Accepted => "accept", + telemetry_events::AssistantPhase::Rejected => "reject", + } + ) + } + Event::Cpu(_) => "system_cpu".to_string(), + Event::Memory(_) => "system_memory".to_string(), + Event::App(app_event) => app_event.operation.replace(" ", "_"), + Event::Setting(_) => "setting_change".to_string(), + Event::Extension(_) => "extension_load".to_string(), + Event::Edit(_) => "edit".to_string(), + Event::Action(_) => "command_palette_action".to_string(), + Event::Repl(_) => "repl".to_string(), + }, + system_id: body.system_id.clone(), + timestamp: first_event_at + Duration::milliseconds(event.milliseconds_since_first_event), + data: SnowflakeData { + installation_id: body.installation_id.clone(), + session_id: body.session_id.clone(), + metrics_id: body.metrics_id.clone(), + is_staff: body.is_staff, + app_version: body.app_version.clone(), + os_name: body.os_name.clone(), + os_version: body.os_version.clone(), + architecture: body.architecture.clone(), + release_channel: body.release_channel.clone(), + signed_in: event.signed_in, + editor_event: match &event.event { + Event::Editor(editor_event) => Some(editor_event.clone()), + _ => None, + }, + inline_completion_event: match &event.event { + Event::InlineCompletion(inline_completion_event) => { + Some(inline_completion_event.clone()) + } + _ => None, + }, + call_event: match &event.event { + Event::Call(call_event) => Some(call_event.clone()), + _ => None, + }, + assistant_event: match &event.event { + Event::Assistant(assistant_event) => Some(assistant_event.clone()), + _ => None, + }, + cpu_event: match &event.event { + Event::Cpu(cpu_event) => Some(cpu_event.clone()), + _ => None, + }, + memory_event: match &event.event { + Event::Memory(memory_event) => Some(memory_event.clone()), + _ => None, + }, + app_event: match &event.event { + Event::App(app_event) => Some(app_event.clone()), + _ => None, + }, + setting_event: match &event.event { + Event::Setting(setting_event) => Some(setting_event.clone()), + _ => None, + }, + extension_event: match &event.event { + Event::Extension(extension_event) => Some(extension_event.clone()), + _ => None, + }, + edit_event: match &event.event { + Event::Edit(edit_event) => Some(edit_event.clone()), + _ => None, + }, + repl_event: match &event.event { + Event::Repl(repl_event) => Some(repl_event.clone()), + _ => None, + }, + action_event: match event.event { + Event::Action(action_event) => Some(action_event.clone()), + _ => None, + }, + }, + }) +} + +#[derive(Serialize, Deserialize)] +struct SnowflakeRow { + pub event: String, + pub system_id: Option, + pub timestamp: chrono::DateTime, + pub data: SnowflakeData, +} + +#[derive(Serialize, Deserialize)] +struct SnowflakeData { + /// Identifier unique to each Zed installation (differs for stable, preview, dev) + pub installation_id: Option, + /// Identifier unique to each logged in Zed user (randomly generated on first sign in) + /// Identifier unique to each Zed session (differs for each time you open Zed) + pub session_id: Option, + pub metrics_id: Option, + /// True for Zed staff, otherwise false + pub is_staff: Option, + /// Zed version number + pub app_version: String, + pub os_name: String, + pub os_version: Option, + pub architecture: String, + /// Zed release channel (stable, preview, dev) + pub release_channel: Option, + pub signed_in: bool, + + #[serde(flatten)] + pub editor_event: Option, + #[serde(flatten)] + pub inline_completion_event: Option, + #[serde(flatten)] + pub call_event: Option, + #[serde(flatten)] + pub assistant_event: Option, + #[serde(flatten)] + pub cpu_event: Option, + #[serde(flatten)] + pub memory_event: Option, + #[serde(flatten)] + pub app_event: Option, + #[serde(flatten)] + pub setting_event: Option, + #[serde(flatten)] + pub extension_event: Option, + #[serde(flatten)] + pub edit_event: Option, + #[serde(flatten)] + pub repl_event: Option, + #[serde(flatten)] + pub action_event: Option, +} diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 78f514adb734d..f595cff89011d 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -170,6 +170,10 @@ pub struct Config { pub blob_store_access_key: Option, pub blob_store_secret_key: Option, pub blob_store_bucket: Option, + pub kinesis_region: Option, + pub kinesis_stream: Option, + pub kinesis_access_key: Option, + pub kinesis_secret_key: Option, pub zed_environment: Arc, pub openai_api_key: Option>, pub google_ai_api_key: Option>, @@ -238,6 +242,10 @@ impl Config { stripe_api_key: None, supermaven_admin_api_key: None, user_backfiller_github_access_token: None, + kinesis_region: None, + kinesis_access_key: None, + kinesis_secret_key: None, + kinesis_stream: None, } } } @@ -276,6 +284,7 @@ pub struct AppState { pub rate_limiter: Arc, pub executor: Executor, pub clickhouse_client: Option<::clickhouse::Client>, + pub kinesis_client: Option<::aws_sdk_kinesis::Client>, pub config: Config, } @@ -332,6 +341,11 @@ impl AppState { .clickhouse_url .as_ref() .and_then(|_| build_clickhouse_client(&config).log_err()), + kinesis_client: if config.kinesis_access_key.is_some() { + build_kinesis_client(&config).await.log_err() + } else { + None + }, config, }; Ok(Arc::new(this)) @@ -381,6 +395,35 @@ async fn build_blob_store_client(config: &Config) -> anyhow::Result anyhow::Result { + let keys = aws_sdk_s3::config::Credentials::new( + config + .kinesis_access_key + .clone() + .ok_or_else(|| anyhow!("missing kinesis_access_key"))?, + config + .kinesis_secret_key + .clone() + .ok_or_else(|| anyhow!("missing kinesis_secret_key"))?, + None, + None, + "env", + ); + + let kinesis_config = aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new( + config + .kinesis_region + .clone() + .ok_or_else(|| anyhow!("missing blob_store_region"))?, + )) + .credentials_provider(keys) + .load() + .await; + + Ok(aws_sdk_kinesis::Client::new(&kinesis_config)) +} + fn build_clickhouse_client(config: &Config) -> anyhow::Result<::clickhouse::Client> { Ok(::clickhouse::Client::default() .with_url( diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index 0e8d0fd808cf7..17cd1b51c42cd 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -512,6 +512,7 @@ impl TestServer { rate_limiter: Arc::new(RateLimiter::new(test_db.db().clone())), executor, clickhouse_client: None, + kinesis_client: None, config: Config { http_port: 0, database_url: "".into(), @@ -550,6 +551,10 @@ impl TestServer { stripe_api_key: None, supermaven_admin_api_key: None, user_backfiller_github_access_token: None, + kinesis_region: None, + kinesis_stream: None, + kinesis_access_key: None, + kinesis_secret_key: None, }, }) } diff --git a/crates/telemetry_events/src/telemetry_events.rs b/crates/telemetry_events/src/telemetry_events.rs index 43757f85d88d2..0c4ee8cb9e48f 100644 --- a/crates/telemetry_events/src/telemetry_events.rs +++ b/crates/telemetry_events/src/telemetry_events.rs @@ -4,7 +4,7 @@ use semantic_version::SemanticVersion; use serde::{Deserialize, Serialize}; use std::{fmt::Display, sync::Arc, time::Duration}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct EventRequestBody { /// Identifier unique to each system Zed is installed on pub system_id: Option, @@ -32,7 +32,7 @@ impl EventRequestBody { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct EventWrapper { pub signed_in: bool, /// Duration between this event's timestamp and the timestamp of the first event in the current batch