From 5b3a54fa33dcf5662fd2232bc47751cafe674ebc Mon Sep 17 00:00:00 2001 From: Riya Chakraborty <47572810+ayirr7@users.noreply.github.com> Date: Tue, 25 Jun 2024 13:23:49 -0700 Subject: [PATCH] ref(consumer): Add batch write timeout to Rust consumer (#6043) * add new batch write timeout parameter * linting + add main CLI entry arg * linting * linitng again * fix linting again * add CLI arg to rust consumer entrypoint * add tests * add snapshots ? and add constraint for timeout * clean up timeout constraint * remove snapshots? * change test to should panic --- rust_snuba/benches/processors.rs | 1 + rust_snuba/src/consumer.rs | 15 ++++ rust_snuba/src/factory.rs | 2 + rust_snuba/src/strategies/clickhouse/batch.rs | 89 ++++++++++++++++++- snuba/cli/rust_consumer.py | 8 ++ 5 files changed, 111 insertions(+), 4 deletions(-) diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index 42e2d50192..41b84cb84e 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -97,6 +97,7 @@ fn create_factory( broker_config: BrokerConfig::default(), }, stop_at_timestamp: None, + batch_write_timeout: None, }; Box::new(factory) } diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 12858251bc..d4e5e4014c 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -41,6 +41,7 @@ pub fn consumer( python_max_queue_depth: Option, health_check_file: Option<&str>, stop_at_timestamp: Option, + batch_write_timeout_ms: Option, ) { py.allow_threads(|| { consumer_impl( @@ -57,6 +58,7 @@ pub fn consumer( python_max_queue_depth, health_check_file, stop_at_timestamp, + batch_write_timeout_ms, ) }); } @@ -76,6 +78,7 @@ pub fn consumer_impl( python_max_queue_depth: Option, health_check_file: Option<&str>, stop_at_timestamp: Option, + batch_write_timeout_ms: Option, ) -> usize { setup_logging(); @@ -83,6 +86,17 @@ pub fn consumer_impl( let max_batch_size = consumer_config.max_batch_size; let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); + let batch_write_timeout = match batch_write_timeout_ms { + Some(timeout_ms) => { + if timeout_ms >= consumer_config.max_batch_time_ms { + Some(Duration::from_millis(timeout_ms)) + } else { + None + } + } + None => None, + }; + for storage in &consumer_config.storages { tracing::info!( "Storage: {}, ClickHouse Table Name: {}, Message Processor: {:?}, ClickHouse host: {}, ClickHouse port: {}, ClickHouse HTTP port: {}, ClickHouse database: {}", @@ -232,6 +246,7 @@ pub fn consumer_impl( physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name), accountant_topic_config: consumer_config.accountant_topic, stop_at_timestamp, + batch_write_timeout, }; let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name); diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs index 3159c5e6a2..dcaf49de74 100644 --- a/rust_snuba/src/factory.rs +++ b/rust_snuba/src/factory.rs @@ -56,6 +56,7 @@ pub struct ConsumerStrategyFactory { pub physical_topic_name: Topic, pub accountant_topic_config: config::TopicConfig, pub stop_at_timestamp: Option, + pub batch_write_timeout: Option, } impl ProcessingStrategyFactory for ConsumerStrategyFactory { @@ -118,6 +119,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactory { &self.storage_config.clickhouse_cluster.user, &self.storage_config.clickhouse_cluster.password, self.async_inserts, + self.batch_write_timeout, ); let accumulator = Arc::new( diff --git a/rust_snuba/src/strategies/clickhouse/batch.rs b/rust_snuba/src/strategies/clickhouse/batch.rs index 29275af6e5..ecb4a6ad92 100644 --- a/rust_snuba/src/strategies/clickhouse/batch.rs +++ b/rust_snuba/src/strategies/clickhouse/batch.rs @@ -42,6 +42,7 @@ impl BatchFactory { clickhouse_user: &str, clickhouse_password: &str, async_inserts: bool, + batch_write_timeout: Option, ) -> Self { let mut headers = HeaderMap::with_capacity(5); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); @@ -72,10 +73,18 @@ impl BatchFactory { let url = format!("http://{hostname}:{http_port}?{query_params}"); let query = format!("INSERT INTO {table} FORMAT JSONEachRow"); - let client = ClientBuilder::new() - .default_headers(headers) - .build() - .unwrap(); + let client = if let Some(timeout_duration) = batch_write_timeout { + ClientBuilder::new() + .default_headers(headers) + .timeout(timeout_duration) + .build() + .unwrap() + } else { + ClientBuilder::new() + .default_headers(headers) + .build() + .unwrap() + }; BatchFactory { client, @@ -252,6 +261,7 @@ mod tests { "default", "", false, + None, ); let mut batch = factory.new_batch(); @@ -286,6 +296,7 @@ mod tests { "default", "", true, + None, ); let mut batch = factory.new_batch(); @@ -319,6 +330,7 @@ mod tests { "default", "", false, + None, ); let mut batch = factory.new_batch(); @@ -350,6 +362,7 @@ mod tests { "default", "", false, + None, ); let mut batch = factory.new_batch(); @@ -366,4 +379,72 @@ mod tests { // ensure there has not been any HTTP request mock.assert_hits(0); } + + #[test] + #[should_panic] + fn test_write_no_time() { + crate::testutils::initialize_python(); + let server = MockServer::start(); + + let concurrency = ConcurrencyConfig::new(1); + let factory = BatchFactory::new( + &server.host(), + server.port(), + "testtable", + "testdb", + &concurrency, + "default", + "", + true, + // pass in an unreasonably short timeout + // which prevents the client request from reaching Clickhouse + Some(Duration::from_millis(0)), + ); + + let mut batch = factory.new_batch(); + + batch + .write_rows(&RowData::from_encoded_rows(vec![ + br#"{"hello": "world"}"#.to_vec() + ])) + .unwrap(); + + concurrency.handle().block_on(batch.finish()).unwrap(); + } + + #[test] + fn test_write_enough_time() { + crate::testutils::initialize_python(); + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST).path("/").body("{\"hello\": \"world\"}\n"); + then.status(200).body("hi"); + }); + + let concurrency = ConcurrencyConfig::new(1); + let factory = BatchFactory::new( + &server.host(), + server.port(), + "testtable", + "testdb", + &concurrency, + "default", + "", + true, + // pass in a reasonable timeout + Some(Duration::from_millis(1000)), + ); + + let mut batch = factory.new_batch(); + + batch + .write_rows(&RowData::from_encoded_rows(vec![ + br#"{"hello": "world"}"#.to_vec() + ])) + .unwrap(); + + concurrency.handle().block_on(batch.finish()).unwrap(); + + mock.assert(); + } } diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py index 0bfe4c1513..cdfbaede4a 100644 --- a/snuba/cli/rust_consumer.py +++ b/snuba/cli/rust_consumer.py @@ -158,6 +158,12 @@ type=int, help="Unix timestamp after which to stop processing messages", ) +@click.option( + "--batch-write-timeout-ms", + type=int, + default=None, + help="Optional timeout for batch writer client connecting and sending request to Clickhouse", +) def rust_consumer( *, storage_names: Sequence[str], @@ -186,6 +192,7 @@ def rust_consumer( health_check_file: Optional[str], enforce_schema: bool, stop_at_timestamp: Optional[int], + batch_write_timeout_ms: Optional[int] ) -> None: """ Experimental alternative to `snuba consumer` @@ -235,6 +242,7 @@ def rust_consumer( python_max_queue_depth, health_check_file, stop_at_timestamp, + batch_write_timeout_ms, ) sys.exit(exitcode)