diff --git a/rust_snuba/src/mutations/clickhouse.rs b/rust_snuba/src/mutations/clickhouse.rs index 3561cb9730..bd34440f75 100644 --- a/rust_snuba/src/mutations/clickhouse.rs +++ b/rust_snuba/src/mutations/clickhouse.rs @@ -1,10 +1,9 @@ -use std::time::Duration; - use rust_arroyo::processing::strategies::run_task_in_threads::{ RunTaskError, RunTaskFunc, TaskRunner, }; use rust_arroyo::types::Message; use serde::Serialize; +use std::time::Duration; use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION}; use reqwest::{Client, ClientBuilder}; @@ -12,7 +11,6 @@ use uuid::Uuid; use crate::mutations::parser::MutationBatch; use crate::processors::eap_spans::{AttributeMap, PrimaryKey, ATTRS_SHARD_FACTOR}; - #[derive(Clone)] pub struct ClickhouseWriter { url: String, @@ -141,7 +139,7 @@ fn format_query(table: &str, batch: &MutationBatch) -> Vec> { let main_insert = format!( " INSERT INTO {table} - SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([1, -1]) as sign {attr_combined_columns} + SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([-1, 1]) as sign {attr_combined_columns} FROM {table} old_data GLOBAL JOIN new_data ON old_data.organization_id = new_data.organization_id @@ -181,25 +179,140 @@ struct MutationRow { #[cfg(test)] mod tests { + use std::env; use uuid::Uuid; use crate::mutations::parser::Update; use super::*; + struct ClickhouseTestClient { + url: String, + table: String, + client: Client, + } + + impl ClickhouseTestClient { + pub async fn new(table: String) -> anyhow::Result { + let hostname = env::var("CLICKHOUSE_HOST").unwrap_or_else(|_| "127.0.0.1".to_owned()); + let http_port = 8123; + let url = format!("http://{hostname}:{http_port}"); + let uuid = Uuid::new_v4().to_string(); + let table_id = uuid.split('-').next().unwrap(); + + let client = reqwest::Client::new(); + let test_table = format!("{table}_{table_id}_local\n"); + + let body = + format!("CREATE TABLE IF NOT EXISTS {test_table} AS {table}_local\n").into_bytes(); + + // use client to create a new table locally + client.post(url.clone()).body(body).send().await?; + + Ok(Self { + url, + table: test_table, + client, + }) + } + + pub async fn run_mutation(&self, queries: Vec>) -> anyhow::Result<()> { + let session_id = Uuid::new_v4().to_string(); + + for query in queries { + self.client + .post(&self.url) + .query(&[("session_id", &session_id)]) + .body(query) + .send() + .await?; + } + Ok(()) + } + + pub async fn select_final(&self) -> anyhow::Result { + let table = &self.table; + let final_query = format!("SELECT * FROM {table} FINAL\n").into_bytes(); + + let response = self.client.post(&self.url).body(final_query).send().await?; + + let update = response.text().await?; + + Ok(update) + } + + pub async fn drop_table(&self) -> anyhow::Result<()> { + let table = &self.table; + let drop = format!("DROP TABLE {table}\n").into_bytes(); + + self.client.post(&self.url).body(drop).send().await?; + + Ok(()) + } + } + + #[tokio::test] + async fn test_simple_mutation() { + let mut batch = MutationBatch::default(); + let mut update = Update::default(); + + let organization_id = 69; + let _sort_timestamp = 1727466947; + let trace_id = Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(); + let span_id = 16045690984833335023; + + let primary_key = PrimaryKey { + organization_id, + _sort_timestamp, + trace_id, + span_id, + }; + update.attr_str.insert("a".to_string(), "b".to_string()); + + // build the mutation batch + batch.0.insert(primary_key.clone(), update); + + let test_client = ClickhouseTestClient::new("eap_spans".to_string()) + .await + .unwrap(); + + let test_table = &test_client.table; + + let insert = + format!("INSERT INTO {test_table} (organization_id, _sort_timestamp, trace_id, span_id, sign, retention_days) VALUES ({organization_id}, {_sort_timestamp}, \'{trace_id}\', {span_id}, 1, 90)\n").into_bytes(); + + let _ = test_client + .client + .post(&test_client.url) + .body(insert) + .send() + .await; + + let all_queries = format_query(test_table, &batch); + let _ = test_client.run_mutation(all_queries).await; + + // merge data at query time for up-to-date results + let mutation = test_client.select_final().await; + assert!(mutation.unwrap().contains("{'a':'b'}")); + + // clean up the temporary table at the end of test + let _ = test_client.drop_table().await; + } + #[test] fn format_query_snap() { let mut batch = MutationBatch::default(); + let mut update = Update::default(); + let primary_key = PrimaryKey { + organization_id: 69, + _sort_timestamp: 1727466947, + trace_id: Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(), + span_id: 16045690984833335023, + }; + update.attr_str.insert("a".to_string(), "b".to_string()); + + batch.0.insert(primary_key.clone(), update); - batch.0.insert( - PrimaryKey { - organization_id: 69, - _sort_timestamp: 1715868485, - trace_id: Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(), - span_id: 16045690984833335023, - }, - Update::default(), - ); let mut snapshot = String::new(); for query in format_query("eap_spans_local", &batch) { snapshot.push_str(std::str::from_utf8(&query).unwrap()); diff --git a/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap b/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap index 5fdfa367a8..9689ae65e8 100644 --- a/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap +++ b/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap @@ -4,11 +4,11 @@ expression: snapshot --- CREATE TEMPORARY TABLE new_data (organization_id UInt64, _sort_timestamp DateTime, trace_id UUID, span_id UInt64 ,attr_str_0 Map(String, String),attr_str_1 Map(String, String),attr_str_2 Map(String, String),attr_str_3 Map(String, String),attr_str_4 Map(String, String),attr_str_5 Map(String, String),attr_str_6 Map(String, String),attr_str_7 Map(String, String),attr_str_8 Map(String, String),attr_str_9 Map(String, String),attr_str_10 Map(String, String),attr_str_11 Map(String, String),attr_str_12 Map(String, String),attr_str_13 Map(String, String),attr_str_14 Map(String, String),attr_str_15 Map(String, String),attr_str_16 Map(String, String),attr_str_17 Map(String, String),attr_str_18 Map(String, String),attr_str_19 Map(String, String),attr_num_0 Map(String, Float64),attr_num_1 Map(String, Float64),attr_num_2 Map(String, Float64),attr_num_3 Map(String, Float64),attr_num_4 Map(String, Float64),attr_num_5 Map(String, Float64),attr_num_6 Map(String, Float64),attr_num_7 Map(String, Float64),attr_num_8 Map(String, Float64),attr_num_9 Map(String, Float64),attr_num_10 Map(String, Float64),attr_num_11 Map(String, Float64),attr_num_12 Map(String, Float64),attr_num_13 Map(String, Float64),attr_num_14 Map(String, Float64),attr_num_15 Map(String, Float64),attr_num_16 Map(String, Float64),attr_num_17 Map(String, Float64),attr_num_18 Map(String, Float64),attr_num_19 Map(String, Float64)); INSERT INTO new_data FORMAT JSONEachRow -{"organization_id":69,"_sort_timestamp":1715868485,"trace_id":"deadbeef-dead-beef-dead-beefdeadbeef","span_id":16045690984833335023} +{"attr_str_0":{"a":"b"},"organization_id":69,"_sort_timestamp":1727466947,"trace_id":"deadbeef-dead-beef-dead-beefdeadbeef","span_id":16045690984833335023} ; INSERT INTO eap_spans_local - SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([1, -1]) as sign ,if(sign = 1, mapUpdate(old_data.attr_str_0, new_data.attr_str_0), old_data.attr_str_0) AS attr_str_0,if(sign = 1, mapUpdate(old_data.attr_str_1, new_data.attr_str_1), old_data.attr_str_1) AS attr_str_1,if(sign = 1, mapUpdate(old_data.attr_str_2, new_data.attr_str_2), old_data.attr_str_2) AS attr_str_2,if(sign = 1, mapUpdate(old_data.attr_str_3, new_data.attr_str_3), old_data.attr_str_3) AS attr_str_3,if(sign = 1, mapUpdate(old_data.attr_str_4, new_data.attr_str_4), old_data.attr_str_4) AS attr_str_4,if(sign = 1, mapUpdate(old_data.attr_str_5, new_data.attr_str_5), old_data.attr_str_5) AS attr_str_5,if(sign = 1, mapUpdate(old_data.attr_str_6, new_data.attr_str_6), old_data.attr_str_6) AS attr_str_6,if(sign = 1, mapUpdate(old_data.attr_str_7, new_data.attr_str_7), old_data.attr_str_7) AS attr_str_7,if(sign = 1, mapUpdate(old_data.attr_str_8, new_data.attr_str_8), old_data.attr_str_8) AS attr_str_8,if(sign = 1, mapUpdate(old_data.attr_str_9, new_data.attr_str_9), old_data.attr_str_9) AS attr_str_9,if(sign = 1, mapUpdate(old_data.attr_str_10, new_data.attr_str_10), old_data.attr_str_10) AS attr_str_10,if(sign = 1, mapUpdate(old_data.attr_str_11, new_data.attr_str_11), old_data.attr_str_11) AS attr_str_11,if(sign = 1, mapUpdate(old_data.attr_str_12, new_data.attr_str_12), old_data.attr_str_12) AS attr_str_12,if(sign = 1, mapUpdate(old_data.attr_str_13, new_data.attr_str_13), old_data.attr_str_13) AS attr_str_13,if(sign = 1, mapUpdate(old_data.attr_str_14, new_data.attr_str_14), old_data.attr_str_14) AS attr_str_14,if(sign = 1, mapUpdate(old_data.attr_str_15, new_data.attr_str_15), old_data.attr_str_15) AS attr_str_15,if(sign = 1, mapUpdate(old_data.attr_str_16, new_data.attr_str_16), old_data.attr_str_16) AS attr_str_16,if(sign = 1, mapUpdate(old_data.attr_str_17, new_data.attr_str_17), old_data.attr_str_17) AS attr_str_17,if(sign = 1, mapUpdate(old_data.attr_str_18, new_data.attr_str_18), old_data.attr_str_18) AS attr_str_18,if(sign = 1, mapUpdate(old_data.attr_str_19, new_data.attr_str_19), old_data.attr_str_19) AS attr_str_19,if(sign = 1, mapUpdate(old_data.attr_num_0, new_data.attr_num_0), old_data.attr_num_0) AS attr_num_0,if(sign = 1, mapUpdate(old_data.attr_num_1, new_data.attr_num_1), old_data.attr_num_1) AS attr_num_1,if(sign = 1, mapUpdate(old_data.attr_num_2, new_data.attr_num_2), old_data.attr_num_2) AS attr_num_2,if(sign = 1, mapUpdate(old_data.attr_num_3, new_data.attr_num_3), old_data.attr_num_3) AS attr_num_3,if(sign = 1, mapUpdate(old_data.attr_num_4, new_data.attr_num_4), old_data.attr_num_4) AS attr_num_4,if(sign = 1, mapUpdate(old_data.attr_num_5, new_data.attr_num_5), old_data.attr_num_5) AS attr_num_5,if(sign = 1, mapUpdate(old_data.attr_num_6, new_data.attr_num_6), old_data.attr_num_6) AS attr_num_6,if(sign = 1, mapUpdate(old_data.attr_num_7, new_data.attr_num_7), old_data.attr_num_7) AS attr_num_7,if(sign = 1, mapUpdate(old_data.attr_num_8, new_data.attr_num_8), old_data.attr_num_8) AS attr_num_8,if(sign = 1, mapUpdate(old_data.attr_num_9, new_data.attr_num_9), old_data.attr_num_9) AS attr_num_9,if(sign = 1, mapUpdate(old_data.attr_num_10, new_data.attr_num_10), old_data.attr_num_10) AS attr_num_10,if(sign = 1, mapUpdate(old_data.attr_num_11, new_data.attr_num_11), old_data.attr_num_11) AS attr_num_11,if(sign = 1, mapUpdate(old_data.attr_num_12, new_data.attr_num_12), old_data.attr_num_12) AS attr_num_12,if(sign = 1, mapUpdate(old_data.attr_num_13, new_data.attr_num_13), old_data.attr_num_13) AS attr_num_13,if(sign = 1, mapUpdate(old_data.attr_num_14, new_data.attr_num_14), old_data.attr_num_14) AS attr_num_14,if(sign = 1, mapUpdate(old_data.attr_num_15, new_data.attr_num_15), old_data.attr_num_15) AS attr_num_15,if(sign = 1, mapUpdate(old_data.attr_num_16, new_data.attr_num_16), old_data.attr_num_16) AS attr_num_16,if(sign = 1, mapUpdate(old_data.attr_num_17, new_data.attr_num_17), old_data.attr_num_17) AS attr_num_17,if(sign = 1, mapUpdate(old_data.attr_num_18, new_data.attr_num_18), old_data.attr_num_18) AS attr_num_18,if(sign = 1, mapUpdate(old_data.attr_num_19, new_data.attr_num_19), old_data.attr_num_19) AS attr_num_19 + SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([-1, 1]) as sign ,if(sign = 1, mapUpdate(old_data.attr_str_0, new_data.attr_str_0), old_data.attr_str_0) AS attr_str_0,if(sign = 1, mapUpdate(old_data.attr_str_1, new_data.attr_str_1), old_data.attr_str_1) AS attr_str_1,if(sign = 1, mapUpdate(old_data.attr_str_2, new_data.attr_str_2), old_data.attr_str_2) AS attr_str_2,if(sign = 1, mapUpdate(old_data.attr_str_3, new_data.attr_str_3), old_data.attr_str_3) AS attr_str_3,if(sign = 1, mapUpdate(old_data.attr_str_4, new_data.attr_str_4), old_data.attr_str_4) AS attr_str_4,if(sign = 1, mapUpdate(old_data.attr_str_5, new_data.attr_str_5), old_data.attr_str_5) AS attr_str_5,if(sign = 1, mapUpdate(old_data.attr_str_6, new_data.attr_str_6), old_data.attr_str_6) AS attr_str_6,if(sign = 1, mapUpdate(old_data.attr_str_7, new_data.attr_str_7), old_data.attr_str_7) AS attr_str_7,if(sign = 1, mapUpdate(old_data.attr_str_8, new_data.attr_str_8), old_data.attr_str_8) AS attr_str_8,if(sign = 1, mapUpdate(old_data.attr_str_9, new_data.attr_str_9), old_data.attr_str_9) AS attr_str_9,if(sign = 1, mapUpdate(old_data.attr_str_10, new_data.attr_str_10), old_data.attr_str_10) AS attr_str_10,if(sign = 1, mapUpdate(old_data.attr_str_11, new_data.attr_str_11), old_data.attr_str_11) AS attr_str_11,if(sign = 1, mapUpdate(old_data.attr_str_12, new_data.attr_str_12), old_data.attr_str_12) AS attr_str_12,if(sign = 1, mapUpdate(old_data.attr_str_13, new_data.attr_str_13), old_data.attr_str_13) AS attr_str_13,if(sign = 1, mapUpdate(old_data.attr_str_14, new_data.attr_str_14), old_data.attr_str_14) AS attr_str_14,if(sign = 1, mapUpdate(old_data.attr_str_15, new_data.attr_str_15), old_data.attr_str_15) AS attr_str_15,if(sign = 1, mapUpdate(old_data.attr_str_16, new_data.attr_str_16), old_data.attr_str_16) AS attr_str_16,if(sign = 1, mapUpdate(old_data.attr_str_17, new_data.attr_str_17), old_data.attr_str_17) AS attr_str_17,if(sign = 1, mapUpdate(old_data.attr_str_18, new_data.attr_str_18), old_data.attr_str_18) AS attr_str_18,if(sign = 1, mapUpdate(old_data.attr_str_19, new_data.attr_str_19), old_data.attr_str_19) AS attr_str_19,if(sign = 1, mapUpdate(old_data.attr_num_0, new_data.attr_num_0), old_data.attr_num_0) AS attr_num_0,if(sign = 1, mapUpdate(old_data.attr_num_1, new_data.attr_num_1), old_data.attr_num_1) AS attr_num_1,if(sign = 1, mapUpdate(old_data.attr_num_2, new_data.attr_num_2), old_data.attr_num_2) AS attr_num_2,if(sign = 1, mapUpdate(old_data.attr_num_3, new_data.attr_num_3), old_data.attr_num_3) AS attr_num_3,if(sign = 1, mapUpdate(old_data.attr_num_4, new_data.attr_num_4), old_data.attr_num_4) AS attr_num_4,if(sign = 1, mapUpdate(old_data.attr_num_5, new_data.attr_num_5), old_data.attr_num_5) AS attr_num_5,if(sign = 1, mapUpdate(old_data.attr_num_6, new_data.attr_num_6), old_data.attr_num_6) AS attr_num_6,if(sign = 1, mapUpdate(old_data.attr_num_7, new_data.attr_num_7), old_data.attr_num_7) AS attr_num_7,if(sign = 1, mapUpdate(old_data.attr_num_8, new_data.attr_num_8), old_data.attr_num_8) AS attr_num_8,if(sign = 1, mapUpdate(old_data.attr_num_9, new_data.attr_num_9), old_data.attr_num_9) AS attr_num_9,if(sign = 1, mapUpdate(old_data.attr_num_10, new_data.attr_num_10), old_data.attr_num_10) AS attr_num_10,if(sign = 1, mapUpdate(old_data.attr_num_11, new_data.attr_num_11), old_data.attr_num_11) AS attr_num_11,if(sign = 1, mapUpdate(old_data.attr_num_12, new_data.attr_num_12), old_data.attr_num_12) AS attr_num_12,if(sign = 1, mapUpdate(old_data.attr_num_13, new_data.attr_num_13), old_data.attr_num_13) AS attr_num_13,if(sign = 1, mapUpdate(old_data.attr_num_14, new_data.attr_num_14), old_data.attr_num_14) AS attr_num_14,if(sign = 1, mapUpdate(old_data.attr_num_15, new_data.attr_num_15), old_data.attr_num_15) AS attr_num_15,if(sign = 1, mapUpdate(old_data.attr_num_16, new_data.attr_num_16), old_data.attr_num_16) AS attr_num_16,if(sign = 1, mapUpdate(old_data.attr_num_17, new_data.attr_num_17), old_data.attr_num_17) AS attr_num_17,if(sign = 1, mapUpdate(old_data.attr_num_18, new_data.attr_num_18), old_data.attr_num_18) AS attr_num_18,if(sign = 1, mapUpdate(old_data.attr_num_19, new_data.attr_num_19), old_data.attr_num_19) AS attr_num_19 FROM eap_spans_local old_data GLOBAL JOIN new_data ON old_data.organization_id = new_data.organization_id