ref: Move to streaming writes in Rust consumer (#5694)

Refactor the Rust consumer so that, from within Reduce, the HTTP request is
opened and rows are written directly into the socket.
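
As a rough sketch of the idea (the actual implementation is the new
HttpBatch/BatchFactory in rust_snuba/src/strategies/clickhouse/batch.rs; the
helper below, its names, and the use of reqwest/tokio-stream are illustrative
assumptions, not this PR's code), the request body can be backed by a channel so
that rows are pushed into the in-flight INSERT as they arrive instead of being
buffered into one large payload first:

    // Illustrative sketch only; assumes reqwest with the "stream" feature, tokio and
    // tokio-stream. Must be called from within a Tokio runtime because of tokio::spawn.
    use reqwest::{Body, Client};
    use tokio::sync::mpsc;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};

    fn open_streaming_insert(
        client: &Client,
        url: &str,
    ) -> (
        mpsc::Sender<Vec<u8>>,
        tokio::task::JoinHandle<reqwest::Result<reqwest::Response>>,
    ) {
        // Chunks sent into `tx` are forwarded straight into the HTTP request body, so the
        // INSERT is already in flight while rows are still being accumulated.
        let (tx, rx) = mpsc::channel::<Vec<u8>>(64);
        let body = Body::wrap_stream(ReceiverStream::new(rx).map(Ok::<Vec<u8>, std::io::Error>));
        let response = tokio::spawn(client.post(url).body(body).send());
        (tx, response)
    }

Dropping the sender closes the channel, which ends the request body and lets the
response be awaited; error handling and retries are omitted here.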

This requires splitting up the BytesInsertBatch type, as it can be in multiple
"states" depending on the stage of the pipeline: it contains an HttpBatch before
the write to ClickHouse, and no row data afterwards, when only the offsets remain
to be committed.

The easiest way I found was to first make it generic over its internal row
data R, and then gradually fix up all the places where the generic parameter was
missing.
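
Schematically, the two states look something like this (a sketch only, not the
definition in rust_snuba/src/types.rs; the take_rows helper is a hypothetical
name used for illustration):

    // R = HttpBatch while rows are still being streamed to ClickHouse,
    // R = () once the write is done and only commit metadata flows downstream.
    struct BytesInsertBatch<R> {
        rows: R,
        // ... offsets, timestamps and COGS data are carried along unchanged in the real type
    }

    impl<R> BytesInsertBatch<R> {
        // Hypothetical helper: hand the row payload to the writer and keep a ()-typed
        // batch that still carries everything needed to commit offsets.
        fn take_rows(self) -> (R, BytesInsertBatch<()>) {
            let rows = self.rows;
            (rows, BytesInsertBatch { rows: () })
        }
    }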

In a future PR I would like to see:

    deleting InsertBatch entirely, and replacing all instances with BytesInsertBatch
    renaming BytesInsertBatch to InsertBatch
    splitting up types.rs into multiple files
untitaker authored Mar 27, 2024
1 parent 6281a9a commit 1406965
Showing 12 changed files with 1,110 additions and 196 deletions.
756 changes: 744 additions & 12 deletions rust_snuba/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust_snuba/Cargo.toml
@@ -57,6 +57,7 @@ rdkafka = { git = "https://github.com/fede1024/rust-rdkafka" }

[dev-dependencies]
criterion = "0.5.1"
httpmock = "0.7.0"
insta = { version = "1.34.0", features = ["json", "redactions"] }
once_cell = "1.18.0"
procspawn = { version = "1.0.0", features = ["test-support", "json"] }
49 changes: 35 additions & 14 deletions rust_snuba/src/factory.rs
@@ -24,14 +24,15 @@ use crate::config;
use crate::metrics::global_tags::set_global_tag;
use crate::processors::{self, get_cogs_label};
use crate::strategies::accountant::RecordCogs;
use crate::strategies::clickhouse::batch::{BatchFactory, HttpBatch};
use crate::strategies::clickhouse::ClickhouseWriterStep;
use crate::strategies::commit_log::ProduceCommitLog;
use crate::strategies::processor::{
get_schema, make_rust_processor, make_rust_processor_with_replacements, validate_schema,
};
use crate::strategies::python::PythonTransformStep;
use crate::strategies::replacements::ProduceReplacements;
use crate::types::BytesInsertBatch;
use crate::types::{BytesInsertBatch, CogsData, RowData};

pub struct ConsumerStrategyFactory {
storage_config: config::StorageConfig,
@@ -115,7 +116,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
let next_step = CommitOffsets::new(chrono::Duration::seconds(1));

// Produce commit log if there is one
let next_step: Box<dyn ProcessingStrategy<_>> =
let next_step: Box<dyn ProcessingStrategy<BytesInsertBatch<()>>> =
if let Some((ref producer, destination)) = self.commit_log_producer {
Box::new(ProduceCommitLog::new(
next_step,
@@ -130,19 +131,10 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
Box::new(next_step)
};

// Write to clickhouse
let next_step = Box::new(ClickhouseWriterStep::new(
next_step,
self.storage_config.clickhouse_cluster.clone(),
self.storage_config.clickhouse_table_name.clone(),
self.skip_write,
&self.clickhouse_concurrency,
));

let cogs_label = get_cogs_label(&self.storage_config.message_processor.python_class_name);

// Produce cogs if generic metrics AND we are not skipping writes AND record_cogs is true
let next_step: Box<dyn ProcessingStrategy<BytesInsertBatch>> =
let next_step: Box<dyn ProcessingStrategy<BytesInsertBatch<()>>> =
match (self.skip_write, self.env_config.record_cogs, cogs_label) {
(false, true, Some(resource_id)) => Box::new(RecordCogs::new(
next_step,
@@ -153,12 +145,41 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
_ => next_step,
};

// Write to clickhouse
let next_step = Box::new(ClickhouseWriterStep::new(
next_step,
&self.clickhouse_concurrency,
));

// Batch insert rows
let accumulator = Arc::new(BytesInsertBatch::merge);
let batch_factory = BatchFactory::new(
&self.storage_config.clickhouse_cluster.host,
self.storage_config.clickhouse_cluster.http_port,
&self.storage_config.clickhouse_table_name,
&self.storage_config.clickhouse_cluster.database,
&self.clickhouse_concurrency,
self.skip_write,
);

let accumulator = Arc::new(
|batch: BytesInsertBatch<HttpBatch>, small_batch: BytesInsertBatch<RowData>| {
batch.merge(small_batch)
},
);

let next_step = Reduce::new(
next_step,
accumulator,
BytesInsertBatch::default(),
Arc::new(move || {
BytesInsertBatch::new(
batch_factory.new_batch(),
None,
None,
None,
Default::default(),
CogsData::default(),
)
}),
self.max_batch_size,
self.max_batch_time,
BytesInsertBatch::len,
10 changes: 5 additions & 5 deletions rust_snuba/src/strategies/accountant.rs
@@ -51,7 +51,7 @@ impl<N> RecordCogs<N> {
topic_name: &str,
) -> Self
where
N: ProcessingStrategy<BytesInsertBatch> + 'static,
N: ProcessingStrategy<BytesInsertBatch<()>> + 'static,
{
let accountant = CogsAccountant::new(broker_config, topic_name);

@@ -63,18 +63,18 @@ impl<N> RecordCogs<N> {
}
}

impl<N> ProcessingStrategy<BytesInsertBatch> for RecordCogs<N>
impl<N> ProcessingStrategy<BytesInsertBatch<()>> for RecordCogs<N>
where
N: ProcessingStrategy<BytesInsertBatch> + 'static,
N: ProcessingStrategy<BytesInsertBatch<()>> + 'static,
{
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
self.next_step.poll()
}

fn submit(
&mut self,
message: Message<BytesInsertBatch>,
) -> Result<(), SubmitError<BytesInsertBatch>> {
message: Message<BytesInsertBatch<()>>,
) -> Result<(), SubmitError<BytesInsertBatch<()>>> {
for (app_feature, amount_bytes) in message.payload().cogs_data().data.iter() {
self.accountant
.record_bytes(&self.resource_id, app_feature, *amount_bytes)