Skip to content

Commit

Permalink
add args to dlq consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker committed Jun 29, 2023
1 parent 1a60140 commit 283b52f
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions snuba/cli/dlq_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@
type=int,
help="Max length of time to buffer messages in memory before writing to Kafka.",
)
@click.option(
"--max-insert-batch-size",
default=None,
type=int,
help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size",
)
@click.option(
"--max-insert-batch-time-ms",
default=None,
type=int,
help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms",
)
@click.option(
"--auto-offset-reset",
default="error",
Expand Down Expand Up @@ -87,6 +99,8 @@ def dlq_consumer(
consumer_group: str,
max_batch_size: int,
max_batch_time_ms: int,
max_insert_batch_size: int,
max_insert_batch_time_ms: int,
auto_offset_reset: str,
no_strict_offset_reset: bool,
queued_max_messages_kbytes: int,
Expand Down Expand Up @@ -166,6 +180,8 @@ def handler(signum: int, frame: Any) -> None:
),
max_batch_size=max_batch_size,
max_batch_time_ms=max_batch_time_ms,
max_insert_batch_size=max_insert_batch_size,
max_insert_batch_time_ms=max_insert_batch_time_ms,
metrics=metrics,
slice_id=instruction.slice_id,
join_timeout=None,
Expand Down

0 comments on commit 283b52f

Please sign in to comment.