diff --git a/snuba/cli/dlq_consumer.py b/snuba/cli/dlq_consumer.py index c6c12c065c..612bfff8fd 100644 --- a/snuba/cli/dlq_consumer.py +++ b/snuba/cli/dlq_consumer.py @@ -46,6 +46,18 @@ type=int, help="Max length of time to buffer messages in memory before writing to Kafka.", ) +@click.option( + "--max-insert-batch-size", + default=None, + type=int, + help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size", +) +@click.option( + "--max-insert-batch-time-ms", + default=None, + type=int, + help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms", +) @click.option( "--auto-offset-reset", default="error", @@ -87,6 +99,8 @@ def dlq_consumer( consumer_group: str, max_batch_size: int, max_batch_time_ms: int, + max_insert_batch_size: int, + max_insert_batch_time_ms: int, auto_offset_reset: str, no_strict_offset_reset: bool, queued_max_messages_kbytes: int, @@ -166,6 +180,8 @@ def handler(signum: int, frame: Any) -> None: ), max_batch_size=max_batch_size, max_batch_time_ms=max_batch_time_ms, + max_insert_batch_size=max_insert_batch_size, + max_insert_batch_time_ms=max_insert_batch_time_ms, metrics=metrics, slice_id=instruction.slice_id, join_timeout=None,