diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs
index fd296c57d8..9dd949cd90 100644
--- a/rust_snuba/benches/processors.rs
+++ b/rust_snuba/benches/processors.rs
@@ -18,9 +18,9 @@ use rust_arroyo::processing::{Callbacks, ConsumerState, RunError, StreamProcesso
 use rust_arroyo::types::{Partition, Topic};
 use rust_arroyo::utils::clock::SystemClock;
 use rust_snuba::{
-    ClickhouseConfig, ConsumerStrategyFactory, EnvConfig, KafkaMessageMetadata,
+    BrokerConfig, ClickhouseConfig, ConsumerStrategyFactory, EnvConfig, KafkaMessageMetadata,
     MessageProcessorConfig, ProcessingFunction, ProcessorConfig, StatsDBackend, StorageConfig,
-    PROCESSORS,
+    TopicConfig, PROCESSORS,
 };
 use uuid::Uuid;
 
@@ -87,6 +87,11 @@ fn create_factory(
         None,
         "test-group".to_owned(),
         Topic::new("test"),
+        TopicConfig {
+            physical_topic_name: "shared-resources-usage".to_string(),
+            logical_topic_name: "shared-resources-usage".to_string(),
+            broker_config: BrokerConfig::default(),
+        },
     );
     Box::new(factory)
 }
diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs
index 18e418135e..c40f64e9ef 100644
--- a/rust_snuba/src/config.rs
+++ b/rust_snuba/src/config.rs
@@ -16,6 +16,7 @@ pub struct ConsumerConfig {
     pub commit_log_topic: Option<TopicConfig>,
     pub replacements_topic: Option<TopicConfig>,
     pub dlq_topic: Option<TopicConfig>,
+    pub accountant_topic: TopicConfig,
     pub max_batch_size: usize,
     pub max_batch_time_ms: u64,
     pub env: EnvConfig,
diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs
index 18164188f6..67975e93ba 100644
--- a/rust_snuba/src/consumer.rs
+++ b/rust_snuba/src/consumer.rs
@@ -192,6 +192,7 @@ pub fn consumer_impl(
         commit_log_producer,
         consumer_group.to_owned(),
         Topic::new(&consumer_config.raw_topic.physical_topic_name),
+        consumer_config.accountant_topic,
     );
 
     let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
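[Note] The new accountant_topic is threaded from ConsumerConfig through consumer_impl into the strategy factory as an extra constructor argument. A minimal sketch of the value a caller now passes, mirroring the benchmark change above (BrokerConfig::default() is a stand-in here, not a production broker configuration):

    // Illustrative only — values mirror the benches setup, not a real deployment.
    let accountant_topic = TopicConfig {
        physical_topic_name: "shared-resources-usage".to_string(),
        logical_topic_name: "shared-resources-usage".to_string(),
        broker_config: BrokerConfig::default(),
    };
    // ...passed as the new trailing argument to ConsumerStrategyFactory::new(...)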
diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs
index fb06fc2007..07f5994613 100644
--- a/rust_snuba/src/factory.rs
+++ b/rust_snuba/src/factory.rs
@@ -21,7 +21,8 @@ use sentry_kafka_schemas::Schema;
 
 use crate::config;
 use crate::metrics::global_tags::set_global_tag;
-use crate::processors;
+use crate::processors::{self, get_cogs_label};
+use crate::strategies::accountant::RecordCogs;
 use crate::strategies::clickhouse::ClickhouseWriterStep;
 use crate::strategies::commit_log::ProduceCommitLog;
 use crate::strategies::processor::{get_schema, make_rust_processor, validate_schema};
@@ -45,6 +46,7 @@ pub struct ConsumerStrategyFactory {
     commit_log_producer: Option<(Arc, Topic)>,
     physical_consumer_group: String,
     physical_topic_name: Topic,
+    accountant_topic_config: config::TopicConfig,
 }
 
 impl ConsumerStrategyFactory {
@@ -66,6 +68,7 @@ impl ConsumerStrategyFactory {
         commit_log_producer: Option<(Arc, Topic)>,
         physical_consumer_group: String,
         physical_topic_name: Topic,
+        accountant_topic_config: config::TopicConfig,
     ) -> Self {
         Self {
             storage_config,
@@ -84,6 +87,7 @@ impl ConsumerStrategyFactory {
             commit_log_producer,
             physical_consumer_group,
             physical_topic_name,
+            accountant_topic_config,
         }
     }
 }
@@ -97,8 +101,10 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
     }
 
     fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
+        // Commit offsets
        let next_step = CommitOffsets::new(chrono::Duration::seconds(1));
 
+        // Produce commit log if there is one
         let next_step: Box<dyn ProcessingStrategy<BytesInsertBatch>> =
             if let Some((ref producer, destination)) = self.commit_log_producer {
                 Box::new(ProduceCommitLog::new(
@@ -114,15 +120,32 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
                 Box::new(next_step)
             };
 
+        // Write to clickhouse
+        let next_step = Box::new(ClickhouseWriterStep::new(
+            next_step,
+            self.storage_config.clickhouse_cluster.clone(),
+            self.storage_config.clickhouse_table_name.clone(),
+            self.skip_write,
+            &self.clickhouse_concurrency,
+        ));
+
+        let cogs_label = get_cogs_label(&self.storage_config.message_processor.python_class_name);
+
+        // Produce cogs if generic metrics and we are not skipping writes
+        let next_step: Box<dyn ProcessingStrategy<BytesInsertBatch>> =
+            match (self.skip_write, cogs_label) {
+                (false, Some(resource_id)) => Box::new(RecordCogs::new(
+                    next_step,
+                    resource_id,
+                    self.accountant_topic_config.broker_config.clone(),
+                    &self.accountant_topic_config.physical_topic_name,
+                )),
+                _ => next_step,
+            };
+
         let accumulator = Arc::new(BytesInsertBatch::merge);
         let next_step = Reduce::new(
-            Box::new(ClickhouseWriterStep::new(
-                next_step,
-                self.storage_config.clickhouse_cluster.clone(),
-                self.storage_config.clickhouse_table_name.clone(),
-                self.skip_write,
-                &self.clickhouse_concurrency,
-            )),
+            next_step,
             accumulator,
             BytesInsertBatch::default(),
             self.max_batch_size,
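[Note] With this change the chain assembled in create() runs, from the innermost step outward: commit offsets, optional commit-log producer, ClickHouse writer, optional COGS recorder, batch reduce. A schematic summary of the wiring above for a generic-metrics storage with writes enabled (this is a reading aid, not a literal excerpt):

    // Resulting order of steps, outermost first:
    //   Reduce (accumulates BytesInsertBatch up to max_batch_size / max_batch_time)
    //     -> RecordCogs            (only when !skip_write and get_cogs_label(...) is Some)
    //       -> ClickhouseWriterStep
    //         -> ProduceCommitLog  (only when a commit log producer is configured)
    //           -> CommitOffsets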
diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs
index ff8f873a8c..e3cdd7c6a9 100644
--- a/rust_snuba/src/lib.rs
+++ b/rust_snuba/src/lib.rs
@@ -22,7 +22,8 @@ fn rust_snuba(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
 // Ideally, we would have a normal rust crate we can use in examples and benchmarks,
 // plus a pyo3 specific crate as `cdylib`.
 pub use config::{
-    ClickhouseConfig, EnvConfig, MessageProcessorConfig, ProcessorConfig, StorageConfig,
+    BrokerConfig, ClickhouseConfig, EnvConfig, MessageProcessorConfig, ProcessorConfig,
+    StorageConfig, TopicConfig,
 };
 pub use factory::ConsumerStrategyFactory;
 pub use metrics::statsd::StatsDBackend;
diff --git a/rust_snuba/src/processors/functions.rs b/rust_snuba/src/processors/functions.rs
index 612786805f..a12bf47fec 100644
--- a/rust_snuba/src/processors/functions.rs
+++ b/rust_snuba/src/processors/functions.rs
@@ -5,7 +5,7 @@ use rust_arroyo::backends::kafka::types::KafkaPayload;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
-use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
+use crate::types::{InsertBatch, KafkaMessageMetadata};
 
 pub fn process_message(
     payload: KafkaPayload,
@@ -15,35 +15,35 @@ pub fn process_message(
     let payload_bytes = payload.payload().context("Expected payload")?;
     let msg: InputMessage = serde_json::from_slice(payload_bytes)?;
 
-    let functions = msg.functions.iter().map(|from| {
-        Function {
-            profile_id: msg.profile_id,
-            project_id: msg.project_id,
+    let functions: Vec<Function> = msg
+        .functions
+        .iter()
+        .map(|from| {
+            Function {
+                profile_id: msg.profile_id,
+                project_id: msg.project_id,
 
-            // Profile metadata
-            environment: msg.environment.as_deref(),
-            platform: &msg.platform,
-            release: msg.release.as_deref(),
-            retention_days: msg.retention_days,
-            timestamp: msg.timestamp,
-            transaction_name: &msg.transaction_name,
+                // Profile metadata
+                environment: msg.environment.as_deref(),
+                platform: &msg.platform,
+                release: msg.release.as_deref(),
+                retention_days: msg.retention_days,
+                timestamp: msg.timestamp,
+                transaction_name: &msg.transaction_name,
 
-            // Function metadata
-            fingerprint: from.fingerprint,
-            durations: &from.self_times_ns,
-            package: &from.package,
-            name: &from.function,
-            is_application: from.in_app as u8,
+                // Function metadata
+                fingerprint: from.fingerprint,
+                durations: &from.self_times_ns,
+                package: &from.package,
+                name: &from.function,
+                is_application: from.in_app as u8,
 
-            ..Default::default()
-        }
-    });
+                ..Default::default()
+            }
+        })
+        .collect();
 
-    Ok(InsertBatch {
-        origin_timestamp: DateTime::from_timestamp(msg.received, 0),
-        rows: RowData::from_rows(functions)?,
-        sentry_received_timestamp: None,
-    })
+    InsertBatch::from_rows(functions, DateTime::from_timestamp(msg.received, 0))
 }
 
 #[derive(Debug, Deserialize)]
diff --git a/rust_snuba/src/processors/generic_metrics.rs b/rust_snuba/src/processors/generic_metrics.rs
index 2196bb811c..0f0b16eef8 100644
--- a/rust_snuba/src/processors/generic_metrics.rs
+++ b/rust_snuba/src/processors/generic_metrics.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
 use std::{collections::BTreeMap, vec};
 
 use crate::{
-    types::{InsertBatch, RowData},
+    types::{CogsData, InsertBatch, RowData},
     KafkaMessageMetadata, ProcessorConfig,
 };
@@ -181,10 +181,12 @@ where
 {
     let payload_bytes = payload.payload().context("Expected payload")?;
     let msg: FromGenericMetricsMessage = serde_json::from_slice(payload_bytes)?;
+    let use_case_id = msg.use_case_id.clone();
     let sentry_received_timestamp =
         DateTime::from_timestamp(msg.sentry_received_timestamp as i64, 0);
 
     let result: Result<Option<T>, anyhow::Error> = T::parse(msg, config);
+
     match result {
         Ok(row) => {
             if let Some(row) = row {
@@ -192,6 +194,12 @@ where
                     rows: RowData::from_rows([row])?,
                     origin_timestamp: None,
                     sentry_received_timestamp,
+                    cogs_data: Some(CogsData {
+                        data: BTreeMap::from([(
+                            format!("genericmetrics_{use_case_id}"),
+                            payload_bytes.len() as u64,
+                        )]),
+                    }),
                 })
             } else {
                 Ok(InsertBatch::skip())
@@ -611,6 +619,9 @@ mod tests {
                 rows: RowData::from_rows([expected_row]).unwrap(),
                 origin_timestamp: None,
                 sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
+                cogs_data: Some(CogsData {
+                    data: BTreeMap::from([("genericmetrics_spans".to_string(), 615)])
+                }),
             }
         );
     }
@@ -678,6 +689,9 @@ mod tests {
                 rows: RowData::from_rows([expected_row]).unwrap(),
                 origin_timestamp: None,
                 sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
+                cogs_data: Some(CogsData {
+                    data: BTreeMap::from([("genericmetrics_spans".to_string(), 622)])
+                }),
             }
         );
     }
@@ -746,6 +760,9 @@ mod tests {
                 rows: RowData::from_rows([expected_row]).unwrap(),
                 origin_timestamp: None,
                 sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
+                cogs_data: Some(CogsData {
+                    data: BTreeMap::from([("genericmetrics_spans".to_string(), 629)])
+                })
             }
         );
     }
@@ -799,6 +816,9 @@ mod tests {
                 rows: RowData::from_rows([expected_row]).unwrap(),
                 origin_timestamp: None,
                 sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
+                cogs_data: Some(CogsData {
+                    data: BTreeMap::from([("genericmetrics_spans".to_string(), 667)])
+                })
             }
         );
     }
@@ -870,6 +890,9 @@ mod tests {
                 rows: RowData::from_rows([expected_row]).unwrap(),
                 origin_timestamp: None,
                 sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
+                cogs_data: Some(CogsData {
+                    data: BTreeMap::from([("genericmetrics_spans".to_string(), 679)])
+                })
             }
         );
     }
@@ -927,6 +950,9 @@ mod tests {
                 rows: RowData::from_rows([expected_row]).unwrap(),
                 origin_timestamp: None,
                 sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
+                cogs_data: Some(CogsData {
+                    data: BTreeMap::from([("genericmetrics_spans".to_string(), 719)])
+                })
             }
         );
     }
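[Note] The COGS amount recorded per generic-metrics message is simply the length in bytes of the raw Kafka payload, keyed by genericmetrics_<use_case_id>; that is why the values asserted in the tests above (615, 622, 629, ...) are the literal byte lengths of the test payloads. A sketch of the entry attached to the InsertBatch (the use_case_id and payload_bytes bindings come from the surrounding function):

    // Illustrative reconstruction of the accounting entry built above.
    let cogs_data = CogsData {
        data: BTreeMap::from([(
            format!("genericmetrics_{use_case_id}"),
            payload_bytes.len() as u64,
        )]),
    };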
diff --git a/rust_snuba/src/processors/metrics_summaries.rs b/rust_snuba/src/processors/metrics_summaries.rs
index 8cc0dc8769..6b97d0d1cf 100644
--- a/rust_snuba/src/processors/metrics_summaries.rs
+++ b/rust_snuba/src/processors/metrics_summaries.rs
@@ -8,7 +8,7 @@ use rust_arroyo::backends::kafka::types::KafkaPayload;
 
 use crate::config::ProcessorConfig;
 use crate::processors::utils::{enforce_retention, hex_to_u64};
-use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
+use crate::types::{InsertBatch, KafkaMessageMetadata};
 
 pub fn process_message(
     payload: KafkaPayload,
@@ -49,11 +49,7 @@ pub fn process_message(
 
     let origin_timestamp = DateTime::from_timestamp(from.received as i64, 0);
 
-    Ok(InsertBatch {
-        origin_timestamp,
-        rows: RowData::from_rows(metrics_summaries)?,
-        sentry_received_timestamp: None,
-    })
+    InsertBatch::from_rows(metrics_summaries, origin_timestamp)
 }
 
 #[derive(Debug, Default, Deserialize)]
diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs
index 00e8795a61..b6b519880b 100644
--- a/rust_snuba/src/processors/mod.rs
+++ b/rust_snuba/src/processors/mod.rs
@@ -48,6 +48,19 @@ define_processing_functions! {
     ("GenericGaugesMetricsProcessor", "snuba-generic-metrics", generic_metrics::process_gauge_message),
 }
 
+// COGS is recorded for these processors
+pub fn get_cogs_label(processor_name: &str) -> Option<String> {
+    match processor_name {
+        "GenericCountersMetricsProcessor" => Some("generic_metrics_processor_counters".to_string()),
+        "GenericSetsMetricsProcessor" => Some("generic_metrics_processor_sets".to_string()),
+        "GenericDistributionsMetricsProcessor" => {
+            Some("generic_metrics_processor_distributions".to_string())
+        }
+        "GenericGaugesMetricsProcessor" => Some("generic_metrics_processor_gauges".to_string()),
+        _ => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::time::SystemTime;
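[Note] get_cogs_label is the switch that decides whether a consumer records COGS at all: the four generic-metrics processors map to a resource id, and any other processor name falls through to None, which disables RecordCogs in the factory. For example:

    // "SomeOtherProcessor" is a placeholder name, not a real processor class.
    assert_eq!(
        get_cogs_label("GenericSetsMetricsProcessor"),
        Some("generic_metrics_processor_sets".to_string())
    );
    assert_eq!(get_cogs_label("SomeOtherProcessor"), None);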
diff --git a/rust_snuba/src/processors/outcomes.rs b/rust_snuba/src/processors/outcomes.rs
index 285f175003..2eb77185c0 100644
--- a/rust_snuba/src/processors/outcomes.rs
+++ b/rust_snuba/src/processors/outcomes.rs
@@ -63,7 +63,7 @@ pub fn process_message(
         }
     }
 
-    InsertBatch::from_rows([msg])
+    InsertBatch::from_rows([msg], None)
 }
 
 #[derive(Debug, Deserialize, Serialize)]
diff --git a/rust_snuba/src/processors/profiles.rs b/rust_snuba/src/processors/profiles.rs
index c8aba57f80..77c6af9d26 100644
--- a/rust_snuba/src/processors/profiles.rs
+++ b/rust_snuba/src/processors/profiles.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
 use crate::processors::utils::enforce_retention;
-use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
+use crate::types::{InsertBatch, KafkaMessageMetadata};
 
 pub fn process_message(
     payload: KafkaPayload,
@@ -22,11 +22,7 @@ pub fn process_message(
 
     let origin_timestamp = DateTime::from_timestamp(msg.received, 0);
 
-    Ok(InsertBatch {
-        origin_timestamp,
-        rows: RowData::from_rows([msg])?,
-        sentry_received_timestamp: None,
-    })
+    InsertBatch::from_rows([msg], origin_timestamp)
 }
 
 #[derive(Debug, Deserialize, Serialize)]
diff --git a/rust_snuba/src/processors/querylog.rs b/rust_snuba/src/processors/querylog.rs
index 9f58c6a4f0..6ed7817516 100644
--- a/rust_snuba/src/processors/querylog.rs
+++ b/rust_snuba/src/processors/querylog.rs
@@ -31,7 +31,7 @@ pub fn process_message(
         offset: metadata.offset,
     };
 
-    InsertBatch::from_rows([querylog_msg])
+    InsertBatch::from_rows([querylog_msg], None)
 }
 
 #[derive(Debug, Deserialize, Serialize, JsonSchema)]
diff --git a/rust_snuba/src/processors/replays.rs b/rust_snuba/src/processors/replays.rs
index 0b557b68a5..5606380711 100644
--- a/rust_snuba/src/processors/replays.rs
+++ b/rust_snuba/src/processors/replays.rs
@@ -14,7 +14,7 @@ pub fn process_message(
 ) -> anyhow::Result<InsertBatch> {
     let payload_bytes = payload.payload().context("Expected payload")?;
     let replay_row = deserialize_message(payload_bytes, metadata.partition, metadata.offset)?;
-    InsertBatch::from_rows(replay_row)
+    InsertBatch::from_rows(replay_row, None)
 }
 
 pub fn deserialize_message(
diff --git a/rust_snuba/src/processors/spans.rs b/rust_snuba/src/processors/spans.rs
index cced0a29f6..8db273ac5b 100644
--- a/rust_snuba/src/processors/spans.rs
+++ b/rust_snuba/src/processors/spans.rs
@@ -10,7 +10,7 @@ use rust_arroyo::backends::kafka::types::KafkaPayload;
 
 use crate::config::ProcessorConfig;
 use crate::processors::utils::{enforce_retention, hex_to_u64};
-use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
+use crate::types::{InsertBatch, KafkaMessageMetadata};
 
 pub fn process_message(
     payload: KafkaPayload,
@@ -27,11 +27,7 @@ pub fn process_message(
     span.offset = metadata.offset;
     span.partition = metadata.partition;
 
-    Ok(InsertBatch {
-        origin_timestamp,
-        rows: RowData::from_rows([span])?,
-        sentry_received_timestamp: None,
-    })
+    InsertBatch::from_rows([span], origin_timestamp)
 }
 
 #[derive(Debug, Default, Deserialize)]
diff --git a/rust_snuba/src/processors/utils.rs b/rust_snuba/src/processors/utils.rs
index 426c9e7667..566acaa711 100644
--- a/rust_snuba/src/processors/utils.rs
+++ b/rust_snuba/src/processors/utils.rs
@@ -1,8 +1,6 @@
 use crate::config::EnvConfig;
 use chrono::{DateTime, NaiveDateTime, Utc};
-use sentry_usage_accountant::{KafkaConfig, KafkaProducer, UsageAccountant, UsageUnit};
 use serde::{Deserialize, Deserializer};
-use std::collections::HashMap;
 
 // Equivalent to "%Y-%m-%dT%H:%M:%S.%fZ" in python
 pub const PAYLOAD_DATETIME_FORMAT: &str = "%Y-%m-%dT%H:%M:%S.%fZ";
@@ -41,50 +39,3 @@ where
     };
     Ok(seconds_since_epoch.timestamp() as u32)
 }
-
-pub struct CogsAccountant {
-    accountant: UsageAccountant<KafkaProducer>,
-    // We only log a warning once if there was an error recording cogs. Once this is true, we no longer record.
-    logged_warning: bool,
-}
-
-impl CogsAccountant {
-    #[allow(dead_code)]
-    pub fn new(broker_config: HashMap<String, String>, topic_name: &str) -> Self {
-        let config = KafkaConfig::new_producer_config(broker_config);
-
-        Self {
-            accountant: UsageAccountant::new_with_kafka(config, Some(topic_name), None),
-            logged_warning: false,
-        }
-    }
-
-    #[allow(dead_code)]
-    pub fn record_bytes(&mut self, resource_id: &str, app_feature: &str, amount_bytes: u64) {
-        if let Err(err) =
-            self.accountant
-                .record(resource_id, app_feature, amount_bytes, UsageUnit::Bytes)
-        {
-            if !self.logged_warning {
-                tracing::warn!(?err, "error recording cogs");
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_record_cogs() {
-        let mut accountant = CogsAccountant::new(
-            HashMap::from([(
-                "bootstrap.servers".to_string(),
-                "127.0.0.1:9092".to_string(),
-            )]),
-            "shared-resources-usage",
-        );
-        accountant.record_bytes("generic_metrics_processor_sets", "custom", 100)
-    }
-}
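[Note] The processor changes above all follow the same pattern: instead of filling in an InsertBatch struct literal, they call the new two-argument helper, which sets sentry_received_timestamp and the new cogs_data field to None internally (see the types.rs hunk further down). For instance, the spans processor now ends with:

    // Taken from the spans.rs hunk above; origin_timestamp is an Option<DateTime<Utc>>.
    InsertBatch::from_rows([span], origin_timestamp)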
diff --git a/rust_snuba/src/strategies/accountant.rs b/rust_snuba/src/strategies/accountant.rs
new file mode 100644
index 0000000000..a10fcd61d0
--- /dev/null
+++ b/rust_snuba/src/strategies/accountant.rs
@@ -0,0 +1,116 @@
+use std::collections::HashMap;
+
+use sentry_usage_accountant::{KafkaConfig, KafkaProducer, UsageAccountant, UsageUnit};
+use std::time::Duration;
+
+use crate::types::BytesInsertBatch;
+use rust_arroyo::processing::strategies::{
+    CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
+};
+use rust_arroyo::types::Message;
+
+pub struct CogsAccountant {
+    accountant: UsageAccountant<KafkaProducer>,
+    // We only log a warning once if there was an error recording cogs. Once this is true, we no longer record.
+    logged_warning: bool,
+}
+
+impl CogsAccountant {
+    pub fn new(broker_config: HashMap<String, String>, topic_name: &str) -> Self {
+        let config = KafkaConfig::new_producer_config(broker_config);
+
+        Self {
+            accountant: UsageAccountant::new_with_kafka(config, Some(topic_name), None),
+            logged_warning: false,
+        }
+    }
+
+    pub fn record_bytes(&mut self, resource_id: &str, app_feature: &str, amount_bytes: u64) {
+        if let Err(err) =
+            self.accountant
+                .record(resource_id, app_feature, amount_bytes, UsageUnit::Bytes)
+        {
+            if !self.logged_warning {
+                tracing::warn!(?err, "error recording cogs");
+            }
+        }
+    }
+}
+
+pub struct RecordCogs<N> {
+    next_step: N,
+    resource_id: String,
+    accountant: CogsAccountant,
+}
+
+impl<N> RecordCogs<N> {
+    pub fn new(
+        next_step: N,
+        resource_id: String,
+        broker_config: HashMap<String, String>,
+        topic_name: &str,
+    ) -> Self
+    where
+        N: ProcessingStrategy<BytesInsertBatch> + 'static,
+    {
+        let accountant = CogsAccountant::new(broker_config, topic_name);
+
+        Self {
+            next_step,
+            resource_id,
+            accountant,
+        }
+    }
+}
+
+impl<N> ProcessingStrategy<BytesInsertBatch> for RecordCogs<N>
+where
+    N: ProcessingStrategy<BytesInsertBatch> + 'static,
+{
+    fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
+        self.next_step.poll()
+    }
+
+    fn submit(
+        &mut self,
+        message: Message<BytesInsertBatch>,
+    ) -> Result<(), SubmitError<BytesInsertBatch>> {
+        if let Some(cogs_data) = message.payload().cogs_data() {
+            for (app_feature, amount_bytes) in cogs_data.data.iter() {
+                self.accountant
+                    .record_bytes(&self.resource_id, app_feature, *amount_bytes)
+            }
+        }
+
+        self.next_step.submit(message)
+    }
+
+    fn close(&mut self) {
+        self.next_step.close()
+    }
+
+    fn terminate(&mut self) {
+        self.next_step.terminate()
+    }
+
+    fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
+        self.next_step.join(timeout)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_record_cogs() {
+        let mut accountant = CogsAccountant::new(
+            HashMap::from([(
+                "bootstrap.servers".to_string(),
+                "127.0.0.1:9092".to_string(),
+            )]),
+            "shared-resources-usage",
+        );
+        accountant.record_bytes("generic_metrics_processor_sets", "custom", 100)
+    }
+}
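[Note] RecordCogs is a pass-through strategy: submit forwards every BytesInsertBatch to the next step unchanged and, as a side effect, records one usage entry per app_feature found in the batch's CogsData. A minimal construction sketch, reusing the broker address and topic from the test above (next_step is a placeholder for any ProcessingStrategy<BytesInsertBatch> you already have; the values are illustrative, not production configuration):

    let record_cogs = RecordCogs::new(
        next_step, // hypothetical downstream strategy
        "generic_metrics_processor_sets".to_string(),
        HashMap::from([(
            "bootstrap.servers".to_string(),
            "127.0.0.1:9092".to_string(),
        )]),
        "shared-resources-usage",
    );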
diff --git a/rust_snuba/src/strategies/commit_log.rs b/rust_snuba/src/strategies/commit_log.rs
index 96138aa4ff..8fe81ee2d0 100644
--- a/rust_snuba/src/strategies/commit_log.rs
+++ b/rust_snuba/src/strategies/commit_log.rs
@@ -302,6 +302,7 @@ mod tests {
             None,
             None,
             BTreeMap::from([(0, (500, Utc::now()))]),
+            None,
         ),
         BytesInsertBatch::new(
             RowData::default(),
@@ -309,6 +310,7 @@ mod tests {
             None,
             None,
             BTreeMap::from([(0, (600, Utc::now())), (1, (100, Utc::now()))]),
+            None,
         ),
     ];
diff --git a/rust_snuba/src/strategies/mod.rs b/rust_snuba/src/strategies/mod.rs
index fa4c22cd13..70a1f175e0 100644
--- a/rust_snuba/src/strategies/mod.rs
+++ b/rust_snuba/src/strategies/mod.rs
@@ -1,3 +1,4 @@
+pub mod accountant;
 pub mod clickhouse;
 pub mod commit_log;
 pub mod noop;
diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs
index babbd654be..29f6fd1c56 100644
--- a/rust_snuba/src/strategies/processor.rs
+++ b/rust_snuba/src/strategies/processor.rs
@@ -131,6 +131,7 @@ impl MessageProcessor {
             transformed.origin_timestamp,
             transformed.sentry_received_timestamp,
             BTreeMap::from([(msg.partition.index, (msg.offset, msg.timestamp))]),
+            transformed.cogs_data,
         );
         Ok(Message::new_broker_message(
             payload,
diff --git a/rust_snuba/src/strategies/python.rs b/rust_snuba/src/strategies/python.rs
index 9bd968058c..408b5ce199 100644
--- a/rust_snuba/src/strategies/python.rs
+++ b/rust_snuba/src/strategies/python.rs
@@ -81,6 +81,7 @@ impl PythonTransformStep {
                     origin_timestamp,
                     sentry_received_timestamp,
                     commit_log_offsets,
+                    None,
                 );
 
                 let mut committable: BTreeMap = BTreeMap::new();
diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs
index 38389f2906..cf899a6b53 100644
--- a/rust_snuba/src/types.rs
+++ b/rust_snuba/src/types.rs
@@ -7,6 +7,28 @@ use serde::{Deserialize, Serialize};
 
 pub type CommitLogOffsets = BTreeMap<u16, (u64, DateTime<Utc>)>;
 
+#[derive(Clone, Debug, PartialEq)]
+pub struct CogsData {
+    pub data: BTreeMap<String, u64>, // app_feature: bytes_len
+}
+
+impl CogsData {
+    fn merge(mut self, other: Option<Self>) -> Self {
+        match other {
+            None => self,
+            Some(data) => {
+                for (k, v) in data.data {
+                    self.data
+                        .entry(k)
+                        .and_modify(|curr| *curr += v)
+                        .or_insert(v);
+                }
+                self
+            }
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 struct LatencyRecorder {
     sum_timestamps: f64,
@@ -70,28 +92,28 @@ impl LatencyRecorder {
 }
 
 /// The return value of message processors.
-///
-/// NOTE: In Python, this struct crosses a serialization boundary, and so this struct is somewhat
-/// sensitive to serialization speed. If there are additional things that should be returned from
-/// the Rust message processor that are not necessary in Python, it's probably best to duplicate
-/// this struct for Python as there it can be an internal type.
-#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+#[derive(Clone, Debug, Default, PartialEq)]
 pub struct InsertBatch {
     pub rows: RowData,
     pub origin_timestamp: Option<DateTime<Utc>>,
     pub sentry_received_timestamp: Option<DateTime<Utc>>,
+    pub cogs_data: Option<CogsData>,
 }
 
 impl InsertBatch {
-    pub fn from_rows<T>(rows: impl IntoIterator<Item = T>) -> anyhow::Result<Self>
+    pub fn from_rows<T>(
+        rows: impl IntoIterator<Item = T>,
+        origin_timestamp: Option<DateTime<Utc>>,
+    ) -> anyhow::Result<Self>
     where
        T: Serialize,
     {
         let rows = RowData::from_rows(rows)?;
 
         Ok(Self {
             rows,
-            origin_timestamp: None,
+            origin_timestamp,
             sentry_received_timestamp: None,
+            cogs_data: None,
         })
     }
@@ -127,6 +149,8 @@ pub struct BytesInsertBatch {
 
     // For each partition we store the offset and timestamp to be produced to the commit log
     commit_log_offsets: CommitLogOffsets,
+
+    cogs_data: Option<CogsData>,
 }
 
 impl BytesInsertBatch {
@@ -136,6 +160,7 @@ impl BytesInsertBatch {
         origin_timestamp: Option<DateTime<Utc>>,
         sentry_received_timestamp: Option<DateTime<Utc>>,
         commit_log_offsets: CommitLogOffsets,
+        cogs_data: Option<CogsData>,
     ) -> Self {
         BytesInsertBatch {
             rows,
@@ -147,6 +172,7 @@ impl BytesInsertBatch {
                 .map(LatencyRecorder::from)
                 .unwrap_or_default(),
             commit_log_offsets,
+            cogs_data,
         }
     }
@@ -160,6 +186,10 @@ impl BytesInsertBatch {
         self.origin_timestamp.merge(other.origin_timestamp);
         self.sentry_received_timestamp
             .merge(other.sentry_received_timestamp);
+        self.cogs_data = match self.cogs_data {
+            Some(cogs_data) => Some(cogs_data.merge(other.cogs_data)),
+            None => other.cogs_data,
+        };
         self
     }
@@ -184,6 +214,10 @@ impl BytesInsertBatch {
     pub fn commit_log_offsets(&self) -> &CommitLogOffsets {
         &self.commit_log_offsets
     }
+
+    pub fn cogs_data(&self) -> Option<&CogsData> {
+        self.cogs_data.as_ref()
+    }
 }
 
 #[derive(Clone, Debug, Deserialize, Serialize, Default, PartialEq)]
@@ -201,6 +235,7 @@ impl RowData {
         let mut num_rows = 0;
         for row in rows {
             serde_json::to_writer(&mut encoded_rows, &row)?;
+            debug_assert!(encoded_rows.ends_with(b"}"));
             encoded_rows.push(b'\n');
             num_rows += 1;
         }
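[Note] CogsData::merge is what keeps the accounting correct when Reduce folds many BytesInsertBatch values into one batch: byte counts are summed per app_feature, so the merged batch reports the total payload bytes for each feature. A small worked example (the keys are illustrative):

    let a = CogsData {
        data: BTreeMap::from([("genericmetrics_spans".to_string(), 615)]),
    };
    let b = CogsData {
        data: BTreeMap::from([
            ("genericmetrics_spans".to_string(), 622),
            ("genericmetrics_sets".to_string(), 10),
        ]),
    };
    // a.merge(Some(b)) yields {"genericmetrics_spans": 1237, "genericmetrics_sets": 10}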
diff --git a/snuba/consumers/consumer_config.py b/snuba/consumers/consumer_config.py
index e0f4d2a2f1..406edb28d5 100644
--- a/snuba/consumers/consumer_config.py
+++ b/snuba/consumers/consumer_config.py
@@ -8,6 +8,7 @@ from snuba.datasets.storages.storage_key import StorageKey
 from snuba.datasets.table_storage import KafkaTopicSpec
 from snuba.utils.streams.configuration_builder import _get_default_topic_configuration
+from snuba.utils.streams.topics import Topic
 
 
 @dataclass(frozen=True)
@@ -65,6 +66,7 @@ class ConsumerConfig:
     max_batch_size: int
     max_batch_time_ms: int
     env: Optional[EnvConfig]
+    accountant_topic: TopicConfig
 
 
 def _add_to_topic_broker_config(
@@ -224,6 +226,15 @@ def resolve_consumer_config(
         None,
         slice_id,
     )
+
+    accountant_topic = _resolve_topic_config(
+        "accountant topic",
+        KafkaTopicSpec(Topic.COGS_SHARED_RESOURCES_USAGE),
+        None,
+        slice_id,
+    )
+    assert accountant_topic is not None
+
     return ConsumerConfig(
         storages=[
             resolve_storage_config(storage_name, storage)
@@ -236,6 +247,7 @@ def resolve_consumer_config(
         max_batch_size=max_batch_size,
         max_batch_time_ms=max_batch_time_ms,
        env=resolved_env_config,
+        accountant_topic=accountant_topic,
    )