Skip to content

Commit

Permalink
add cancellation row
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker committed Sep 18, 2024
1 parent ef36388 commit 1d73a94
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
42 changes: 36 additions & 6 deletions rust_snuba/src/mutations/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ fn format_query(table: &str, batch: &MutationBatch) -> Vec<u8> {
attr_columns.push_str(&format!(",attr_str_{i} Map(String, String)"));

attr_combined_columns.push_str(&format!(
",mapUpdate(old_data.attr_str_{i}, new_data.attr_str_{i}) AS attr_str_{i}"
",if(signs.new_sign = 1, mapUpdate(old_data.attr_str_{i}, new_data.attr_str_{i}), old_data.attr_str_{i}) AS attr_str_{i}"
));
}

for i in 0..ATTRS_SHARD_FACTOR {
attr_columns.push_str(&format!(",attr_num_{i} Map(String, Float64)"));
attr_combined_columns.push_str(&format!(
",mapUpdate(old_data.attr_num_{i}, new_data.attr_num_{i}) AS attr_num_{i}"
",if(signs.new_sign = 1, mapUpdate(old_data.attr_num_{i}, new_data.attr_num_{i}), old_data.attr_num_{i}) AS attr_num_{i}"
));
}

Expand All @@ -127,16 +127,18 @@ fn format_query(table: &str, batch: &MutationBatch) -> Vec<u8> {
let mut body = format!(
"
INSERT INTO {table}
SELECT old_data.* EXCEPT ('attr_.*') {attr_combined_columns}
FROM {table} old_data
JOIN (SELECT * FROM input(
SELECT old_data.* EXCEPT ('sign|attr_.*'), signs.new_sign as sign {attr_combined_columns}
FROM input(
'organization_id UInt64, trace_id UUID, span_id UInt64, _sort_timestamp DateTime {attr_columns}'
)) new_data
) new_data
JOIN {table} old_data
ON old_data.organization_id = new_data.organization_id
and old_data.trace_id = new_data.trace_id
and old_data.span_id = new_data.span_id
and old_data._sort_timestamp = new_data._sort_timestamp
and old_data.sign = 1
JOIN (SELECT arrayJoin([1, -1]) AS new_sign) as signs
ON 1
FORMAT JSONEachRow\n"
)
Expand Down Expand Up @@ -172,3 +174,31 @@ struct MutationRow {
#[serde(flatten)]
filter: PrimaryKey,
}

#[cfg(test)]
mod tests {
use uuid::Uuid;

use crate::mutations::parser::Update;

use super::*;

#[test]
fn format_query_snap() {
let mut batch = MutationBatch::default();

batch.0.insert(
PrimaryKey {
organization_id: 69,
_sort_timestamp: 1715868485,
trace_id: Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(),
span_id: 16045690984833335023,
},
Update::default(),
);
let query = format_query("eap_spans_local", &batch);
let query = String::from_utf8(query).unwrap();

insta::assert_snapshot!(query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
source: src/mutations/clickhouse.rs
expression: query
---
INSERT INTO eap_spans_local
SELECT old_data.* EXCEPT ('sign|attr_.*'), signs.new_sign as sign ,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_0, new_data.attr_str_0), old_data.attr_str_0) AS attr_str_0,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_1, new_data.attr_str_1), old_data.attr_str_1) AS attr_str_1,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_2, new_data.attr_str_2), old_data.attr_str_2) AS attr_str_2,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_3, new_data.attr_str_3), old_data.attr_str_3) AS attr_str_3,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_4, new_data.attr_str_4), old_data.attr_str_4) AS attr_str_4,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_5, new_data.attr_str_5), old_data.attr_str_5) AS attr_str_5,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_6, new_data.attr_str_6), old_data.attr_str_6) AS attr_str_6,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_7, new_data.attr_str_7), old_data.attr_str_7) AS attr_str_7,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_8, new_data.attr_str_8), old_data.attr_str_8) AS attr_str_8,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_9, new_data.attr_str_9), old_data.attr_str_9) AS attr_str_9,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_10, new_data.attr_str_10), old_data.attr_str_10) AS attr_str_10,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_11, new_data.attr_str_11), old_data.attr_str_11) AS attr_str_11,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_12, new_data.attr_str_12), old_data.attr_str_12) AS attr_str_12,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_13, new_data.attr_str_13), old_data.attr_str_13) AS attr_str_13,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_14, new_data.attr_str_14), old_data.attr_str_14) AS attr_str_14,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_15, new_data.attr_str_15), old_data.attr_str_15) AS attr_str_15,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_16, new_data.attr_str_16), old_data.attr_str_16) AS attr_str_16,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_17, new_data.attr_str_17), old_data.attr_str_17) AS attr_str_17,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_18, new_data.attr_str_18), old_data.attr_str_18) AS attr_str_18,if(signs.new_sign = 1, mapUpdate(old_data.attr_str_19, new_data.attr_str_19), old_data.attr_str_19) AS attr_str_19,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_0, new_data.attr_num_0), old_data.attr_num_0) AS attr_num_0,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_1, new_data.attr_num_1), old_data.attr_num_1) AS attr_num_1,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_2, new_data.attr_num_2), old_data.attr_num_2) AS attr_num_2,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_3, new_data.attr_num_3), old_data.attr_num_3) AS attr_num_3,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_4, new_data.attr_num_4), old_data.attr_num_4) AS attr_num_4,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_5, new_data.attr_num_5), old_data.attr_num_5) AS attr_num_5,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_6, new_data.attr_num_6), old_data.attr_num_6) AS attr_num_6,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_7, new_data.attr_num_7), old_data.attr_num_7) AS attr_num_7,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_8, new_data.attr_num_8), old_data.attr_num_8) AS attr_num_8,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_9, new_data.attr_num_9), old_data.attr_num_9) AS attr_num_9,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_10, new_data.attr_num_10), old_data.attr_num_10) AS attr_num_10,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_11, new_data.attr_num_11), old_data.attr_num_11) AS attr_num_11,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_12, new_data.attr_num_12), old_data.attr_num_12) AS attr_num_12,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_13, new_data.attr_num_13), old_data.attr_num_13) AS attr_num_13,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_14, new_data.attr_num_14), old_data.attr_num_14) AS attr_num_14,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_15, new_data.attr_num_15), old_data.attr_num_15) AS attr_num_15,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_16, new_data.attr_num_16), old_data.attr_num_16) AS attr_num_16,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_17, new_data.attr_num_17), old_data.attr_num_17) AS attr_num_17,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_18, new_data.attr_num_18), old_data.attr_num_18) AS attr_num_18,if(signs.new_sign = 1, mapUpdate(old_data.attr_num_19, new_data.attr_num_19), old_data.attr_num_19) AS attr_num_19
FROM input(
'organization_id UInt64, trace_id UUID, span_id UInt64, _sort_timestamp DateTime ,attr_str_0 Map(String, String),attr_str_1 Map(String, String),attr_str_2 Map(String, String),attr_str_3 Map(String, String),attr_str_4 Map(String, String),attr_str_5 Map(String, String),attr_str_6 Map(String, String),attr_str_7 Map(String, String),attr_str_8 Map(String, String),attr_str_9 Map(String, String),attr_str_10 Map(String, String),attr_str_11 Map(String, String),attr_str_12 Map(String, String),attr_str_13 Map(String, String),attr_str_14 Map(String, String),attr_str_15 Map(String, String),attr_str_16 Map(String, String),attr_str_17 Map(String, String),attr_str_18 Map(String, String),attr_str_19 Map(String, String),attr_num_0 Map(String, Float64),attr_num_1 Map(String, Float64),attr_num_2 Map(String, Float64),attr_num_3 Map(String, Float64),attr_num_4 Map(String, Float64),attr_num_5 Map(String, Float64),attr_num_6 Map(String, Float64),attr_num_7 Map(String, Float64),attr_num_8 Map(String, Float64),attr_num_9 Map(String, Float64),attr_num_10 Map(String, Float64),attr_num_11 Map(String, Float64),attr_num_12 Map(String, Float64),attr_num_13 Map(String, Float64),attr_num_14 Map(String, Float64),attr_num_15 Map(String, Float64),attr_num_16 Map(String, Float64),attr_num_17 Map(String, Float64),attr_num_18 Map(String, Float64),attr_num_19 Map(String, Float64)'
) new_data
JOIN eap_spans_local old_data
ON old_data.organization_id = new_data.organization_id
and old_data.trace_id = new_data.trace_id
and old_data.span_id = new_data.span_id
and old_data._sort_timestamp = new_data._sort_timestamp
and old_data.sign = 1
JOIN (SELECT arrayJoin([1, -1]) AS new_sign) as signs
ON 1

FORMAT JSONEachRow
{"organization_id":69,"_sort_timestamp":1715868485,"trace_id":"deadbeef-dead-beef-dead-beefdeadbeef","span_id":16045690984833335023}

0 comments on commit 1d73a94

Please sign in to comment.