From 283b52f342b45a8fbb9829375835bffe3d6ce12f Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 29 Jun 2023 06:07:28 +0200 Subject: [PATCH] add args to dlq consumer --- snuba/cli/dlq_consumer.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/snuba/cli/dlq_consumer.py b/snuba/cli/dlq_consumer.py index c6c12c065c..612bfff8fd 100644 --- a/snuba/cli/dlq_consumer.py +++ b/snuba/cli/dlq_consumer.py @@ -46,6 +46,18 @@ type=int, help="Max length of time to buffer messages in memory before writing to Kafka.", ) +@click.option( + "--max-insert-batch-size", + default=None, + type=int, + help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size", +) +@click.option( + "--max-insert-batch-time-ms", + default=None, + type=int, + help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms", +) @click.option( "--auto-offset-reset", default="error", @@ -87,6 +99,8 @@ def dlq_consumer( consumer_group: str, max_batch_size: int, max_batch_time_ms: int, + max_insert_batch_size: int, + max_insert_batch_time_ms: int, auto_offset_reset: str, no_strict_offset_reset: bool, queued_max_messages_kbytes: int, @@ -166,6 +180,8 @@ def handler(signum: int, frame: Any) -> None: ), max_batch_size=max_batch_size, max_batch_time_ms=max_batch_time_ms, + max_insert_batch_size=max_insert_batch_size, + max_insert_batch_time_ms=max_insert_batch_time_ms, metrics=metrics, slice_id=instruction.slice_id, join_timeout=None,