ref: Decouple processing batches and clickhouse batches (#4251)
Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
untitaker and getsantry[bot] authored Jun 29, 2023
1 parent ce7bc50 commit 6518d74
Showing 7 changed files with 66 additions and 6 deletions.
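This commit splits what was a single batching configuration into two: --max-batch-size and --max-batch-time-ms keep governing how many messages are batched for processing (and how often offsets are committed), while the new --max-insert-batch-size and --max-insert-batch-time-ms govern the batches written to ClickHouse, falling back to the processing values when unset. A minimal sketch of that fallback and the millisecond-to-second conversion, using a hypothetical helper rather than actual Snuba code:

# Hypothetical helper illustrating the fallback semantics of the new options;
# not part of the Snuba codebase.
from typing import Optional, Tuple


def resolve_insert_batching(
    max_batch_size: int,
    max_batch_time_ms: int,
    max_insert_batch_size: Optional[int] = None,
    max_insert_batch_time_ms: Optional[int] = None,
) -> Tuple[int, float]:
    """Return (batch size, batch time in seconds) for ClickHouse inserts.

    Both insert-specific options are optional and default to the
    processing-batch settings, matching the new CLI help text below.
    """
    size = max_insert_batch_size or max_batch_size
    time_ms = max_insert_batch_time_ms or max_batch_time_ms
    return size, time_ms / 1000.0


# Only the processing-batch options set: inserts use the same values.
assert resolve_insert_batching(10_000, 2_000) == (10_000, 2.0)
# A dedicated insert batch size: more rows are accumulated per INSERT.
assert resolve_insert_batching(10_000, 2_000, max_insert_batch_size=50_000) == (50_000, 2.0)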
26 changes: 24 additions & 2 deletions snuba/cli/consumer.py
@@ -71,13 +71,31 @@
"--max-batch-size",
default=settings.DEFAULT_MAX_BATCH_SIZE,
type=int,
help="Max number of messages to batch in memory before writing to Kafka.",
help=(
"Max number of messages to batch in memory.\n\n"
"Batching parameters apply to three steps: Batching of messages for "
"processing them (=transforming them into ClickHouse rows), batching for "
"the INSERT statement, and batching of offset commits.\n\n"
"Commits are additionally debounced to happen at most once per second."
),
)
@click.option(
"--max-batch-time-ms",
default=settings.DEFAULT_MAX_BATCH_TIME_MS,
type=int,
help="Max length of time to buffer messages in memory before writing to Kafka.",
help="Max duration to buffer messages in memory for.",
)
@click.option(
"--max-insert-batch-size",
default=None,
type=int,
help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size",
)
@click.option(
"--max-insert-batch-time-ms",
default=None,
type=int,
help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms",
)
@click.option(
"--auto-offset-reset",
@@ -143,6 +161,8 @@ def consumer(
slice_id: Optional[int],
max_batch_size: int,
max_batch_time_ms: int,
max_insert_batch_size: Optional[int],
max_insert_batch_time_ms: Optional[int],
auto_offset_reset: str,
no_strict_offset_reset: bool,
queued_max_messages_kbytes: int,
@@ -204,6 +224,8 @@ def consumer(
),
max_batch_size=max_batch_size,
max_batch_time_ms=max_batch_time_ms,
max_insert_batch_size=max_insert_batch_size,
max_insert_batch_time_ms=max_insert_batch_time_ms,
metrics=metrics,
profile_path=profile_path,
slice_id=slice_id,
16 changes: 16 additions & 0 deletions snuba/cli/dlq_consumer.py
@@ -46,6 +46,18 @@
type=int,
help="Max length of time to buffer messages in memory before writing to Kafka.",
)
@click.option(
"--max-insert-batch-size",
default=None,
type=int,
help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size",
)
@click.option(
"--max-insert-batch-time-ms",
default=None,
type=int,
help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms",
)
@click.option(
"--auto-offset-reset",
default="error",
@@ -87,6 +99,8 @@ def dlq_consumer(
consumer_group: str,
max_batch_size: int,
max_batch_time_ms: int,
max_insert_batch_size: int,
max_insert_batch_time_ms: int,
auto_offset_reset: str,
no_strict_offset_reset: bool,
queued_max_messages_kbytes: int,
@@ -166,6 +180,8 @@ def handler(signum: int, frame: Any) -> None:
),
max_batch_size=max_batch_size,
max_batch_time_ms=max_batch_time_ms,
max_insert_batch_size=max_insert_batch_size,
max_insert_batch_time_ms=max_insert_batch_time_ms,
metrics=metrics,
slice_id=instruction.slice_id,
join_timeout=None,
14 changes: 12 additions & 2 deletions snuba/consumers/consumer_builder.py
@@ -10,7 +10,7 @@
build_kafka_configuration,
build_kafka_consumer_configuration,
)
from arroyo.commit import IMMEDIATE
from arroyo.commit import ONCE_PER_SECOND
from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer, NoopDlqProducer
from arroyo.processing import StreamProcessor
from arroyo.processing.strategies import ProcessingStrategyFactory
@@ -68,6 +68,8 @@ def __init__(
processing_params: ProcessingParameters,
max_batch_size: int,
max_batch_time_ms: int,
max_insert_batch_size: Optional[int],
max_insert_batch_time_ms: Optional[int],
metrics: MetricsBackend,
slice_id: Optional[int],
join_timeout: Optional[float],
@@ -132,6 +134,8 @@ def __init__(
self.metrics = metrics
self.max_batch_size = max_batch_size
self.max_batch_time_ms = max_batch_time_ms
self.max_insert_batch_size = max_insert_batch_size
self.max_insert_batch_time_ms = max_insert_batch_time_ms
self.group_id = kafka_params.group_id
self.auto_offset_reset = kafka_params.auto_offset_reset
self.strict_offset_reset = kafka_params.strict_offset_reset
@@ -184,7 +188,7 @@ def log_general_error(e: KafkaError) -> None:
consumer,
input_topic,
strategy_factory,
IMMEDIATE,
ONCE_PER_SECOND,
dlq_policy=dlq_policy,
join_timeout=self.join_timeout,
)
@@ -227,6 +231,9 @@ def build_streaming_strategy_factory(
),
max_batch_size=self.max_batch_size,
max_batch_time=self.max_batch_time_ms / 1000.0,
max_insert_batch_size=self.max_insert_batch_size,
max_insert_batch_time=self.max_insert_batch_time_ms
and self.max_insert_batch_time_ms / 1000.0,
processes=self.processes,
input_block_size=self.input_block_size,
output_block_size=self.output_block_size,
@@ -280,6 +287,9 @@ def build_dlq_strategy_factory(
),
max_batch_size=self.max_batch_size,
max_batch_time=self.max_batch_time_ms / 1000.0,
max_insert_batch_size=self.max_insert_batch_size,
max_insert_batch_time=self.max_insert_batch_time_ms
and self.max_insert_batch_time_ms / 1000.0,
processes=self.processes,
input_block_size=self.input_block_size,
output_block_size=self.output_block_size,
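In build_streaming_strategy_factory and build_dlq_strategy_factory above, the new max_insert_batch_time_ms is converted from milliseconds to seconds with the expression self.max_insert_batch_time_ms and self.max_insert_batch_time_ms / 1000.0, which passes None through unchanged so the strategy factory can still apply its own fallback. The switch from IMMEDIATE to ONCE_PER_SECOND in the same file is what the new --max-batch-size help text means by commits being debounced to at most once per second. A standalone illustration of the conversion expression (assumed intent only; to_seconds is not a Snuba function):

# Standalone illustration of the "x and x / 1000.0" pattern used above: if the
# value is None the whole expression is None, otherwise the value is converted
# from milliseconds to seconds.
from typing import Optional


def to_seconds(value_ms: Optional[int]) -> Optional[float]:
    return value_ms and value_ms / 1000.0


assert to_seconds(None) is None
assert to_seconds(2500) == 2.5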
8 changes: 6 additions & 2 deletions snuba/consumers/strategy_factory.py
@@ -64,6 +64,8 @@ def __init__(
processes: Optional[int],
input_block_size: Optional[int],
output_block_size: Optional[int],
max_insert_batch_size: Optional[int],
max_insert_batch_time: Optional[float],
# Passed in the case of DLQ consumer which exits after a certain number of messages
# is processed
max_messages_to_process: Optional[int] = None,
@@ -76,6 +78,8 @@ def __init__(

self.__max_batch_size = max_batch_size
self.__max_batch_time = max_batch_time
self.__max_insert_batch_size = max_insert_batch_size or max_batch_size
self.__max_insert_batch_time = max_insert_batch_time or max_batch_time

if processes is not None:
assert input_block_size is not None, "input block size required"
@@ -119,8 +123,8 @@ def flush_batch(
commit_strategy = CommitOffsets(commit)

collect: Reduce[ProcessedMessage, ProcessedMessageBatchWriter] = Reduce(
self.__max_batch_size,
self.__max_batch_time,
self.__max_insert_batch_size,
self.__max_insert_batch_time,
accumulator,
self.__collector,
RunTaskInThreads(
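Because the Reduce step is now keyed on the insert-batch parameters rather than the processing-batch parameters, rows produced by smaller processing batches can keep accumulating until the insert batch is full, so one INSERT can span several processing batches. A framework-free sketch of that two-level batching (it ignores the time-based flush and uses made-up names such as process_message and write_rows, not Snuba or Arroyo APIs):

from typing import Callable, Iterable, List, TypeVar

TMessage = TypeVar("TMessage")
TRow = TypeVar("TRow")


def consume(
    messages: Iterable[TMessage],
    process_message: Callable[[TMessage], TRow],
    write_rows: Callable[[List[TRow]], None],
    max_batch_size: int,
    max_insert_batch_size: int,
) -> None:
    pending: List[TMessage] = []
    rows: List[TRow] = []
    for message in messages:
        # Level 1: batch raw messages for processing.
        pending.append(message)
        if len(pending) < max_batch_size:
            continue
        rows.extend(process_message(m) for m in pending)
        pending = []
        # Level 2: flush to ClickHouse only when the (independent) insert batch is full.
        if len(rows) >= max_insert_batch_size:
            write_rows(rows)
            rows = []
    # Flush whatever remains on shutdown.
    rows.extend(process_message(m) for m in pending)
    if rows:
        write_rows(rows)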
2 changes: 2 additions & 0 deletions snuba/web/views.py
@@ -655,6 +655,8 @@ def commit(
processes=None,
input_block_size=None,
output_block_size=None,
max_insert_batch_size=None,
max_insert_batch_time=None,
).create_with_partitions(commit, {})
strategy.submit(message)
strategy.close()
4 changes: 4 additions & 0 deletions tests/consumers/test_consumer_builder.py
@@ -54,6 +54,8 @@
),
max_batch_size=3,
max_batch_time_ms=4,
max_insert_batch_size=None,
max_insert_batch_time_ms=None,
metrics=MetricsWrapper(
environment.metrics,
"test_consumer",
@@ -98,6 +100,8 @@
),
max_batch_size=3,
max_batch_time_ms=4,
max_insert_batch_size=None,
max_insert_batch_time_ms=None,
metrics=MetricsWrapper(
environment.metrics,
"test_consumer",
2 changes: 2 additions & 0 deletions tests/test_consumer.py
@@ -80,6 +80,8 @@ def write_step() -> ProcessedMessageBatchWriter:
write_step,
max_batch_size=10,
max_batch_time=60,
max_insert_batch_size=None,
max_insert_batch_time=None,
processes=None,
input_block_size=None,
output_block_size=None,