From 6518d745b9a651320dcd55e08f562a5c49788a49 Mon Sep 17 00:00:00 2001
From: Markus Unterwaditzer
Date: Thu, 29 Jun 2023 13:00:33 +0200
Subject: [PATCH] ref: Decouple processing batches and clickhouse batches
 (#4251)

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
---
 snuba/cli/consumer.py                    | 26 ++++++++++++++++++++++--
 snuba/cli/dlq_consumer.py                | 16 ++++++++++++++++
 snuba/consumers/consumer_builder.py      | 14 ++++++++++++--
 snuba/consumers/strategy_factory.py      |  8 ++++++--
 snuba/web/views.py                       |  2 ++
 tests/consumers/test_consumer_builder.py |  4 ++++
 tests/test_consumer.py                   |  2 ++
 7 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/snuba/cli/consumer.py b/snuba/cli/consumer.py
index d6e242a268..3824c27875 100644
--- a/snuba/cli/consumer.py
+++ b/snuba/cli/consumer.py
@@ -71,13 +71,31 @@
     "--max-batch-size",
     default=settings.DEFAULT_MAX_BATCH_SIZE,
     type=int,
-    help="Max number of messages to batch in memory before writing to Kafka.",
+    help=(
+        "Max number of messages to batch in memory.\n\n"
+        "Batching parameters apply to three steps: batching of messages for "
+        "processing them (i.e. transforming them into ClickHouse rows), batching for "
+        "the INSERT statement, and batching of offset commits.\n\n"
+        "Commits are additionally debounced to happen at most once per second."
+    ),
 )
 @click.option(
     "--max-batch-time-ms",
     default=settings.DEFAULT_MAX_BATCH_TIME_MS,
     type=int,
-    help="Max length of time to buffer messages in memory before writing to Kafka.",
+    help="Max duration to buffer messages in memory.",
+)
+@click.option(
+    "--max-insert-batch-size",
+    default=None,
+    type=int,
+    help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size.",
+)
+@click.option(
+    "--max-insert-batch-time-ms",
+    default=None,
+    type=int,
+    help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms.",
 )
 @click.option(
     "--auto-offset-reset",
@@ -143,6 +161,8 @@ def consumer(
     slice_id: Optional[int],
     max_batch_size: int,
     max_batch_time_ms: int,
+    max_insert_batch_size: Optional[int],
+    max_insert_batch_time_ms: Optional[int],
     auto_offset_reset: str,
     no_strict_offset_reset: bool,
     queued_max_messages_kbytes: int,
@@ -204,6 +224,8 @@ def consumer(
         ),
         max_batch_size=max_batch_size,
         max_batch_time_ms=max_batch_time_ms,
+        max_insert_batch_size=max_insert_batch_size,
+        max_insert_batch_time_ms=max_insert_batch_time_ms,
         metrics=metrics,
         profile_path=profile_path,
         slice_id=slice_id,
diff --git a/snuba/cli/dlq_consumer.py b/snuba/cli/dlq_consumer.py
index c6c12c065c..612bfff8fd 100644
--- a/snuba/cli/dlq_consumer.py
+++ b/snuba/cli/dlq_consumer.py
@@ -46,6 +46,18 @@
     type=int,
     help="Max length of time to buffer messages in memory before writing to Kafka.",
 )
+@click.option(
+    "--max-insert-batch-size",
+    default=None,
+    type=int,
+    help="Max number of messages to batch in memory for inserts into ClickHouse. Defaults to --max-batch-size.",
+)
+@click.option(
+    "--max-insert-batch-time-ms",
+    default=None,
+    type=int,
+    help="Max duration to batch in memory for inserts into ClickHouse. Defaults to --max-batch-time-ms.",
+)
 @click.option(
     "--auto-offset-reset",
     default="error",
@@ -87,6 +99,8 @@ def dlq_consumer(
     consumer_group: str,
     max_batch_size: int,
     max_batch_time_ms: int,
+    max_insert_batch_size: Optional[int],
+    max_insert_batch_time_ms: Optional[int],
     auto_offset_reset: str,
     no_strict_offset_reset: bool,
     queued_max_messages_kbytes: int,
@@ -166,6 +180,8 @@ def handler(signum: int, frame: Any) -> None:
         ),
         max_batch_size=max_batch_size,
         max_batch_time_ms=max_batch_time_ms,
+        max_insert_batch_size=max_insert_batch_size,
+        max_insert_batch_time_ms=max_insert_batch_time_ms,
         metrics=metrics,
         slice_id=instruction.slice_id,
         join_timeout=None,
diff --git a/snuba/consumers/consumer_builder.py b/snuba/consumers/consumer_builder.py
index d9edb6df16..23f74cde0b 100644
--- a/snuba/consumers/consumer_builder.py
+++ b/snuba/consumers/consumer_builder.py
@@ -10,7 +10,7 @@
     build_kafka_configuration,
     build_kafka_consumer_configuration,
 )
-from arroyo.commit import IMMEDIATE
+from arroyo.commit import ONCE_PER_SECOND
 from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer, NoopDlqProducer
 from arroyo.processing import StreamProcessor
 from arroyo.processing.strategies import ProcessingStrategyFactory
@@ -68,6 +68,8 @@ def __init__(
         processing_params: ProcessingParameters,
         max_batch_size: int,
         max_batch_time_ms: int,
+        max_insert_batch_size: Optional[int],
+        max_insert_batch_time_ms: Optional[int],
         metrics: MetricsBackend,
         slice_id: Optional[int],
         join_timeout: Optional[float],
@@ -132,6 +134,8 @@ def __init__(
         self.metrics = metrics
         self.max_batch_size = max_batch_size
         self.max_batch_time_ms = max_batch_time_ms
+        self.max_insert_batch_size = max_insert_batch_size
+        self.max_insert_batch_time_ms = max_insert_batch_time_ms
         self.group_id = kafka_params.group_id
         self.auto_offset_reset = kafka_params.auto_offset_reset
         self.strict_offset_reset = kafka_params.strict_offset_reset
@@ -184,7 +188,7 @@ def log_general_error(e: KafkaError) -> None:
             consumer,
             input_topic,
             strategy_factory,
-            IMMEDIATE,
+            ONCE_PER_SECOND,
             dlq_policy=dlq_policy,
             join_timeout=self.join_timeout,
         )
@@ -227,6 +231,9 @@ def build_streaming_strategy_factory(
             ),
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time_ms / 1000.0,
+            max_insert_batch_size=self.max_insert_batch_size,
+            max_insert_batch_time=self.max_insert_batch_time_ms
+            and self.max_insert_batch_time_ms / 1000.0,
             processes=self.processes,
             input_block_size=self.input_block_size,
             output_block_size=self.output_block_size,
@@ -280,6 +287,9 @@ def build_dlq_strategy_factory(
             ),
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time_ms / 1000.0,
+            max_insert_batch_size=self.max_insert_batch_size,
+            max_insert_batch_time=self.max_insert_batch_time_ms
+            and self.max_insert_batch_time_ms / 1000.0,
             processes=self.processes,
             input_block_size=self.input_block_size,
             output_block_size=self.output_block_size,
diff --git a/snuba/consumers/strategy_factory.py b/snuba/consumers/strategy_factory.py
index 913f04f3eb..df1f09c2a4 100644
--- a/snuba/consumers/strategy_factory.py
+++ b/snuba/consumers/strategy_factory.py
@@ -64,6 +64,8 @@ def __init__(
         processes: Optional[int],
         input_block_size: Optional[int],
         output_block_size: Optional[int],
+        max_insert_batch_size: Optional[int],
+        max_insert_batch_time: Optional[float],
         # Passed in the case of DLQ consumer which exits after a certain number of messages
         # is processed
         max_messages_to_process: Optional[int] = None,
@@ -76,6 +78,8 @@
         self.__max_batch_size = max_batch_size
         self.__max_batch_time = max_batch_time
+        self.__max_insert_batch_size = max_insert_batch_size or max_batch_size
+        self.__max_insert_batch_time = max_insert_batch_time or max_batch_time
 
         if processes is not None:
             assert input_block_size is not None, "input block size required"
@@ -119,8 +123,8 @@ def flush_batch(
         commit_strategy = CommitOffsets(commit)
 
         collect: Reduce[ProcessedMessage, ProcessedMessageBatchWriter] = Reduce(
-            self.__max_batch_size,
-            self.__max_batch_time,
+            self.__max_insert_batch_size,
+            self.__max_insert_batch_time,
             accumulator,
             self.__collector,
             RunTaskInThreads(
diff --git a/snuba/web/views.py b/snuba/web/views.py
index 7cbde9d50e..33de52f564 100644
--- a/snuba/web/views.py
+++ b/snuba/web/views.py
@@ -655,6 +655,8 @@ def commit(
         processes=None,
         input_block_size=None,
         output_block_size=None,
+        max_insert_batch_size=None,
+        max_insert_batch_time=None,
     ).create_with_partitions(commit, {})
     strategy.submit(message)
     strategy.close()
diff --git a/tests/consumers/test_consumer_builder.py b/tests/consumers/test_consumer_builder.py
index c8d9c1d0cb..29e0312336 100644
--- a/tests/consumers/test_consumer_builder.py
+++ b/tests/consumers/test_consumer_builder.py
@@ -54,6 +54,8 @@
     ),
     max_batch_size=3,
     max_batch_time_ms=4,
+    max_insert_batch_size=None,
+    max_insert_batch_time_ms=None,
     metrics=MetricsWrapper(
         environment.metrics,
         "test_consumer",
@@ -98,6 +100,8 @@
     ),
     max_batch_size=3,
     max_batch_time_ms=4,
+    max_insert_batch_size=None,
+    max_insert_batch_time_ms=None,
     metrics=MetricsWrapper(
         environment.metrics,
         "test_consumer",
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
index dd7fa9e99f..4bf2dcd86e 100644
--- a/tests/test_consumer.py
+++ b/tests/test_consumer.py
@@ -80,6 +80,8 @@ def write_step() -> ProcessedMessageBatchWriter:
         write_step,
         max_batch_size=10,
         max_batch_time=60,
+        max_insert_batch_size=None,
+        max_insert_batch_time=None,
         processes=None,
         input_block_size=None,
         output_block_size=None,
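
A note on how the new knobs compose, since the wiring is split across files: ConsumerBuilder converts --max-insert-batch-time-ms to seconds only when the flag was given (the `x and x / 1000.0` idiom above), and StreamingConsumerStrategyFactory falls back to the processing-batch parameters via `or`. Below is a minimal sketch of the resulting behavior; the function name resolve_insert_batch_params and the example values are hypothetical, for illustration only.

from typing import Optional, Tuple


def resolve_insert_batch_params(
    max_batch_size: int,
    max_batch_time_ms: int,
    max_insert_batch_size: Optional[int],
    max_insert_batch_time_ms: Optional[int],
) -> Tuple[int, float]:
    # Size: the strategy factory falls back with `or max_batch_size`.
    size = max_insert_batch_size or max_batch_size
    # Time: `x and x / 1000.0` converts ms to seconds only when a value was
    # given; a None propagates through and the factory falls back to the
    # processing batch time.
    insert_time_s = max_insert_batch_time_ms and max_insert_batch_time_ms / 1000.0
    time_s = insert_time_s or max_batch_time_ms / 1000.0
    return size, time_s


# Explicit insert batch: process in batches of 1000, INSERT in batches of 5000.
assert resolve_insert_batch_params(1000, 500, 5000, None) == (5000, 0.5)
# Flags omitted: both steps share the processing batch parameters, as before.
assert resolve_insert_batch_params(1000, 500, None, None) == (1000, 0.5)

In other words, a consumer started without the insert flags behaves exactly as before this patch, while an invocation along the lines of `snuba consumer --max-batch-size 1000 --max-insert-batch-size 5000` (other required options omitted) keeps processing batches small for latency while letting ClickHouse receive larger INSERT batches.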