From 1d73a94c19c0d007af258c6a09a66fa9501fad10 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 18 Sep 2024 19:11:08 +0200 Subject: [PATCH] add cancellation row --- rust_snuba/src/mutations/clickhouse.rs | 42 ++++++++++++++++--- ..._clickhouse__tests__format_query_snap.snap | 20 +++++++++ 2 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap diff --git a/rust_snuba/src/mutations/clickhouse.rs b/rust_snuba/src/mutations/clickhouse.rs index 8681583fc0..0d98f5ac49 100644 --- a/rust_snuba/src/mutations/clickhouse.rs +++ b/rust_snuba/src/mutations/clickhouse.rs @@ -97,14 +97,14 @@ fn format_query(table: &str, batch: &MutationBatch) -> Vec { attr_columns.push_str(&format!(",attr_str_{i} Map(String, String)")); attr_combined_columns.push_str(&format!( - ",mapUpdate(old_data.attr_str_{i}, new_data.attr_str_{i}) AS attr_str_{i}" + ",if(signs.new_sign = 1, mapUpdate(old_data.attr_str_{i}, new_data.attr_str_{i}), old_data.attr_str_{i}) AS attr_str_{i}" )); } for i in 0..ATTRS_SHARD_FACTOR { attr_columns.push_str(&format!(",attr_num_{i} Map(String, Float64)")); attr_combined_columns.push_str(&format!( - ",mapUpdate(old_data.attr_num_{i}, new_data.attr_num_{i}) AS attr_num_{i}" + ",if(signs.new_sign = 1, mapUpdate(old_data.attr_num_{i}, new_data.attr_num_{i}), old_data.attr_num_{i}) AS attr_num_{i}" )); } @@ -127,16 +127,18 @@ fn format_query(table: &str, batch: &MutationBatch) -> Vec { let mut body = format!( " INSERT INTO {table} - SELECT old_data.* EXCEPT ('attr_.*') {attr_combined_columns} - FROM {table} old_data - JOIN (SELECT * FROM input( + SELECT old_data.* EXCEPT ('sign|attr_.*'), signs.new_sign as sign {attr_combined_columns} + FROM input( 'organization_id UInt64, trace_id UUID, span_id UInt64, _sort_timestamp DateTime {attr_columns}' - )) new_data + ) new_data + JOIN {table} old_data ON old_data.organization_id = new_data.organization_id and old_data.trace_id = new_data.trace_id and old_data.span_id = new_data.span_id and old_data._sort_timestamp = new_data._sort_timestamp and old_data.sign = 1 + JOIN (SELECT arrayJoin([1, -1]) AS new_sign) as signs + ON 1 FORMAT JSONEachRow\n" ) @@ -172,3 +174,31 @@ struct MutationRow { #[serde(flatten)] filter: PrimaryKey, } + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use crate::mutations::parser::Update; + + use super::*; + + #[test] + fn format_query_snap() { + let mut batch = MutationBatch::default(); + + batch.0.insert( + PrimaryKey { + organization_id: 69, + _sort_timestamp: 1715868485, + trace_id: Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(), + span_id: 16045690984833335023, + }, + Update::default(), + ); + let query = format_query("eap_spans_local", &batch); + let query = String::from_utf8(query).unwrap(); + + insta::assert_snapshot!(query); + } +} diff --git a/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap b/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap new file mode 100644 index 0000000000..e12239ce62 --- /dev/null +++ b/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap @@ -0,0 +1,20 @@ +--- +source: src/mutations/clickhouse.rs +expression: query +--- + INSERT INTO eap_spans_local + SELECT old_data.* EXCEPT ('sign|attr_.*'), signs.new_sign as sign ,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_0, new_data.attr_str_0), old_data.attr_str_0) AS attr_str_0,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_1, new_data.attr_str_1), old_data.attr_str_1) AS attr_str_1,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_2, new_data.attr_str_2), old_data.attr_str_2) AS attr_str_2,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_3, new_data.attr_str_3), old_data.attr_str_3) AS attr_str_3,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_4, new_data.attr_str_4), old_data.attr_str_4) AS attr_str_4,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_5, new_data.attr_str_5), old_data.attr_str_5) AS attr_str_5,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_6, new_data.attr_str_6), old_data.attr_str_6) AS attr_str_6,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_7, new_data.attr_str_7), old_data.attr_str_7) AS attr_str_7,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_8, new_data.attr_str_8), old_data.attr_str_8) AS attr_str_8,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_9, new_data.attr_str_9), old_data.attr_str_9) AS attr_str_9,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_10, new_data.attr_str_10), old_data.attr_str_10) AS attr_str_10,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_11, new_data.attr_str_11), old_data.attr_str_11) AS attr_str_11,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_12, new_data.attr_str_12), old_data.attr_str_12) AS attr_str_12,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_13, new_data.attr_str_13), old_data.attr_str_13) AS attr_str_13,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_14, new_data.attr_str_14), old_data.attr_str_14) AS attr_str_14,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_15, new_data.attr_str_15), old_data.attr_str_15) AS attr_str_15,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_16, new_data.attr_str_16), old_data.attr_str_16) AS attr_str_16,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_17, new_data.attr_str_17), old_data.attr_str_17) AS attr_str_17,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_18, new_data.attr_str_18), old_data.attr_str_18) AS attr_str_18,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_19, new_data.attr_str_19), old_data.attr_str_19) AS attr_str_19,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_0, new_data.attr_num_0), old_data.attr_num_0) AS attr_num_0,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_1, new_data.attr_num_1), old_data.attr_num_1) AS attr_num_1,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_2, new_data.attr_num_2), old_data.attr_num_2) AS attr_num_2,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_3, new_data.attr_num_3), old_data.attr_num_3) AS attr_num_3,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_4, new_data.attr_num_4), old_data.attr_num_4) AS attr_num_4,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_5, new_data.attr_num_5), old_data.attr_num_5) AS attr_num_5,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_6, new_data.attr_num_6), old_data.attr_num_6) AS attr_num_6,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_7, new_data.attr_num_7), old_data.attr_num_7) AS attr_num_7,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_8, new_data.attr_num_8), old_data.attr_num_8) AS attr_num_8,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_9, new_data.attr_num_9), old_data.attr_num_9) AS attr_num_9,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_10, new_data.attr_num_10), old_data.attr_num_10) AS attr_num_10,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_11, new_data.attr_num_11), old_data.attr_num_11) AS attr_num_11,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_12, new_data.attr_num_12), old_data.attr_num_12) AS attr_num_12,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_13, new_data.attr_num_13), old_data.attr_num_13) AS attr_num_13,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_14, new_data.attr_num_14), old_data.attr_num_14) AS attr_num_14,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_15, new_data.attr_num_15), old_data.attr_num_15) AS attr_num_15,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_16, new_data.attr_num_16), old_data.attr_num_16) AS attr_num_16,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_17, new_data.attr_num_17), old_data.attr_num_17) AS attr_num_17,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_18, new_data.attr_num_18), old_data.attr_num_18) AS attr_num_18,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_19, new_data.attr_num_19), old_data.attr_num_19) AS attr_num_19 + FROM input( + 'organization_id UInt64, trace_id UUID, span_id UInt64, _sort_timestamp DateTime ,attr_str_0 Map(String, String),attr_str_1 Map(String, String),attr_str_2 Map(String, String),attr_str_3 Map(String, String),attr_str_4 Map(String, String),attr_str_5 Map(String, String),attr_str_6 Map(String, String),attr_str_7 Map(String, String),attr_str_8 Map(String, String),attr_str_9 Map(String, String),attr_str_10 Map(String, String),attr_str_11 Map(String, String),attr_str_12 Map(String, String),attr_str_13 Map(String, String),attr_str_14 Map(String, String),attr_str_15 Map(String, String),attr_str_16 Map(String, String),attr_str_17 Map(String, String),attr_str_18 Map(String, String),attr_str_19 Map(String, String),attr_num_0 Map(String, Float64),attr_num_1 Map(String, Float64),attr_num_2 Map(String, Float64),attr_num_3 Map(String, Float64),attr_num_4 Map(String, Float64),attr_num_5 Map(String, Float64),attr_num_6 Map(String, Float64),attr_num_7 Map(String, Float64),attr_num_8 Map(String, Float64),attr_num_9 Map(String, Float64),attr_num_10 Map(String, Float64),attr_num_11 Map(String, Float64),attr_num_12 Map(String, Float64),attr_num_13 Map(String, Float64),attr_num_14 Map(String, Float64),attr_num_15 Map(String, Float64),attr_num_16 Map(String, Float64),attr_num_17 Map(String, Float64),attr_num_18 Map(String, Float64),attr_num_19 Map(String, Float64)' + ) new_data + JOIN eap_spans_local old_data + ON old_data.organization_id = new_data.organization_id + and old_data.trace_id = new_data.trace_id + and old_data.span_id = new_data.span_id + and old_data._sort_timestamp = new_data._sort_timestamp + and old_data.sign = 1 + JOIN (SELECT arrayJoin([1, -1]) AS new_sign) as signs + ON 1 + + FORMAT JSONEachRow +{"organization_id":69,"_sort_timestamp":1715868485,"trace_id":"deadbeef-dead-beef-dead-beefdeadbeef","span_id":16045690984833335023}