feat(accountant): Add accountant into processing (#5398)
COGs recording for generic metrics
nikhars authored Jan 18, 2024
1 parent 03a82d5 commit 8d35a12
Showing 22 changed files with 293 additions and 116 deletions.
9 changes: 7 additions & 2 deletions rust_snuba/benches/processors.rs
@@ -18,9 +18,9 @@ use rust_arroyo::processing::{Callbacks, ConsumerState, RunError, StreamProcesso
use rust_arroyo::types::{Partition, Topic};
use rust_arroyo::utils::clock::SystemClock;
use rust_snuba::{
ClickhouseConfig, ConsumerStrategyFactory, EnvConfig, KafkaMessageMetadata,
BrokerConfig, ClickhouseConfig, ConsumerStrategyFactory, EnvConfig, KafkaMessageMetadata,
MessageProcessorConfig, ProcessingFunction, ProcessorConfig, StatsDBackend, StorageConfig,
PROCESSORS,
TopicConfig, PROCESSORS,
};
use uuid::Uuid;

@@ -87,6 +87,11 @@ fn create_factory(
None,
"test-group".to_owned(),
Topic::new("test"),
TopicConfig {
physical_topic_name: "shared-resources-usage".to_string(),
logical_topic_name: "shared-resources-usage".to_string(),
broker_config: BrokerConfig::default(),
},
);
Box::new(factory)
}
1 change: 1 addition & 0 deletions rust_snuba/src/config.rs
@@ -16,6 +16,7 @@ pub struct ConsumerConfig {
pub commit_log_topic: Option<TopicConfig>,
pub replacements_topic: Option<TopicConfig>,
pub dlq_topic: Option<TopicConfig>,
pub accountant_topic: TopicConfig,
pub max_batch_size: usize,
pub max_batch_time_ms: u64,
pub env: EnvConfig,
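Unlike the commit log, replacements, and DLQ topics, the new accountant_topic field is not an Option: every consumer config must now carry it. The TopicConfig and BrokerConfig definitions live in rust_snuba/src/config.rs and are not reproduced in this excerpt; judging by the field names used in the benchmark change above, the relevant shape is roughly the following sketch (the BrokerConfig body is a placeholder assumption):

use serde::Deserialize;

// Placeholder for the real broker settings type; assumed to hold Kafka client
// options and to implement Default, since the benchmark above calls
// BrokerConfig::default().
#[derive(Debug, Clone, Default, Deserialize)]
pub struct BrokerConfig {}

// Field names taken from the TopicConfig literal constructed in the benchmark.
#[derive(Debug, Clone, Deserialize)]
pub struct TopicConfig {
    pub physical_topic_name: String,
    pub logical_topic_name: String,
    pub broker_config: BrokerConfig,
}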
1 change: 1 addition & 0 deletions rust_snuba/src/consumer.rs
@@ -192,6 +192,7 @@ pub fn consumer_impl(
commit_log_producer,
consumer_group.to_owned(),
Topic::new(&consumer_config.raw_topic.physical_topic_name),
consumer_config.accountant_topic,
);

let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
39 changes: 31 additions & 8 deletions rust_snuba/src/factory.rs
@@ -21,7 +21,8 @@ use sentry_kafka_schemas::Schema;

use crate::config;
use crate::metrics::global_tags::set_global_tag;
use crate::processors;
use crate::processors::{self, get_cogs_label};
use crate::strategies::accountant::RecordCogs;
use crate::strategies::clickhouse::ClickhouseWriterStep;
use crate::strategies::commit_log::ProduceCommitLog;
use crate::strategies::processor::{get_schema, make_rust_processor, validate_schema};
@@ -45,6 +46,7 @@ pub struct ConsumerStrategyFactory {
commit_log_producer: Option<(Arc<KafkaProducer>, Topic)>,
physical_consumer_group: String,
physical_topic_name: Topic,
accountant_topic_config: config::TopicConfig,
}

impl ConsumerStrategyFactory {
@@ -66,6 +68,7 @@ impl ConsumerStrategyFactory {
commit_log_producer: Option<(Arc<KafkaProducer>, Topic)>,
physical_consumer_group: String,
physical_topic_name: Topic,
accountant_topic_config: config::TopicConfig,
) -> Self {
Self {
storage_config,
@@ -84,6 +87,7 @@
commit_log_producer,
physical_consumer_group,
physical_topic_name,
accountant_topic_config,
}
}
}
@@ -97,8 +101,10 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
}

fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
// Commit offsets
let next_step = CommitOffsets::new(chrono::Duration::seconds(1));

// Produce commit log if there is one
let next_step: Box<dyn ProcessingStrategy<_>> =
if let Some((ref producer, destination)) = self.commit_log_producer {
Box::new(ProduceCommitLog::new(
@@ -114,15 +120,32 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
Box::new(next_step)
};

// Write to clickhouse
let next_step = Box::new(ClickhouseWriterStep::new(
next_step,
self.storage_config.clickhouse_cluster.clone(),
self.storage_config.clickhouse_table_name.clone(),
self.skip_write,
&self.clickhouse_concurrency,
));

let cogs_label = get_cogs_label(&self.storage_config.message_processor.python_class_name);

// Produce cogs if generic metrics and we are not skipping writes
let next_step: Box<dyn ProcessingStrategy<BytesInsertBatch>> =
match (self.skip_write, cogs_label) {
(false, Some(resource_id)) => Box::new(RecordCogs::new(
next_step,
resource_id,
self.accountant_topic_config.broker_config.clone(),
&self.accountant_topic_config.physical_topic_name,
)),
_ => next_step,
};

let accumulator = Arc::new(BytesInsertBatch::merge);
let next_step = Reduce::new(
Box::new(ClickhouseWriterStep::new(
next_step,
self.storage_config.clickhouse_cluster.clone(),
self.storage_config.clickhouse_table_name.clone(),
self.skip_write,
&self.clickhouse_concurrency,
)),
next_step,
accumulator,
BytesInsertBatch::default(),
self.max_batch_size,
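With this change the strategy chain built by create becomes: Reduce (batching) -> RecordCogs (only when writes are not skipped and the processor has a COGS label) -> ClickhouseWriterStep -> ProduceCommitLog (when a commit log producer is configured) -> CommitOffsets. Isolated from the factory, the gate on RecordCogs behaves like this small sketch (hypothetical helper name):

// Sketch of the gating logic above: COGS is recorded only when writes are not
// skipped and get_cogs_label returned a resource id for the processor.
fn should_record_cogs(skip_write: bool, cogs_label: Option<&str>) -> Option<&str> {
    match (skip_write, cogs_label) {
        (false, Some(resource_id)) => Some(resource_id),
        _ => None,
    }
}

fn main() {
    assert_eq!(
        should_record_cogs(false, Some("generic_metrics_processor_sets")),
        Some("generic_metrics_processor_sets")
    );
    assert_eq!(should_record_cogs(true, Some("generic_metrics_processor_sets")), None);
    assert_eq!(should_record_cogs(false, None), None);
}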
3 changes: 2 additions & 1 deletion rust_snuba/src/lib.rs
@@ -22,7 +22,8 @@ fn rust_snuba(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
// Ideally, we would have a normal rust crate we can use in examples and benchmarks,
// plus a pyo3 specific crate as `cdylib`.
pub use config::{
ClickhouseConfig, EnvConfig, MessageProcessorConfig, ProcessorConfig, StorageConfig,
BrokerConfig, ClickhouseConfig, EnvConfig, MessageProcessorConfig, ProcessorConfig,
StorageConfig, TopicConfig,
};
pub use factory::ConsumerStrategyFactory;
pub use metrics::statsd::StatsDBackend;
52 changes: 26 additions & 26 deletions rust_snuba/src/processors/functions.rs
@@ -5,7 +5,7 @@ use rust_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
use crate::types::{InsertBatch, KafkaMessageMetadata};

pub fn process_message(
payload: KafkaPayload,
@@ -15,35 +15,35 @@ pub fn process_message(
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: InputMessage = serde_json::from_slice(payload_bytes)?;

let functions = msg.functions.iter().map(|from| {
Function {
profile_id: msg.profile_id,
project_id: msg.project_id,
let functions: Vec<Function> = msg
.functions
.iter()
.map(|from| {
Function {
profile_id: msg.profile_id,
project_id: msg.project_id,

// Profile metadata
environment: msg.environment.as_deref(),
platform: &msg.platform,
release: msg.release.as_deref(),
retention_days: msg.retention_days,
timestamp: msg.timestamp,
transaction_name: &msg.transaction_name,
// Profile metadata
environment: msg.environment.as_deref(),
platform: &msg.platform,
release: msg.release.as_deref(),
retention_days: msg.retention_days,
timestamp: msg.timestamp,
transaction_name: &msg.transaction_name,

// Function metadata
fingerprint: from.fingerprint,
durations: &from.self_times_ns,
package: &from.package,
name: &from.function,
is_application: from.in_app as u8,
// Function metadata
fingerprint: from.fingerprint,
durations: &from.self_times_ns,
package: &from.package,
name: &from.function,
is_application: from.in_app as u8,

..Default::default()
}
});
..Default::default()
}
})
.collect();

Ok(InsertBatch {
origin_timestamp: DateTime::from_timestamp(msg.received, 0),
rows: RowData::from_rows(functions)?,
sentry_received_timestamp: None,
})
InsertBatch::from_rows(functions, DateTime::from_timestamp(msg.received, 0))
}

#[derive(Debug, Deserialize)]
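This hunk, together with the later ones in metrics_summaries.rs, profiles.rs, outcomes.rs, querylog.rs, and replays.rs, replaces hand-rolled InsertBatch literals with an InsertBatch::from_rows helper that now also accepts the origin timestamp. The helper is defined in rust_snuba/src/types.rs, which is not part of this excerpt; from the call sites its updated form is roughly the sketch below (the Serialize bound on the row type is an assumption):

use chrono::{DateTime, Utc};
use serde::Serialize;

impl InsertBatch {
    // Sketch implied by the call sites in this commit: build the row data,
    // set the given origin timestamp, and default the remaining fields
    // (including the cogs_data field introduced by this commit).
    pub fn from_rows<T: Serialize>(
        rows: impl IntoIterator<Item = T>,
        origin_timestamp: Option<DateTime<Utc>>,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            rows: RowData::from_rows(rows)?,
            origin_timestamp,
            sentry_received_timestamp: None,
            cogs_data: None,
        })
    }
}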
28 changes: 27 additions & 1 deletion rust_snuba/src/processors/generic_metrics.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, vec};

use crate::{
types::{InsertBatch, RowData},
types::{CogsData, InsertBatch, RowData},
KafkaMessageMetadata, ProcessorConfig,
};

@@ -181,17 +181,25 @@ where
{
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: FromGenericMetricsMessage = serde_json::from_slice(payload_bytes)?;
let use_case_id = msg.use_case_id.clone();
let sentry_received_timestamp =
DateTime::from_timestamp(msg.sentry_received_timestamp as i64, 0);

let result: Result<Option<T>, anyhow::Error> = T::parse(msg, config);

match result {
Ok(row) => {
if let Some(row) = row {
Ok(InsertBatch {
rows: RowData::from_rows([row])?,
origin_timestamp: None,
sentry_received_timestamp,
cogs_data: Some(CogsData {
data: BTreeMap::from([(
format!("genericmetrics_{use_case_id}"),
payload_bytes.len() as u64,
)]),
}),
})
} else {
Ok(InsertBatch::skip())
@@ -611,6 +619,9 @@ mod tests {
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 615)])
}),
}
);
}
@@ -678,6 +689,9 @@
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 622)])
}),
}
);
}
@@ -746,6 +760,9 @@
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 629)])
})
}
);
}
@@ -799,6 +816,9 @@
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 667)])
})
}
);
}
@@ -870,6 +890,9 @@
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 679)])
})
}
);
}
@@ -927,6 +950,9 @@
rows: RowData::from_rows([expected_row]).unwrap(),
origin_timestamp: None,
sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
cogs_data: Some(CogsData {
data: BTreeMap::from([("genericmetrics_spans".to_string(), 719)])
})
}
);
}
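CogsData is introduced by this commit and defined in rust_snuba/src/types.rs, which is not shown here. The construction above, one entry per message keyed by genericmetrics_<use_case_id> with the raw payload size in bytes as the value, implies roughly this shape (a sketch):

use std::collections::BTreeMap;

// Sketch of the COGS accounting payload carried on an insert batch:
// bytes processed, keyed by app feature (for example "genericmetrics_spans").
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CogsData {
    pub data: BTreeMap<String, u64>,
}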
8 changes: 2 additions & 6 deletions rust_snuba/src/processors/metrics_summaries.rs
@@ -8,7 +8,7 @@ use rust_arroyo::backends::kafka::types::KafkaPayload;

use crate::config::ProcessorConfig;
use crate::processors::utils::{enforce_retention, hex_to_u64};
use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
use crate::types::{InsertBatch, KafkaMessageMetadata};

pub fn process_message(
payload: KafkaPayload,
@@ -49,11 +49,7 @@ pub fn process_message(

let origin_timestamp = DateTime::from_timestamp(from.received as i64, 0);

Ok(InsertBatch {
origin_timestamp,
rows: RowData::from_rows(metrics_summaries)?,
sentry_received_timestamp: None,
})
InsertBatch::from_rows(metrics_summaries, origin_timestamp)
}

#[derive(Debug, Default, Deserialize)]
13 changes: 13 additions & 0 deletions rust_snuba/src/processors/mod.rs
@@ -48,6 +48,19 @@ define_processing_functions! {
("GenericGaugesMetricsProcessor", "snuba-generic-metrics", generic_metrics::process_gauge_message),
}

// COGS is recorded for these processors
pub fn get_cogs_label(processor_name: &str) -> Option<String> {
match processor_name {
"GenericCountersMetricsProcessor" => Some("generic_metrics_processor_counters".to_string()),
"GenericSetsMetricsProcessor" => Some("generic_metrics_processor_sets".to_string()),
"GenericDistributionsMetricsProcessor" => {
Some("generic_metrics_processor_distributions".to_string())
}
"GenericGaugesMetricsProcessor" => Some("generic_metrics_processor_gauges".to_string()),
_ => None,
}
}

#[cfg(test)]
mod tests {
use std::time::SystemTime;
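The existing tests module for this file is elided above. A minimal unit test for the new mapping could look like the sketch below (hypothetical test module and name; SomeOtherProcessor is an arbitrary non-matching string):

#[cfg(test)]
mod cogs_label_tests {
    use super::get_cogs_label;

    #[test]
    fn only_generic_metrics_processors_have_a_cogs_label() {
        assert_eq!(
            get_cogs_label("GenericCountersMetricsProcessor").as_deref(),
            Some("generic_metrics_processor_counters")
        );
        assert_eq!(
            get_cogs_label("GenericGaugesMetricsProcessor").as_deref(),
            Some("generic_metrics_processor_gauges")
        );
        // Any processor outside the generic metrics family gets no label.
        assert_eq!(get_cogs_label("SomeOtherProcessor"), None);
    }
}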
2 changes: 1 addition & 1 deletion rust_snuba/src/processors/outcomes.rs
@@ -63,7 +63,7 @@ pub fn process_message(
}
}

InsertBatch::from_rows([msg])
InsertBatch::from_rows([msg], None)
}

#[derive(Debug, Deserialize, Serialize)]
8 changes: 2 additions & 6 deletions rust_snuba/src/processors/profiles.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
use crate::types::{InsertBatch, KafkaMessageMetadata};

pub fn process_message(
payload: KafkaPayload,
@@ -22,11 +22,7 @@

let origin_timestamp = DateTime::from_timestamp(msg.received, 0);

Ok(InsertBatch {
origin_timestamp,
rows: RowData::from_rows([msg])?,
sentry_received_timestamp: None,
})
InsertBatch::from_rows([msg], origin_timestamp)
}

#[derive(Debug, Deserialize, Serialize)]
2 changes: 1 addition & 1 deletion rust_snuba/src/processors/querylog.rs
@@ -31,7 +31,7 @@ pub fn process_message(
offset: metadata.offset,
};

InsertBatch::from_rows([querylog_msg])
InsertBatch::from_rows([querylog_msg], None)
}

#[derive(Debug, Deserialize, Serialize, JsonSchema)]
2 changes: 1 addition & 1 deletion rust_snuba/src/processors/replays.rs
@@ -14,7 +14,7 @@
) -> anyhow::Result<InsertBatch> {
let payload_bytes = payload.payload().context("Expected payload")?;
let replay_row = deserialize_message(payload_bytes, metadata.partition, metadata.offset)?;
InsertBatch::from_rows(replay_row)
InsertBatch::from_rows(replay_row, None)
}

pub fn deserialize_message(
(Diff truncated: the remaining changed files in this commit are not shown in this excerpt.)
