Skip to content

Commit

Permalink
test(eap): Testing framework for running mutations on Clickhouse (#6346)
Browse files Browse the repository at this point in the history
A follow-up to #6216 

More rigorous testing for the main query run on the mutations consumer.
We need to be able to catch any correctness issues with the query in CI
itself.

---------

Co-authored-by: Markus Unterwaditzer <markus-tarpit+git@unterwaditzer.net>
Co-authored-by: Markus Unterwaditzer <markus-honeypot@unterwaditzer.net>
  • Loading branch information
3 people authored Sep 30, 2024
1 parent 517e298 commit 87b8e30
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 15 deletions.
139 changes: 126 additions & 13 deletions rust_snuba/src/mutations/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use std::time::Duration;

use rust_arroyo::processing::strategies::run_task_in_threads::{
RunTaskError, RunTaskFunc, TaskRunner,
};
use rust_arroyo::types::Message;
use serde::Serialize;
use std::time::Duration;

use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION};
use reqwest::{Client, ClientBuilder};
use uuid::Uuid;

use crate::mutations::parser::MutationBatch;
use crate::processors::eap_spans::{AttributeMap, PrimaryKey, ATTRS_SHARD_FACTOR};

#[derive(Clone)]
pub struct ClickhouseWriter {
url: String,
Expand Down Expand Up @@ -141,7 +139,7 @@ fn format_query(table: &str, batch: &MutationBatch) -> Vec<Vec<u8>> {
let main_insert = format!(
"
INSERT INTO {table}
SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([1, -1]) as sign {attr_combined_columns}
SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([-1, 1]) as sign {attr_combined_columns}
FROM {table} old_data
GLOBAL JOIN new_data
ON old_data.organization_id = new_data.organization_id
Expand Down Expand Up @@ -181,25 +179,140 @@ struct MutationRow {

#[cfg(test)]
mod tests {
use std::env;
use uuid::Uuid;

use crate::mutations::parser::Update;

use super::*;

struct ClickhouseTestClient {
url: String,
table: String,
client: Client,
}

impl ClickhouseTestClient {
pub async fn new(table: String) -> anyhow::Result<Self> {
let hostname = env::var("CLICKHOUSE_HOST").unwrap_or_else(|_| "127.0.0.1".to_owned());
let http_port = 8123;
let url = format!("http://{hostname}:{http_port}");
let uuid = Uuid::new_v4().to_string();
let table_id = uuid.split('-').next().unwrap();

let client = reqwest::Client::new();
let test_table = format!("{table}_{table_id}_local\n");

let body =
format!("CREATE TABLE IF NOT EXISTS {test_table} AS {table}_local\n").into_bytes();

// use client to create a new table locally
client.post(url.clone()).body(body).send().await?;

Ok(Self {
url,
table: test_table,
client,
})
}

pub async fn run_mutation(&self, queries: Vec<Vec<u8>>) -> anyhow::Result<()> {
let session_id = Uuid::new_v4().to_string();

for query in queries {
self.client
.post(&self.url)
.query(&[("session_id", &session_id)])
.body(query)
.send()
.await?;
}
Ok(())
}

pub async fn select_final(&self) -> anyhow::Result<String> {
let table = &self.table;
let final_query = format!("SELECT * FROM {table} FINAL\n").into_bytes();

let response = self.client.post(&self.url).body(final_query).send().await?;

let update = response.text().await?;

Ok(update)
}

pub async fn drop_table(&self) -> anyhow::Result<()> {
let table = &self.table;
let drop = format!("DROP TABLE {table}\n").into_bytes();

self.client.post(&self.url).body(drop).send().await?;

Ok(())
}
}

#[tokio::test]
async fn test_simple_mutation() {
let mut batch = MutationBatch::default();
let mut update = Update::default();

let organization_id = 69;
let _sort_timestamp = 1727466947;
let trace_id = Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap();
let span_id = 16045690984833335023;

let primary_key = PrimaryKey {
organization_id,
_sort_timestamp,
trace_id,
span_id,
};
update.attr_str.insert("a".to_string(), "b".to_string());

// build the mutation batch
batch.0.insert(primary_key.clone(), update);

let test_client = ClickhouseTestClient::new("eap_spans".to_string())
.await
.unwrap();

let test_table = &test_client.table;

let insert =
format!("INSERT INTO {test_table} (organization_id, _sort_timestamp, trace_id, span_id, sign, retention_days) VALUES ({organization_id}, {_sort_timestamp}, \'{trace_id}\', {span_id}, 1, 90)\n").into_bytes();

let _ = test_client
.client
.post(&test_client.url)
.body(insert)
.send()
.await;

let all_queries = format_query(test_table, &batch);
let _ = test_client.run_mutation(all_queries).await;

// merge data at query time for up-to-date results
let mutation = test_client.select_final().await;
assert!(mutation.unwrap().contains("{'a':'b'}"));

// clean up the temporary table at the end of test
let _ = test_client.drop_table().await;
}

#[test]
fn format_query_snap() {
let mut batch = MutationBatch::default();
let mut update = Update::default();
let primary_key = PrimaryKey {
organization_id: 69,
_sort_timestamp: 1727466947,
trace_id: Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(),
span_id: 16045690984833335023,
};
update.attr_str.insert("a".to_string(), "b".to_string());

batch.0.insert(primary_key.clone(), update);

batch.0.insert(
PrimaryKey {
organization_id: 69,
_sort_timestamp: 1715868485,
trace_id: Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(),
span_id: 16045690984833335023,
},
Update::default(),
);
let mut snapshot = String::new();
for query in format_query("eap_spans_local", &batch) {
snapshot.push_str(std::str::from_utf8(&query).unwrap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ expression: snapshot
---
CREATE TEMPORARY TABLE new_data (organization_id UInt64, _sort_timestamp DateTime, trace_id UUID, span_id UInt64 ,attr_str_0 Map(String, String),attr_str_1 Map(String, String),attr_str_2 Map(String, String),attr_str_3 Map(String, String),attr_str_4 Map(String, String),attr_str_5 Map(String, String),attr_str_6 Map(String, String),attr_str_7 Map(String, String),attr_str_8 Map(String, String),attr_str_9 Map(String, String),attr_str_10 Map(String, String),attr_str_11 Map(String, String),attr_str_12 Map(String, String),attr_str_13 Map(String, String),attr_str_14 Map(String, String),attr_str_15 Map(String, String),attr_str_16 Map(String, String),attr_str_17 Map(String, String),attr_str_18 Map(String, String),attr_str_19 Map(String, String),attr_num_0 Map(String, Float64),attr_num_1 Map(String, Float64),attr_num_2 Map(String, Float64),attr_num_3 Map(String, Float64),attr_num_4 Map(String, Float64),attr_num_5 Map(String, Float64),attr_num_6 Map(String, Float64),attr_num_7 Map(String, Float64),attr_num_8 Map(String, Float64),attr_num_9 Map(String, Float64),attr_num_10 Map(String, Float64),attr_num_11 Map(String, Float64),attr_num_12 Map(String, Float64),attr_num_13 Map(String, Float64),attr_num_14 Map(String, Float64),attr_num_15 Map(String, Float64),attr_num_16 Map(String, Float64),attr_num_17 Map(String, Float64),attr_num_18 Map(String, Float64),attr_num_19 Map(String, Float64));
INSERT INTO new_data FORMAT JSONEachRow
{"organization_id":69,"_sort_timestamp":1715868485,"trace_id":"deadbeef-dead-beef-dead-beefdeadbeef","span_id":16045690984833335023}
{"attr_str_0":{"a":"b"},"organization_id":69,"_sort_timestamp":1727466947,"trace_id":"deadbeef-dead-beef-dead-beefdeadbeef","span_id":16045690984833335023}
;

INSERT INTO eap_spans_local
SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([1, -1]) as sign ,if(sign = 1, mapUpdate(old_data.attr_str_0, new_data.attr_str_0), old_data.attr_str_0) AS attr_str_0,if(sign = 1, mapUpdate(old_data.attr_str_1, new_data.attr_str_1), old_data.attr_str_1) AS attr_str_1,if(sign = 1, mapUpdate(old_data.attr_str_2, new_data.attr_str_2), old_data.attr_str_2) AS attr_str_2,if(sign = 1, mapUpdate(old_data.attr_str_3, new_data.attr_str_3), old_data.attr_str_3) AS attr_str_3,if(sign = 1, mapUpdate(old_data.attr_str_4, new_data.attr_str_4), old_data.attr_str_4) AS attr_str_4,if(sign = 1, mapUpdate(old_data.attr_str_5, new_data.attr_str_5), old_data.attr_str_5) AS attr_str_5,if(sign = 1, mapUpdate(old_data.attr_str_6, new_data.attr_str_6), old_data.attr_str_6) AS attr_str_6,if(sign = 1, mapUpdate(old_data.attr_str_7, new_data.attr_str_7), old_data.attr_str_7) AS attr_str_7,if(sign = 1, mapUpdate(old_data.attr_str_8, new_data.attr_str_8), old_data.attr_str_8) AS attr_str_8,if(sign = 1, mapUpdate(old_data.attr_str_9, new_data.attr_str_9), old_data.attr_str_9) AS attr_str_9,if(sign = 1, mapUpdate(old_data.attr_str_10, new_data.attr_str_10), old_data.attr_str_10) AS attr_str_10,if(sign = 1, mapUpdate(old_data.attr_str_11, new_data.attr_str_11), old_data.attr_str_11) AS attr_str_11,if(sign = 1, mapUpdate(old_data.attr_str_12, new_data.attr_str_12), old_data.attr_str_12) AS attr_str_12,if(sign = 1, mapUpdate(old_data.attr_str_13, new_data.attr_str_13), old_data.attr_str_13) AS attr_str_13,if(sign = 1, mapUpdate(old_data.attr_str_14, new_data.attr_str_14), old_data.attr_str_14) AS attr_str_14,if(sign = 1, mapUpdate(old_data.attr_str_15, new_data.attr_str_15), old_data.attr_str_15) AS attr_str_15,if(sign = 1, mapUpdate(old_data.attr_str_16, new_data.attr_str_16), old_data.attr_str_16) AS attr_str_16,if(sign = 1, mapUpdate(old_data.attr_str_17, new_data.attr_str_17), old_data.attr_str_17) AS attr_str_17,if(sign = 1, mapUpdate(old_data.attr_str_18, new_data.attr_str_18), old_data.attr_str_18) AS attr_str_18,if(sign = 1, mapUpdate(old_data.attr_str_19, new_data.attr_str_19), old_data.attr_str_19) AS attr_str_19,if(sign = 1, mapUpdate(old_data.attr_num_0, new_data.attr_num_0), old_data.attr_num_0) AS attr_num_0,if(sign = 1, mapUpdate(old_data.attr_num_1, new_data.attr_num_1), old_data.attr_num_1) AS attr_num_1,if(sign = 1, mapUpdate(old_data.attr_num_2, new_data.attr_num_2), old_data.attr_num_2) AS attr_num_2,if(sign = 1, mapUpdate(old_data.attr_num_3, new_data.attr_num_3), old_data.attr_num_3) AS attr_num_3,if(sign = 1, mapUpdate(old_data.attr_num_4, new_data.attr_num_4), old_data.attr_num_4) AS attr_num_4,if(sign = 1, mapUpdate(old_data.attr_num_5, new_data.attr_num_5), old_data.attr_num_5) AS attr_num_5,if(sign = 1, mapUpdate(old_data.attr_num_6, new_data.attr_num_6), old_data.attr_num_6) AS attr_num_6,if(sign = 1, mapUpdate(old_data.attr_num_7, new_data.attr_num_7), old_data.attr_num_7) AS attr_num_7,if(sign = 1, mapUpdate(old_data.attr_num_8, new_data.attr_num_8), old_data.attr_num_8) AS attr_num_8,if(sign = 1, mapUpdate(old_data.attr_num_9, new_data.attr_num_9), old_data.attr_num_9) AS attr_num_9,if(sign = 1, mapUpdate(old_data.attr_num_10, new_data.attr_num_10), old_data.attr_num_10) AS attr_num_10,if(sign = 1, mapUpdate(old_data.attr_num_11, new_data.attr_num_11), old_data.attr_num_11) AS attr_num_11,if(sign = 1, mapUpdate(old_data.attr_num_12, new_data.attr_num_12), old_data.attr_num_12) AS attr_num_12,if(sign = 1, mapUpdate(old_data.attr_num_13, new_data.attr_num_13), old_data.attr_num_13) AS attr_num_13,if(sign = 1, mapUpdate(old_data.attr_num_14, new_data.attr_num_14), old_data.attr_num_14) AS attr_num_14,if(sign = 1, mapUpdate(old_data.attr_num_15, new_data.attr_num_15), old_data.attr_num_15) AS attr_num_15,if(sign = 1, mapUpdate(old_data.attr_num_16, new_data.attr_num_16), old_data.attr_num_16) AS attr_num_16,if(sign = 1, mapUpdate(old_data.attr_num_17, new_data.attr_num_17), old_data.attr_num_17) AS attr_num_17,if(sign = 1, mapUpdate(old_data.attr_num_18, new_data.attr_num_18), old_data.attr_num_18) AS attr_num_18,if(sign = 1, mapUpdate(old_data.attr_num_19, new_data.attr_num_19), old_data.attr_num_19) AS attr_num_19
SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([-1, 1]) as sign ,if(sign = 1, mapUpdate(old_data.attr_str_0, new_data.attr_str_0), old_data.attr_str_0) AS attr_str_0,if(sign = 1, mapUpdate(old_data.attr_str_1, new_data.attr_str_1), old_data.attr_str_1) AS attr_str_1,if(sign = 1, mapUpdate(old_data.attr_str_2, new_data.attr_str_2), old_data.attr_str_2) AS attr_str_2,if(sign = 1, mapUpdate(old_data.attr_str_3, new_data.attr_str_3), old_data.attr_str_3) AS attr_str_3,if(sign = 1, mapUpdate(old_data.attr_str_4, new_data.attr_str_4), old_data.attr_str_4) AS attr_str_4,if(sign = 1, mapUpdate(old_data.attr_str_5, new_data.attr_str_5), old_data.attr_str_5) AS attr_str_5,if(sign = 1, mapUpdate(old_data.attr_str_6, new_data.attr_str_6), old_data.attr_str_6) AS attr_str_6,if(sign = 1, mapUpdate(old_data.attr_str_7, new_data.attr_str_7), old_data.attr_str_7) AS attr_str_7,if(sign = 1, mapUpdate(old_data.attr_str_8, new_data.attr_str_8), old_data.attr_str_8) AS attr_str_8,if(sign = 1, mapUpdate(old_data.attr_str_9, new_data.attr_str_9), old_data.attr_str_9) AS attr_str_9,if(sign = 1, mapUpdate(old_data.attr_str_10, new_data.attr_str_10), old_data.attr_str_10) AS attr_str_10,if(sign = 1, mapUpdate(old_data.attr_str_11, new_data.attr_str_11), old_data.attr_str_11) AS attr_str_11,if(sign = 1, mapUpdate(old_data.attr_str_12, new_data.attr_str_12), old_data.attr_str_12) AS attr_str_12,if(sign = 1, mapUpdate(old_data.attr_str_13, new_data.attr_str_13), old_data.attr_str_13) AS attr_str_13,if(sign = 1, mapUpdate(old_data.attr_str_14, new_data.attr_str_14), old_data.attr_str_14) AS attr_str_14,if(sign = 1, mapUpdate(old_data.attr_str_15, new_data.attr_str_15), old_data.attr_str_15) AS attr_str_15,if(sign = 1, mapUpdate(old_data.attr_str_16, new_data.attr_str_16), old_data.attr_str_16) AS attr_str_16,if(sign = 1, mapUpdate(old_data.attr_str_17, new_data.attr_str_17), old_data.attr_str_17) AS attr_str_17,if(sign = 1, mapUpdate(old_data.attr_str_18, new_data.attr_str_18), old_data.attr_str_18) AS attr_str_18,if(sign = 1, mapUpdate(old_data.attr_str_19, new_data.attr_str_19), old_data.attr_str_19) AS attr_str_19,if(sign = 1, mapUpdate(old_data.attr_num_0, new_data.attr_num_0), old_data.attr_num_0) AS attr_num_0,if(sign = 1, mapUpdate(old_data.attr_num_1, new_data.attr_num_1), old_data.attr_num_1) AS attr_num_1,if(sign = 1, mapUpdate(old_data.attr_num_2, new_data.attr_num_2), old_data.attr_num_2) AS attr_num_2,if(sign = 1, mapUpdate(old_data.attr_num_3, new_data.attr_num_3), old_data.attr_num_3) AS attr_num_3,if(sign = 1, mapUpdate(old_data.attr_num_4, new_data.attr_num_4), old_data.attr_num_4) AS attr_num_4,if(sign = 1, mapUpdate(old_data.attr_num_5, new_data.attr_num_5), old_data.attr_num_5) AS attr_num_5,if(sign = 1, mapUpdate(old_data.attr_num_6, new_data.attr_num_6), old_data.attr_num_6) AS attr_num_6,if(sign = 1, mapUpdate(old_data.attr_num_7, new_data.attr_num_7), old_data.attr_num_7) AS attr_num_7,if(sign = 1, mapUpdate(old_data.attr_num_8, new_data.attr_num_8), old_data.attr_num_8) AS attr_num_8,if(sign = 1, mapUpdate(old_data.attr_num_9, new_data.attr_num_9), old_data.attr_num_9) AS attr_num_9,if(sign = 1, mapUpdate(old_data.attr_num_10, new_data.attr_num_10), old_data.attr_num_10) AS attr_num_10,if(sign = 1, mapUpdate(old_data.attr_num_11, new_data.attr_num_11), old_data.attr_num_11) AS attr_num_11,if(sign = 1, mapUpdate(old_data.attr_num_12, new_data.attr_num_12), old_data.attr_num_12) AS attr_num_12,if(sign = 1, mapUpdate(old_data.attr_num_13, new_data.attr_num_13), old_data.attr_num_13) AS attr_num_13,if(sign = 1, mapUpdate(old_data.attr_num_14, new_data.attr_num_14), old_data.attr_num_14) AS attr_num_14,if(sign = 1, mapUpdate(old_data.attr_num_15, new_data.attr_num_15), old_data.attr_num_15) AS attr_num_15,if(sign = 1, mapUpdate(old_data.attr_num_16, new_data.attr_num_16), old_data.attr_num_16) AS attr_num_16,if(sign = 1, mapUpdate(old_data.attr_num_17, new_data.attr_num_17), old_data.attr_num_17) AS attr_num_17,if(sign = 1, mapUpdate(old_data.attr_num_18, new_data.attr_num_18), old_data.attr_num_18) AS attr_num_18,if(sign = 1, mapUpdate(old_data.attr_num_19, new_data.attr_num_19), old_data.attr_num_19) AS attr_num_19
FROM eap_spans_local old_data
GLOBAL JOIN new_data
ON old_data.organization_id = new_data.organization_id
Expand Down

0 comments on commit 87b8e30

Please sign in to comment.