diff --git a/rust_snuba/src/arroyo_utils.rs b/rust_snuba/src/arroyo_utils.rs
new file mode 100644
index 0000000000..d8ae013945
--- /dev/null
+++ b/rust_snuba/src/arroyo_utils.rs
@@ -0,0 +1,17 @@
+use rust_arroyo::processing::strategies::InvalidMessage;
+/// Some helper functions that work around Arroyo's ergonomics, and should eventually make it into
+/// Arroyo
+use rust_arroyo::types::{InnerMessage, Message};
+
+pub fn invalid_message_err<T>(message: &Message<T>) -> Result<InvalidMessage, anyhow::Error> {
+    let InnerMessage::BrokerMessage(ref msg) = message.inner_message else {
+        return Err(anyhow::anyhow!("Unexpected message type"));
+    };
+
+    let err = InvalidMessage {
+        partition: msg.partition,
+        offset: msg.offset,
+    };
+
+    Ok(err)
+}
diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs
index ec3e3d7d19..817c8cf1fe 100644
--- a/rust_snuba/src/consumer.rs
+++ b/rust_snuba/src/consumer.rs
@@ -21,6 +21,7 @@
 use crate::factory::ConsumerStrategyFactory;
 use crate::logging::{setup_logging, setup_sentry};
 use crate::metrics::global_tags::set_global_tag;
 use crate::metrics::statsd::StatsDBackend;
+use crate::mutations_factory::MutConsumerStrategyFactory;
 use crate::processors;
 use crate::types::{InsertOrReplacement, KafkaMessageMetadata};
@@ -38,6 +39,7 @@ pub fn consumer(
     enforce_schema: bool,
     max_poll_interval_ms: usize,
     async_inserts: bool,
+    mutations_mode: bool,
     python_max_queue_depth: Option<usize>,
     health_check_file: Option<&str>,
     stop_at_timestamp: Option<i64>,
@@ -61,6 +63,7 @@
             stop_at_timestamp,
             batch_write_timeout_ms,
             max_bytes_before_external_group_by,
+            mutations_mode,
         )
     });
 }
@@ -82,6 +85,7 @@ pub fn consumer_impl(
     stop_at_timestamp: Option<i64>,
     batch_write_timeout_ms: Option<u64>,
     max_bytes_before_external_group_by: Option<usize>,
+    mutations_mode: bool,
 ) -> usize {
     setup_logging();
 
@@ -228,33 +232,57 @@ pub fn consumer_impl(
         None
     };
 
-    let factory = ConsumerStrategyFactory {
-        storage_config: first_storage,
-        env_config,
-        logical_topic_name,
-        max_batch_size,
-        max_batch_time,
-        processing_concurrency: ConcurrencyConfig::new(concurrency),
-        clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
-        commitlog_concurrency: ConcurrencyConfig::new(2),
-        replacements_concurrency: ConcurrencyConfig::new(4),
-        async_inserts,
-        python_max_queue_depth,
-        use_rust_processor,
-        health_check_file: health_check_file.map(ToOwned::to_owned),
-        enforce_schema,
-        commit_log_producer,
-        replacements_config,
-        physical_consumer_group: consumer_group.to_owned(),
-        physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
-        accountant_topic_config: consumer_config.accountant_topic,
-        stop_at_timestamp,
-        batch_write_timeout,
-        max_bytes_before_external_group_by,
-    };
     let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
-    let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy);
+
+    let processor = if mutations_mode {
+        let mut_factory = MutConsumerStrategyFactory {
+            storage_config: first_storage,
+            env_config,
+            logical_topic_name,
+            max_batch_size,
+            max_batch_time,
+            processing_concurrency: ConcurrencyConfig::new(concurrency),
+            clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
+            async_inserts,
+            python_max_queue_depth,
+            use_rust_processor,
+            health_check_file: health_check_file.map(ToOwned::to_owned),
+            enforce_schema,
+            physical_consumer_group: consumer_group.to_owned(),
+            physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
+            accountant_topic_config: consumer_config.accountant_topic,
+            batch_write_timeout,
+        };
+
+        StreamProcessor::with_kafka(config, mut_factory, topic, dlq_policy)
+    } else {
+        let factory = ConsumerStrategyFactory {
+            storage_config: first_storage,
+            env_config,
+            logical_topic_name,
+            max_batch_size,
+            max_batch_time,
+            processing_concurrency: ConcurrencyConfig::new(concurrency),
+            clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
+            commitlog_concurrency: ConcurrencyConfig::new(2),
+            replacements_concurrency: ConcurrencyConfig::new(4),
+            async_inserts,
+            python_max_queue_depth,
+            use_rust_processor,
+            health_check_file: health_check_file.map(ToOwned::to_owned),
+            enforce_schema,
+            commit_log_producer,
+            replacements_config,
+            physical_consumer_group: consumer_group.to_owned(),
+            physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
+            accountant_topic_config: consumer_config.accountant_topic,
+            stop_at_timestamp,
+            batch_write_timeout,
+            max_bytes_before_external_group_by,
+        };
+
+        StreamProcessor::with_kafka(config, factory, topic, dlq_policy)
+    };
 
     let mut handle = processor.get_handle();
diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs
index 777225b242..e775c85141 100644
--- a/rust_snuba/src/lib.rs
+++ b/rust_snuba/src/lib.rs
@@ -1,8 +1,11 @@
+mod arroyo_utils;
 mod config;
 mod consumer;
 mod factory;
 mod logging;
 mod metrics;
+mod mutations;
+mod mutations_factory;
 mod processors;
 mod runtime_config;
 mod strategies;
@@ -27,6 +30,7 @@ pub use config::{
 };
 pub use factory::ConsumerStrategyFactory;
 pub use metrics::statsd::StatsDBackend;
+pub use mutations_factory::MutConsumerStrategyFactory;
 pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS};
 pub use strategies::noop::Noop;
 pub use strategies::python::PythonTransformStep;
diff --git a/rust_snuba/src/mutations/clickhouse.rs b/rust_snuba/src/mutations/clickhouse.rs
new file mode 100644
index 0000000000..3561cb9730
--- /dev/null
+++ b/rust_snuba/src/mutations/clickhouse.rs
@@ -0,0 +1,211 @@
+use std::time::Duration;
+
+use rust_arroyo::processing::strategies::run_task_in_threads::{
+    RunTaskError, RunTaskFunc, TaskRunner,
+};
+use rust_arroyo::types::Message;
+use serde::Serialize;
+
+use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION};
+use reqwest::{Client, ClientBuilder};
+use uuid::Uuid;
+
+use crate::mutations::parser::MutationBatch;
+use crate::processors::eap_spans::{AttributeMap, PrimaryKey, ATTRS_SHARD_FACTOR};
+
+#[derive(Clone)]
+pub struct ClickhouseWriter {
+    url: String,
+    table: String,
+    client: Client,
+}
+
+impl ClickhouseWriter {
+    pub fn new(
+        hostname: &str,
+        http_port: u16,
+        table: &str,
+        database: &str,
+        clickhouse_user: &str,
+        clickhouse_password: &str,
+        batch_write_timeout: Option<Duration>,
+    ) -> Self {
+        let mut headers = HeaderMap::with_capacity(5);
+        headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
+        headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate"));
+        headers.insert(
+            "X-Clickhouse-User",
+            HeaderValue::from_str(clickhouse_user).unwrap(),
+        );
+        headers.insert(
+            "X-ClickHouse-Key",
+            HeaderValue::from_str(clickhouse_password).unwrap(),
+        );
+        headers.insert(
+            "X-ClickHouse-Database",
+            HeaderValue::from_str(database).unwrap(),
+        );
+
+        let url = format!("http://{hostname}:{http_port}");
+
+        let mut client_builder = ClientBuilder::new().default_headers(headers);
+
+        if let Some(timeout) = batch_write_timeout {
+            client_builder = client_builder.timeout(timeout);
+        }
+
+        Self {
+            url,
+            table: table.to_owned(),
+            client: client_builder.build().unwrap(),
+        }
+    }
+
+    async fn process_message(&self, message: &Message<MutationBatch>) -> anyhow::Result<()> {
+        let queries = format_query(&self.table, message.payload());
+        let session_id = Uuid::new_v4().to_string();
+
+        for query in queries {
+            self.client
+                .post(&self.url)
+                .query(&[("session_id", &session_id)])
+                .body(query)
+                .send()
+                .await?
+                .error_for_status()?;
+        }
+
+        Ok(())
+    }
+}
+
+impl TaskRunner<MutationBatch, (), anyhow::Error> for ClickhouseWriter {
+    fn get_task(&self, message: Message<MutationBatch>) -> RunTaskFunc<(), anyhow::Error> {
+        let slf = self.clone();
+
+        Box::pin(async move {
+            slf.process_message(&message)
+                .await
+                .map_err(RunTaskError::Other)?;
+            message.try_map(|_| Ok(()))
+        })
+    }
+}
+
+fn format_query(table: &str, batch: &MutationBatch) -> Vec<Vec<u8>> {
+    let mut attr_columns = String::new();
+    // attr_combined_columns is intentionally ordered the same as the clickhouse schema.
+    // INSERT INTO .. SELECT FROM .. matches up columns by position only, and ignores names.
+    // subqueries don't have this property.
+    let mut attr_combined_columns = String::new();
+    for i in 0..ATTRS_SHARD_FACTOR {
+        attr_columns.push_str(&format!(",attr_str_{i} Map(String, String)"));
+
+        attr_combined_columns.push_str(&format!(
+            ",if(sign = 1, mapUpdate(old_data.attr_str_{i}, new_data.attr_str_{i}), old_data.attr_str_{i}) AS attr_str_{i}"
+        ));
+    }
+
+    for i in 0..ATTRS_SHARD_FACTOR {
+        attr_columns.push_str(&format!(",attr_num_{i} Map(String, Float64)"));
+        attr_combined_columns.push_str(&format!(
+            ",if(sign = 1, mapUpdate(old_data.attr_num_{i}, new_data.attr_num_{i}), old_data.attr_num_{i}) AS attr_num_{i}"
+        ));
+    }
+
+    let input_schema = format!("organization_id UInt64, _sort_timestamp DateTime, trace_id UUID, span_id UInt64 {attr_columns}");
+    let create_tmp_table = format!("CREATE TEMPORARY TABLE new_data ({input_schema})").into_bytes();
+    let mut insert_tmp_table = "INSERT INTO new_data FORMAT JSONEachRow\n"
+        .to_owned()
+        .into_bytes();
+
+    for (filter, update) in &batch.0 {
+        let mut attributes = AttributeMap::default();
+        for (k, v) in &update.attr_str {
+            attributes.insert_str(k.clone(), v.clone());
+        }
+
+        for (k, v) in &update.attr_num {
+            attributes.insert_num(k.clone(), *v);
+        }
+
+        let row = MutationRow {
+            filter: filter.clone(),
+            attributes,
+        };
+
+        serde_json::to_writer(&mut insert_tmp_table, &row).unwrap();
+        insert_tmp_table.push(b'\n');
+    }
+
+    let main_insert = format!(
+        "
+    INSERT INTO {table}
+    SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([1, -1]) as sign {attr_combined_columns}
+    FROM {table} old_data
+    GLOBAL JOIN new_data
+    ON old_data.organization_id = new_data.organization_id
+    and old_data.trace_id = new_data.trace_id
+    and old_data.span_id = new_data.span_id
+    and old_data._sort_timestamp = new_data._sort_timestamp
+    PREWHERE
+    (old_data.organization_id,
+    old_data._sort_timestamp,
+    old_data.trace_id,
+    old_data.span_id,
+    ) GLOBAL IN (SELECT organization_id, _sort_timestamp, trace_id, span_id FROM new_data)
+    WHERE
+    old_data.sign = 1
+    "
+    )
+    .into_bytes();
+
+    let drop_tmp_table = "DROP TABLE IF EXISTS new_data".to_owned().into_bytes();
+
+    vec![
+        create_tmp_table,
+        insert_tmp_table,
+        main_insert,
+        drop_tmp_table,
+    ]
+}
+
+#[derive(Serialize, Default)]
+struct MutationRow {
+    #[serde(flatten)]
+    attributes: AttributeMap,
+
+    #[serde(flatten)]
+    filter: PrimaryKey,
+}
+
+#[cfg(test)]
+mod tests {
+    use uuid::Uuid;
+
+    use crate::mutations::parser::Update;
+
+    use super::*;
+
+    #[test]
+    fn format_query_snap() {
+        let mut batch = MutationBatch::default();
+
+        batch.0.insert(
+            PrimaryKey {
+                organization_id: 69,
+                _sort_timestamp: 1715868485,
+                trace_id: Uuid::parse_str("deadbeef-dead-beef-dead-beefdeadbeef").unwrap(),
+                span_id: 16045690984833335023,
+            },
+            Update::default(),
+        );
+        let mut snapshot = String::new();
+        for query in format_query("eap_spans_local", &batch) {
+            snapshot.push_str(std::str::from_utf8(&query).unwrap());
+            snapshot.push_str(";\n");
+        }
+
+        insta::assert_snapshot!(snapshot);
+    }
+}
diff --git a/rust_snuba/src/mutations/mod.rs b/rust_snuba/src/mutations/mod.rs
new file mode 100644
index 0000000000..efacfe26d2
--- /dev/null
+++ b/rust_snuba/src/mutations/mod.rs
@@ -0,0 +1,2 @@
+pub mod clickhouse;
+pub mod parser;
diff --git a/rust_snuba/src/mutations/parser.rs b/rust_snuba/src/mutations/parser.rs
new file mode 100644
index 0000000000..d8f8d4dcea
--- /dev/null
+++ b/rust_snuba/src/mutations/parser.rs
@@ -0,0 +1,84 @@
+use std::collections::BTreeMap;
+
+use rust_arroyo::backends::kafka::types::KafkaPayload;
+use rust_arroyo::counter;
+use rust_arroyo::processing::strategies::run_task_in_threads::{
+    RunTaskError, RunTaskFunc, TaskRunner,
+};
+use rust_arroyo::types::Message;
+
+use schemars::JsonSchema;
+use sentry::Hub;
+use sentry::SentryFutureExt;
+use serde::Deserialize;
+
+use crate::arroyo_utils::invalid_message_err;
+use crate::processors::eap_spans::PrimaryKey;
+
+#[derive(Debug, Default, Deserialize, JsonSchema)]
+pub(crate) struct Update {
+    // the update clause
+    pub attr_str: BTreeMap<String, String>,
+    pub attr_num: BTreeMap<String, f64>,
+}
+
+impl Update {
+    pub fn merge(&mut self, other: Update) {
+        self.attr_str.extend(other.attr_str);
+        self.attr_num.extend(other.attr_num);
+    }
+}
+
+#[derive(Debug, Default, Deserialize, JsonSchema)]
+pub(crate) struct MutationMessage {
+    // primary key, the mutation only applies on the rows that match this filter
+    pub filter: PrimaryKey,
+
+    // the span attributes to update
+    pub update: Update,
+}
+
+#[derive(Default)]
+pub(crate) struct MutationBatch(pub BTreeMap<PrimaryKey, Update>);
+
+#[derive(Clone)]
+pub(crate) struct MutationParser;
+
+impl MutationParser {
+    async fn process_message(
+        self,
+        message: Message<KafkaPayload>,
+    ) -> Result<Message<MutationMessage>, RunTaskError<anyhow::Error>> {
+        let maybe_err = RunTaskError::InvalidMessage(
+            invalid_message_err(&message).map_err(RunTaskError::Other)?,
+        );
+
+        message
+            .try_map(|payload| {
+                let payload = payload
+                    .payload()
+                    .ok_or(anyhow::anyhow!("no payload in message"))?;
+                let parsed: MutationMessage = serde_json::from_slice(payload)?;
+                Ok(parsed)
+            })
+            .map_err(|error: anyhow::Error| {
+                let error: &dyn std::error::Error = error.as_ref();
+                counter!("invalid_mutation");
+                tracing::error!(error, "failed processing mutation");
+                maybe_err
+            })
+    }
+}
+
+impl TaskRunner<KafkaPayload, MutationMessage, anyhow::Error> for MutationParser {
+    fn get_task(
+        &self,
+        message: Message<KafkaPayload>,
+    ) -> RunTaskFunc<MutationMessage, anyhow::Error> {
+        Box::pin(
+            self.clone()
+                .process_message(message)
+                .bind_hub(Hub::new_from_top(Hub::current())),
+        )
+    }
+}
diff --git a/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap b/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap
new file mode 100644
index 0000000000..5fdfa367a8
--- /dev/null
+++ b/rust_snuba/src/mutations/snapshots/rust_snuba__mutations__clickhouse__tests__format_query_snap.snap
@@ -0,0 +1,27 @@
+---
+source: src/mutations/clickhouse.rs
+expression: snapshot
+---
+CREATE TEMPORARY TABLE new_data (organization_id UInt64, _sort_timestamp DateTime, trace_id UUID, span_id UInt64 ,attr_str_0 Map(String, String),attr_str_1 Map(String, String),attr_str_2 Map(String, String),attr_str_3 Map(String, String),attr_str_4 Map(String, String),attr_str_5 Map(String, String),attr_str_6 Map(String, String),attr_str_7 Map(String, String),attr_str_8 Map(String, String),attr_str_9 Map(String, String),attr_str_10 Map(String, String),attr_str_11 Map(String, String),attr_str_12 Map(String, String),attr_str_13 Map(String, String),attr_str_14 Map(String, String),attr_str_15 Map(String, String),attr_str_16 Map(String, String),attr_str_17 Map(String, String),attr_str_18 Map(String, String),attr_str_19 Map(String, String),attr_num_0 Map(String, Float64),attr_num_1 Map(String, Float64),attr_num_2 Map(String, Float64),attr_num_3 Map(String, Float64),attr_num_4 Map(String, Float64),attr_num_5 Map(String, Float64),attr_num_6 Map(String, Float64),attr_num_7 Map(String, Float64),attr_num_8 Map(String, Float64),attr_num_9 Map(String, Float64),attr_num_10 Map(String, Float64),attr_num_11 Map(String, Float64),attr_num_12 Map(String, Float64),attr_num_13 Map(String, Float64),attr_num_14 Map(String, Float64),attr_num_15 Map(String, Float64),attr_num_16 Map(String, Float64),attr_num_17 Map(String, Float64),attr_num_18 Map(String, Float64),attr_num_19 Map(String, Float64));
+INSERT INTO new_data FORMAT JSONEachRow
+{"organization_id":69,"_sort_timestamp":1715868485,"trace_id":"deadbeef-dead-beef-dead-beefdeadbeef","span_id":16045690984833335023}
+;
+
+    INSERT INTO eap_spans_local
+    SELECT old_data.* EXCEPT ('sign|attr_.*'), arrayJoin([1, -1]) as sign ,if(sign = 1, mapUpdate(old_data.attr_str_0, new_data.attr_str_0), old_data.attr_str_0) AS attr_str_0,if(sign = 1, mapUpdate(old_data.attr_str_1, new_data.attr_str_1), old_data.attr_str_1) AS attr_str_1,if(sign = 1, mapUpdate(old_data.attr_str_2, new_data.attr_str_2), old_data.attr_str_2) AS attr_str_2,if(sign = 1, mapUpdate(old_data.attr_str_3, new_data.attr_str_3), old_data.attr_str_3) AS attr_str_3,if(sign = 1, mapUpdate(old_data.attr_str_4, new_data.attr_str_4), old_data.attr_str_4) AS attr_str_4,if(sign = 1, mapUpdate(old_data.attr_str_5, new_data.attr_str_5), old_data.attr_str_5) AS attr_str_5,if(sign = 1, mapUpdate(old_data.attr_str_6, new_data.attr_str_6), old_data.attr_str_6) AS attr_str_6,if(sign = 1, mapUpdate(old_data.attr_str_7, new_data.attr_str_7), old_data.attr_str_7) AS attr_str_7,if(sign = 1, mapUpdate(old_data.attr_str_8, new_data.attr_str_8), old_data.attr_str_8) AS attr_str_8,if(sign = 1, mapUpdate(old_data.attr_str_9, new_data.attr_str_9), old_data.attr_str_9) AS attr_str_9,if(sign = 1, mapUpdate(old_data.attr_str_10, new_data.attr_str_10), old_data.attr_str_10) AS attr_str_10,if(sign = 1, mapUpdate(old_data.attr_str_11, new_data.attr_str_11), old_data.attr_str_11) AS attr_str_11,if(sign = 1, mapUpdate(old_data.attr_str_12, new_data.attr_str_12), old_data.attr_str_12) AS attr_str_12,if(sign = 1, mapUpdate(old_data.attr_str_13, new_data.attr_str_13), old_data.attr_str_13) AS attr_str_13,if(sign = 1, mapUpdate(old_data.attr_str_14, new_data.attr_str_14), old_data.attr_str_14) AS attr_str_14,if(sign = 1, mapUpdate(old_data.attr_str_15, new_data.attr_str_15), old_data.attr_str_15) AS attr_str_15,if(sign = 1, mapUpdate(old_data.attr_str_16, new_data.attr_str_16), old_data.attr_str_16) AS attr_str_16,if(sign = 1, mapUpdate(old_data.attr_str_17, new_data.attr_str_17), old_data.attr_str_17) AS attr_str_17,if(sign = 1, mapUpdate(old_data.attr_str_18, new_data.attr_str_18), old_data.attr_str_18) AS attr_str_18,if(sign = 1, mapUpdate(old_data.attr_str_19, new_data.attr_str_19), old_data.attr_str_19) AS attr_str_19,if(sign = 1, mapUpdate(old_data.attr_num_0, new_data.attr_num_0), old_data.attr_num_0) AS attr_num_0,if(sign = 1, mapUpdate(old_data.attr_num_1, new_data.attr_num_1), old_data.attr_num_1) AS attr_num_1,if(sign = 1, mapUpdate(old_data.attr_num_2, new_data.attr_num_2), old_data.attr_num_2) AS attr_num_2,if(sign = 1, mapUpdate(old_data.attr_num_3, new_data.attr_num_3), old_data.attr_num_3) AS attr_num_3,if(sign = 1, mapUpdate(old_data.attr_num_4, new_data.attr_num_4), old_data.attr_num_4) AS attr_num_4,if(sign = 1, mapUpdate(old_data.attr_num_5, new_data.attr_num_5), old_data.attr_num_5) AS attr_num_5,if(sign = 1, mapUpdate(old_data.attr_num_6, new_data.attr_num_6), old_data.attr_num_6) AS attr_num_6,if(sign = 1, mapUpdate(old_data.attr_num_7, new_data.attr_num_7), old_data.attr_num_7) AS attr_num_7,if(sign = 1, mapUpdate(old_data.attr_num_8, new_data.attr_num_8), old_data.attr_num_8) AS attr_num_8,if(sign = 1, mapUpdate(old_data.attr_num_9, new_data.attr_num_9), old_data.attr_num_9) AS attr_num_9,if(sign = 1, mapUpdate(old_data.attr_num_10, new_data.attr_num_10), old_data.attr_num_10) AS attr_num_10,if(sign = 1, mapUpdate(old_data.attr_num_11, new_data.attr_num_11), old_data.attr_num_11) AS attr_num_11,if(sign = 1, mapUpdate(old_data.attr_num_12, new_data.attr_num_12), old_data.attr_num_12) AS attr_num_12,if(sign = 1, mapUpdate(old_data.attr_num_13, new_data.attr_num_13), old_data.attr_num_13) AS attr_num_13,if(sign = 1, mapUpdate(old_data.attr_num_14, new_data.attr_num_14), old_data.attr_num_14) AS attr_num_14,if(sign = 1, mapUpdate(old_data.attr_num_15, new_data.attr_num_15), old_data.attr_num_15) AS attr_num_15,if(sign = 1, mapUpdate(old_data.attr_num_16, new_data.attr_num_16), old_data.attr_num_16) AS attr_num_16,if(sign = 1, mapUpdate(old_data.attr_num_17, new_data.attr_num_17), old_data.attr_num_17) AS attr_num_17,if(sign = 1, mapUpdate(old_data.attr_num_18, new_data.attr_num_18), old_data.attr_num_18) AS attr_num_18,if(sign = 1, mapUpdate(old_data.attr_num_19, new_data.attr_num_19), old_data.attr_num_19) AS attr_num_19
+    FROM eap_spans_local old_data
+    GLOBAL JOIN new_data
+    ON old_data.organization_id = new_data.organization_id
+    and old_data.trace_id = new_data.trace_id
+    and old_data.span_id = new_data.span_id
+    and old_data._sort_timestamp = new_data._sort_timestamp
+    PREWHERE
+    (old_data.organization_id,
+    old_data._sort_timestamp,
+    old_data.trace_id,
+    old_data.span_id,
+    ) GLOBAL IN (SELECT organization_id, _sort_timestamp, trace_id, span_id FROM new_data)
+    WHERE
+    old_data.sign = 1
+    ;
+DROP TABLE IF EXISTS new_data;
diff --git a/rust_snuba/src/mutations_factory.rs b/rust_snuba/src/mutations_factory.rs
new file mode 100644
index 0000000000..cbb2e39a47
--- /dev/null
+++ b/rust_snuba/src/mutations_factory.rs
@@ -0,0 +1,99 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use rust_arroyo::backends::kafka::types::KafkaPayload;
+use rust_arroyo::processing::strategies::commit_offsets::CommitOffsets;
+use rust_arroyo::processing::strategies::reduce::Reduce;
+use rust_arroyo::processing::strategies::run_task_in_threads::{
+    ConcurrencyConfig, RunTaskInThreads,
+};
+use rust_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory};
+use rust_arroyo::types::Message;
+use rust_arroyo::types::{Partition, Topic};
+
+use crate::config;
+use crate::metrics::global_tags::set_global_tag;
+use crate::mutations::clickhouse::ClickhouseWriter;
+use crate::mutations::parser::{MutationBatch, MutationMessage, MutationParser};
+
+pub struct MutConsumerStrategyFactory {
+    pub storage_config: config::StorageConfig,
+    pub env_config: config::EnvConfig,
+    pub logical_topic_name: String,
+    pub max_batch_size: usize,
+    pub max_batch_time: Duration,
+    pub processing_concurrency: ConcurrencyConfig,
+    pub clickhouse_concurrency: ConcurrencyConfig,
+    pub async_inserts: bool,
+    pub python_max_queue_depth: Option<usize>,
+    pub use_rust_processor: bool,
+    pub health_check_file: Option<String>,
+    pub enforce_schema: bool,
+    pub physical_consumer_group: String,
+    pub physical_topic_name: Topic,
+    pub accountant_topic_config: config::TopicConfig,
+    pub batch_write_timeout: Option<Duration>,
+}
+
+impl ProcessingStrategyFactory<KafkaPayload> for MutConsumerStrategyFactory {
+    fn update_partitions(&self, partitions: &HashMap<Partition, u64>) {
+        match partitions.keys().map(|partition| partition.index).min() {
+            Some(min) => set_global_tag("min_partition".to_owned(), min.to_string()),
+            None => set_global_tag("min_partition".to_owned(), "none".to_owned()),
+        }
+    }
+
+    fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
+        let next_step = CommitOffsets::new(Duration::from_secs(1));
+
+        let next_step = RunTaskInThreads::new(
+            Box::new(next_step),
+            Box::new(ClickhouseWriter::new(
+                &self.storage_config.clickhouse_cluster.host,
+                self.storage_config.clickhouse_cluster.http_port,
+                &self.storage_config.clickhouse_table_name,
+                &self.storage_config.clickhouse_cluster.database,
+                &self.storage_config.clickhouse_cluster.user,
+                &self.storage_config.clickhouse_cluster.password,
+                self.batch_write_timeout,
+            )),
+            &self.clickhouse_concurrency,
+            Some("clickhouse"),
+        );
+
+        let next_step = Reduce::new(
+            next_step,
+            Arc::new(
+                move |mut batch: MutationBatch, message: Message<MutationMessage>| {
+                    let message = message.into_payload();
+                    match batch.0.entry(message.filter) {
+                        std::collections::btree_map::Entry::Occupied(mut entry) => {
+                            entry.get_mut().merge(message.update);
+                        }
+                        std::collections::btree_map::Entry::Vacant(entry) => {
+                            entry.insert(message.update);
+                        }
+                    }
+
+                    Ok(batch)
+                },
+            ),
+            Arc::new(MutationBatch::default),
+            self.max_batch_size,
+            self.max_batch_time,
+            // TODO: batch sizes are currently not properly computed. if two mutations are merged
+            // together, they still count against the batch size as 2.
+            |_| 1,
+        );
+
+        let next_step = RunTaskInThreads::new(
+            Box::new(next_step),
+            Box::new(MutationParser),
+            &self.processing_concurrency,
+            Some("parse"),
+        );
+
+        Box::new(next_step)
+    }
+}
diff --git a/rust_snuba/src/processors/eap_spans.rs b/rust_snuba/src/processors/eap_spans.rs
index f0d1250b7f..6d09b6ca8d 100644
--- a/rust_snuba/src/processors/eap_spans.rs
+++ b/rust_snuba/src/processors/eap_spans.rs
@@ -1,11 +1,12 @@
 use anyhow::Context;
 use chrono::DateTime;
 use seq_macro::seq;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use uuid::Uuid;
 
 use rust_arroyo::backends::kafka::types::KafkaPayload;
+use schemars::JsonSchema;
 use serde_json::Value;
 
 use crate::config::ProcessorConfig;
@@ -13,6 +14,16 @@ use crate::processors::spans::FromSpanMessage;
 use crate::processors::utils::enforce_retention;
 use crate::types::{InsertBatch, KafkaMessageMetadata};
 
+pub const ATTRS_SHARD_FACTOR: usize = 20;
+
+macro_rules! seq_attrs {
+    ($($tt:tt)*) => {
+        seq!(N in 0..20 {
+            $($tt)*
+        });
+    }
+}
+
 pub fn process_message(
     payload: KafkaPayload,
     _metadata: KafkaMessageMetadata,
@@ -29,21 +40,68 @@ pub fn process_message(
     InsertBatch::from_rows([span], origin_timestamp)
 }
 
-seq!(N in 0..20 {
+seq_attrs! {
+#[derive(Debug, Default, Serialize)]
+pub(crate) struct AttributeMap {
+    #(
+        #[serde(skip_serializing_if = "HashMap::is_empty")]
+        attr_str_~N: HashMap<String, String>,
+
+        #[serde(skip_serializing_if = "HashMap::is_empty")]
+        attr_num_~N: HashMap<String, f64>,
+    )*
+}
+}
+
+impl AttributeMap {
+    pub fn insert_str(&mut self, k: String, v: String) {
+        seq_attrs! {
+            let attr_str_buckets = [
+                #(
+                    &mut self.attr_str_~N,
+                )*
+            ];
+        };
+
+        attr_str_buckets[(fnv_1a(k.as_bytes()) as usize) % attr_str_buckets.len()].insert(k, v);
+    }
+
+    pub fn insert_num(&mut self, k: String, v: f64) {
+        seq_attrs! {
+            let attr_num_buckets = [
+                #(
+                    &mut self.attr_num_~N,
+                )*
+            ];
+        }
+
+        attr_num_buckets[(fnv_1a(k.as_bytes()) as usize) % attr_num_buckets.len()].insert(k, v);
+    }
+}
+
+#[derive(
+    Debug, Default, Deserialize, Serialize, JsonSchema, Ord, PartialOrd, Eq, PartialEq, Clone,
+)]
+pub(crate) struct PrimaryKey {
+    pub organization_id: u64,
+    pub _sort_timestamp: u32,
+    pub trace_id: Uuid,
+    pub span_id: u64,
+}
+
+/// the span object for the new "events analytics platform"
 #[derive(Debug, Default, Serialize)]
 struct EAPSpan {
-    // the span object for the new "events analytics platform"
-    organization_id: u64,
+    #[serde(flatten)]
+    primary_key: PrimaryKey,
+
     project_id: u64,
     service: String, //currently just project ID as a string
-    trace_id: Uuid,
-    span_id: u64,
     #[serde(default)]
     parent_span_id: u64,
     segment_id: u64, //aka transaction ID
     segment_name: String, //aka transaction name
     is_segment: bool, //aka "is transaction"
-    _sort_timestamp: u32,
     start_timestamp: u64,
     end_timestamp: u64,
     duration_ms: u32,
@@ -56,15 +114,9 @@ struct EAPSpan {
     sampling_weight_2: u64,
     sign: u8, //1 for additions, -1 for deletions - for this worker it should be 1
 
-    #(
-        #[serde(skip_serializing_if = "HashMap::is_empty")]
-        attr_str_~N: HashMap<String, String>,
-
-        #[serde(skip_serializing_if = "HashMap::is_empty")]
-        attr_num_~N: HashMap<String, f64>,
-    )*
+    #[serde(flatten)]
+    attributes: AttributeMap,
 }
-});
 
 fn fnv_1a(input: &[u8]) -> u32 {
     const FNV_1A_PRIME: u32 = 16777619;
@@ -82,11 +134,14 @@ fn fnv_1a(input: &[u8]) -> u32 {
 impl From<FromSpanMessage> for EAPSpan {
     fn from(from: FromSpanMessage) -> EAPSpan {
         let mut res = Self {
-            organization_id: from.organization_id,
+            primary_key: PrimaryKey {
+                organization_id: from.organization_id,
+                _sort_timestamp: (from.start_timestamp_ms / 1000) as u32,
+                trace_id: from.trace_id,
+                span_id: u64::from_str_radix(&from.span_id, 16).unwrap_or_default(),
+            },
             project_id: from.project_id,
-            trace_id: from.trace_id,
             service: from.project_id.to_string(),
-            span_id: u64::from_str_radix(&from.span_id, 16).unwrap_or_default(),
             parent_span_id: from
                 .parent_span_id
                 .map_or(0, |s| u64::from_str_radix(&s, 16).unwrap_or(0)),
@@ -94,7 +149,6 @@
                 .segment_id
                 .map_or(0, |s| u64::from_str_radix(&s, 16).unwrap_or(0)),
             is_segment: from.is_segment,
-            _sort_timestamp: (from.start_timestamp_ms / 1000) as u32,
             start_timestamp: (from.start_timestamp_precise * 1e6) as u64,
             end_timestamp: (from.end_timestamp_precise * 1e6) as u64,
             duration_ms: from.duration_ms,
@@ -111,51 +165,30 @@
         };
 
         {
-            seq!(N in 0..20 {
-                let mut attr_str_buckets = [
-                    #(
-                        &mut res.attr_str_~N,
-                    )*
-                ];
-                let mut attr_num_buckets = [
-                    #(
-                        &mut res.attr_num_~N,
-                    )*
-                ];
-            });
-
-            let mut insert_string = |k: String, v: String| {
-                attr_str_buckets[(fnv_1a(k.as_bytes()) as usize) % attr_str_buckets.len()]
-                    .insert(k.clone(), v.clone());
-            };
-
-            let mut insert_num = |k: String, v: f64| {
-                attr_num_buckets[(fnv_1a(k.as_bytes()) as usize) % attr_num_buckets.len()]
-                    .insert(k.clone(), v);
-            };
-
             if let Some(sentry_tags) = from.sentry_tags {
-                sentry_tags.iter().for_each(|(k, v)| {
+                for (k, v) in sentry_tags {
                     if k == "transaction" {
-                        res.segment_name = v.clone();
+                        res.segment_name = v;
                     } else {
-                        insert_string(format!("sentry.{}", k), v.clone());
+                        res.attributes.insert_str(format!("sentry.{k}"), v);
                     }
-                })
+                }
             }
 
             if let Some(tags) = from.tags {
-                tags.iter().for_each(|(k, v)| {
-                    insert_string(k.clone(), v.clone());
-                })
+                for (k, v) in tags {
+                    res.attributes.insert_str(k, v);
+                }
             }
 
             if let Some(measurements) = from.measurements {
-                measurements.iter().for_each(|(k, v)| match k.as_str() {
-                    "client_sample_rate" if v.value > 0.0 => res.sampling_factor *= v.value,
-                    "server_sample_rate" if v.value > 0.0 => res.sampling_factor *= v.value,
-                    _ => insert_num(k.clone(), v.value),
-                });
+                for (k, v) in measurements {
+                    match k.as_str() {
+                        "client_sample_rate" if v.value > 0.0 => res.sampling_factor *= v.value,
+                        "server_sample_rate" if v.value > 0.0 => res.sampling_factor *= v.value,
+                        _ => res.attributes.insert_num(k, v.value),
+                    }
+                }
             }
 
             // lower precision to compensate floating point errors
@@ -164,25 +197,23 @@
             res.sampling_weight_2 = res.sampling_weight.round() as u64;
 
             if let Some(data) = from.data {
-                data.iter().for_each(|(k, v)| {
+                for (k, v) in data {
                     match v {
-                        Value::String(string) => insert_string(k.clone(), string.clone()),
-                        Value::Array(array) => insert_string(
-                            k.clone(),
-                            serde_json::to_string(array).unwrap_or_default(),
-                        ),
-                        Value::Object(object) => insert_string(
-                            k.clone(),
-                            serde_json::to_string(object).unwrap_or_default(),
-                        ),
-                        Value::Number(number) => {
-                            insert_num(k.clone(), number.as_f64().unwrap_or_default())
-                        }
-                        Value::Bool(true) => insert_num(k.clone(), 1.0),
-                        Value::Bool(false) => insert_num(k.clone(), 0.0),
-                        _ => Default::default(),
-                    };
-                })
+                        Value::String(string) => res.attributes.insert_str(k, string),
+                        Value::Array(array) => res
+                            .attributes
+                            .insert_str(k, serde_json::to_string(&array).unwrap_or_default()),
+                        Value::Object(object) => res
+                            .attributes
+                            .insert_str(k, serde_json::to_string(&object).unwrap_or_default()),
+                        Value::Number(number) => res
+                            .attributes
+                            .insert_num(k, number.as_f64().unwrap_or_default()),
+                        Value::Bool(true) => res.attributes.insert_num(k, 1.0),
+                        Value::Bool(false) => res.attributes.insert_num(k, 0.0),
+                        _ => (),
+                    }
+                }
             }
         }
     }
diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs
index 919da00cb5..46c90887c2 100644
--- a/rust_snuba/src/processors/mod.rs
+++ b/rust_snuba/src/processors/mod.rs
@@ -1,4 +1,4 @@
-mod eap_spans;
+pub(crate) mod eap_spans;
 mod errors;
 mod functions;
 mod generic_metrics;
diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap
index c46d1319ed..cf5e6f9051 100644
--- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap
+++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__eap_spans__tests__serialization.snap
@@ -3,66 +3,31 @@ source: src/processors/eap_spans.rs
 expression: span
 ---
 {
-  "organization_id": 1,
-  "project_id": 1,
-  "service": "1",
-  "trace_id": "d099bf9a-d5a1-43cf-8f83-a98081d0ed3b",
-  "span_id": 9832388815107059821,
-  "parent_span_id": 0,
-  "segment_id": 9832388815107059821,
-  "segment_name": "/api/0/relays/projectconfigs/",
-  "is_segment": true,
"_sort_timestamp": 1721319572, - "start_timestamp": 1721319572616648, - "end_timestamp": 1721319572768806, - "duration_ms": 152, - "exclusive_time_ms": 0.228, - "retention_days": 90, - "name": "/api/0/relays/projectconfigs/", - "sampling_factor": 0.02, - "sampling_weight": 50.0, - "sampling_weight_2": 50, - "sign": 1, - "attr_str_0": { - "relay_protocol_version": "3" - }, - "attr_str_1": { - "sentry.thread.name": "uWSGIWorker1Core0" - }, "attr_num_1": { "my.neg.float.field": -101.2, "my.true.bool.field": 1.0 }, - "attr_str_3": { - "sentry.thread.id": "8522009600" + "attr_num_11": { + "num_of_spans": 50.0 }, - "attr_str_4": { - "thread.id": "8522009600" + "attr_num_14": { + "my.int.field": 2000.0 }, - "attr_str_5": { - "http.status_code": "200", - "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", - "sentry.sdk.name": "sentry.python.django", - "sentry.transaction.op": "http.server" + "attr_num_17": { + "my.float.field": 101.2 }, "attr_num_5": { "my.neg.field": -100.0 }, - "attr_str_6": { - "relay_id": "88888888-4444-4444-8444-cccccccccccc", - "thread.name": "uWSGIWorker1Core0" - }, "attr_num_6": { "my.false.bool.field": 0.0 }, - "attr_str_7": { - "sentry.trace.status": "ok" - }, - "attr_str_8": { - "sentry.category": "http" + "attr_str_0": { + "relay_protocol_version": "3" }, - "attr_str_9": { - "sentry.environment": "development" + "attr_str_1": { + "sentry.thread.name": "uWSGIWorker1Core0" }, "attr_str_10": { "sentry.sdk.version": "2.7.0" @@ -72,9 +37,6 @@ expression: span "sentry.transaction.method": "POST", "sentry.user": "ip:127.0.0.1" }, - "attr_num_11": { - "num_of_spans": 50.0 - }, "attr_str_12": { "relay_use_post_or_schedule_rejected": "version", "server_name": "D23CXQ4GK2.local" @@ -82,23 +44,61 @@ expression: span "attr_str_14": { "sentry.status": "ok" }, - "attr_num_14": { - "my.int.field": 2000.0 - }, "attr_str_17": { "relay_endpoint_version": "3", "relay_no_cache": "False", "relay_use_post_or_schedule": "True", "spans_over_limit": "False" }, - "attr_num_17": { - "my.float.field": 101.2 - }, "attr_str_18": { "sentry.segment.name": "/api/0/relays/projectconfigs/", "sentry.status_code": "200" }, "attr_str_19": { "sentry.op": "http.server" - } + }, + "attr_str_3": { + "sentry.thread.id": "8522009600" + }, + "attr_str_4": { + "thread.id": "8522009600" + }, + "attr_str_5": { + "http.status_code": "200", + "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "sentry.sdk.name": "sentry.python.django", + "sentry.transaction.op": "http.server" + }, + "attr_str_6": { + "relay_id": "88888888-4444-4444-8444-cccccccccccc", + "thread.name": "uWSGIWorker1Core0" + }, + "attr_str_7": { + "sentry.trace.status": "ok" + }, + "attr_str_8": { + "sentry.category": "http" + }, + "attr_str_9": { + "sentry.environment": "development" + }, + "duration_ms": 152, + "end_timestamp": 1721319572768806, + "exclusive_time_ms": 0.228, + "is_segment": true, + "name": "/api/0/relays/projectconfigs/", + "organization_id": 1, + "parent_span_id": 0, + "project_id": 1, + "retention_days": 90, + "sampling_factor": 0.02, + "sampling_weight": 50.0, + "sampling_weight_2": 50, + "segment_id": 9832388815107059821, + "segment_name": "/api/0/relays/projectconfigs/", + "service": "1", + "sign": 1, + "span_id": 9832388815107059821, + "start_timestamp": 1721319572616648, + "trace_id": "d099bf9a-d5a1-43cf-8f83-a98081d0ed3b" } diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py index 25f135181c..879f0d011f 100644 --- a/snuba/cli/rust_consumer.py +++ 
+++ b/snuba/cli/rust_consumer.py
@@ -140,6 +140,14 @@
     default=False,
     help="Enable async inserts for ClickHouse",
 )
+@click.option(
+    "--mutations-mode",
+    is_flag=True,
+    default=False,
+    help="""
+    This is only to be used for the mutability consumer
+    """,
+)
 @click.option(
     "--health-check-file",
     default=None,
@@ -203,7 +211,8 @@ def rust_consumer(
     enforce_schema: bool,
     stop_at_timestamp: Optional[int],
     batch_write_timeout_ms: Optional[int],
-    max_bytes_before_external_group_by: Optional[int]
+    max_bytes_before_external_group_by: Optional[int],
+    mutations_mode: bool
 ) -> None:
     """
     Experimental alternative to `snuba consumer`
@@ -250,6 +259,7 @@ def rust_consumer(
         enforce_schema,
         max_poll_interval_ms,
         async_inserts,
+        mutations_mode,
        python_max_queue_depth,
        health_check_file,
        stop_at_timestamp,