Skip to content

Commit

Permalink
ref(consumer): Add batch write timeout to Rust consumer (#6043)
Browse files Browse the repository at this point in the history
* add new batch write timeout parameter

* linting + add main CLI entry arg

* linting

* linitng again

* fix linting again

* add CLI arg to rust consumer entrypoint

* add tests

* add snapshots ? and add constraint for timeout

* clean up timeout constraint

* remove snapshots?

* change test to should panic
  • Loading branch information
ayirr7 committed Jun 25, 2024
1 parent 0bb40d8 commit 5b3a54f
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 4 deletions.
1 change: 1 addition & 0 deletions rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ fn create_factory(
broker_config: BrokerConfig::default(),
},
stop_at_timestamp: None,
batch_write_timeout: None,
};
Box::new(factory)
}
Expand Down
15 changes: 15 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub fn consumer(
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
) {
py.allow_threads(|| {
consumer_impl(
Expand All @@ -57,6 +58,7 @@ pub fn consumer(
python_max_queue_depth,
health_check_file,
stop_at_timestamp,
batch_write_timeout_ms,
)
});
}
Expand All @@ -76,13 +78,25 @@ pub fn consumer_impl(
python_max_queue_depth: Option<usize>,
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
) -> usize {
setup_logging();

let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap();
let max_batch_size = consumer_config.max_batch_size;
let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms);

let batch_write_timeout = match batch_write_timeout_ms {
Some(timeout_ms) => {
if timeout_ms >= consumer_config.max_batch_time_ms {
Some(Duration::from_millis(timeout_ms))
} else {
None
}
}
None => None,
};

for storage in &consumer_config.storages {
tracing::info!(
"Storage: {}, ClickHouse Table Name: {}, Message Processor: {:?}, ClickHouse host: {}, ClickHouse port: {}, ClickHouse HTTP port: {}, ClickHouse database: {}",
Expand Down Expand Up @@ -232,6 +246,7 @@ pub fn consumer_impl(
physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
accountant_topic_config: consumer_config.accountant_topic,
stop_at_timestamp,
batch_write_timeout,
};

let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
Expand Down
2 changes: 2 additions & 0 deletions rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct ConsumerStrategyFactory {
pub physical_topic_name: Topic,
pub accountant_topic_config: config::TopicConfig,
pub stop_at_timestamp: Option<i64>,
pub batch_write_timeout: Option<Duration>,
}

impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
Expand Down Expand Up @@ -118,6 +119,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
&self.storage_config.clickhouse_cluster.user,
&self.storage_config.clickhouse_cluster.password,
self.async_inserts,
self.batch_write_timeout,
);

let accumulator = Arc::new(
Expand Down
89 changes: 85 additions & 4 deletions rust_snuba/src/strategies/clickhouse/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl BatchFactory {
clickhouse_user: &str,
clickhouse_password: &str,
async_inserts: bool,
batch_write_timeout: Option<Duration>,
) -> Self {
let mut headers = HeaderMap::with_capacity(5);
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
Expand Down Expand Up @@ -72,10 +73,18 @@ impl BatchFactory {
let url = format!("http://{hostname}:{http_port}?{query_params}");
let query = format!("INSERT INTO {table} FORMAT JSONEachRow");

let client = ClientBuilder::new()
.default_headers(headers)
.build()
.unwrap();
let client = if let Some(timeout_duration) = batch_write_timeout {
ClientBuilder::new()
.default_headers(headers)
.timeout(timeout_duration)
.build()
.unwrap()
} else {
ClientBuilder::new()
.default_headers(headers)
.build()
.unwrap()
};

BatchFactory {
client,
Expand Down Expand Up @@ -252,6 +261,7 @@ mod tests {
"default",
"",
false,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -286,6 +296,7 @@ mod tests {
"default",
"",
true,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -319,6 +330,7 @@ mod tests {
"default",
"",
false,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -350,6 +362,7 @@ mod tests {
"default",
"",
false,
None,
);

let mut batch = factory.new_batch();
Expand All @@ -366,4 +379,72 @@ mod tests {
// ensure there has not been any HTTP request
mock.assert_hits(0);
}

#[test]
#[should_panic]
fn test_write_no_time() {
crate::testutils::initialize_python();
let server = MockServer::start();

let concurrency = ConcurrencyConfig::new(1);
let factory = BatchFactory::new(
&server.host(),
server.port(),
"testtable",
"testdb",
&concurrency,
"default",
"",
true,
// pass in an unreasonably short timeout
// which prevents the client request from reaching Clickhouse
Some(Duration::from_millis(0)),
);

let mut batch = factory.new_batch();

batch
.write_rows(&RowData::from_encoded_rows(vec![
br#"{"hello": "world"}"#.to_vec()
]))
.unwrap();

concurrency.handle().block_on(batch.finish()).unwrap();
}

#[test]
fn test_write_enough_time() {
crate::testutils::initialize_python();
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(POST).path("/").body("{\"hello\": \"world\"}\n");
then.status(200).body("hi");
});

let concurrency = ConcurrencyConfig::new(1);
let factory = BatchFactory::new(
&server.host(),
server.port(),
"testtable",
"testdb",
&concurrency,
"default",
"",
true,
// pass in a reasonable timeout
Some(Duration::from_millis(1000)),
);

let mut batch = factory.new_batch();

batch
.write_rows(&RowData::from_encoded_rows(vec![
br#"{"hello": "world"}"#.to_vec()
]))
.unwrap();

concurrency.handle().block_on(batch.finish()).unwrap();

mock.assert();
}
}
8 changes: 8 additions & 0 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@
type=int,
help="Unix timestamp after which to stop processing messages",
)
@click.option(
"--batch-write-timeout-ms",
type=int,
default=None,
help="Optional timeout for batch writer client connecting and sending request to Clickhouse",
)
def rust_consumer(
*,
storage_names: Sequence[str],
Expand Down Expand Up @@ -186,6 +192,7 @@ def rust_consumer(
health_check_file: Optional[str],
enforce_schema: bool,
stop_at_timestamp: Optional[int],
batch_write_timeout_ms: Optional[int]
) -> None:
"""
Experimental alternative to `snuba consumer`
Expand Down Expand Up @@ -235,6 +242,7 @@ def rust_consumer(
python_max_queue_depth,
health_check_file,
stop_at_timestamp,
batch_write_timeout_ms,
)

sys.exit(exitcode)

0 comments on commit 5b3a54f

Please sign in to comment.