From 6afc4072b962dccc1456df69782b25ace7681f28 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Thu, 7 Dec 2023 16:12:01 -0800
Subject: [PATCH] feat: Multiprocessing experiment (#5177)

Try https://github.com/getsentry/snuba/pull/5170 again with a high join
timeout.

This PR is a short-term test that should be reverted. We want to check
whether there is any correlation between the join timeout and the
consumer issues we saw with arroyo 2.15.0 earlier.
---
 requirements.txt                    |  2 +-
 snuba/cli/consumer.py               |  3 +--
 snuba/consumers/consumer.py         | 18 ++++++++++++++----
 snuba/consumers/strategy_factory.py | 17 ++++++++++++++---
 4 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index e527278460..8b88e03c2b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -22,7 +22,7 @@ pytest-watch==4.2.0
 python-dateutil==2.8.2
 python-rapidjson==1.8
 redis==4.3.4
-sentry-arroyo==2.14.25
+sentry-arroyo==2.15.0
 sentry-kafka-schemas==0.1.37
 sentry-redis-tools==0.1.7
 sentry-relay==0.8.27
diff --git a/snuba/cli/consumer.py b/snuba/cli/consumer.py
index 1cc8e96f58..bb3ee28c43 100644
--- a/snuba/cli/consumer.py
+++ b/snuba/cli/consumer.py
@@ -136,7 +136,7 @@
     "--output-block-size",
     type=int,
 )
-@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=5)
+@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=20)
 @click.option(
     "--enforce-schema",
     type=bool,
@@ -201,7 +201,6 @@ def consumer(
     group_instance_id: Optional[str],
     skip_write: bool,
 ) -> None:
-
     setup_logging(log_level)
     setup_sentry()
 
diff --git a/snuba/consumers/consumer.py b/snuba/consumers/consumer.py
index 9ecf400e2d..34b806b9b6 100644
--- a/snuba/consumers/consumer.py
+++ b/snuba/consumers/consumer.py
@@ -39,6 +39,9 @@
     RunTaskInThreads,
     RunTaskWithMultiprocessing,
 )
+from arroyo.processing.strategies.run_task_with_multiprocessing import (
+    MultiprocessingPool,
+)
 from arroyo.types import (
     BaseValue,
     BrokerValue,
@@ -389,7 +392,6 @@ def build_batch_writer(
     commit_log_config: Optional[CommitLogConfig] = None,
     slice_id: Optional[int] = None,
 ) -> Callable[[], ProcessedMessageBatchWriter]:
-
     assert not (replacements_producer is None) ^ (replacements_topic is None)
     supports_replacements = replacements_producer is not None
 
@@ -800,6 +802,11 @@ def __init__(
             replacements,
             slice_id,
         )
+        self.__pool = (
+            MultiprocessingPool(self.__processes, initialize_parallel_transform)
+            if self.__processes is not None
+            else None
+        )
 
     def create_with_partitions(
         self,
@@ -833,18 +840,17 @@ def flush_batch(
             Union[FilteredPayload, MultistorageKafkaPayload]
         ]
 
-        if self.__processes is None:
+        if self.__pool is None:
             inner_strategy = RunTask(transform_function, collect)
         else:
             inner_strategy = RunTaskWithMultiprocessing(
                 transform_function,
                 collect,
-                self.__processes,
                 max_batch_size=self.__max_batch_size,
                 max_batch_time=self.__max_batch_time,
+                pool=self.__pool,
                 input_block_size=self.__input_block_size,
                 output_block_size=self.__output_block_size,
-                initializer=self.__initialize_parallel_transform,
             )
 
         return RunTask(
@@ -854,3 +860,7 @@
             inner_strategy,
         ),
     )
+
+    def shutdown(self) -> None:
+        if self.__pool:
+            self.__pool.close()
diff --git a/snuba/consumers/strategy_factory.py b/snuba/consumers/strategy_factory.py
index 2a381c6160..083f10baa4 100644
--- a/snuba/consumers/strategy_factory.py
+++ b/snuba/consumers/strategy_factory.py
@@ -14,6 +14,9 @@
 )
 from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.processing.strategies.healthcheck import Healthcheck
+from arroyo.processing.strategies.run_task_with_multiprocessing import (
+    MultiprocessingPool,
+)
 from arroyo.types import BaseValue, Commit, FilteredPayload, Message, Partition
 
 from snuba.consumers.consumer import BytesInsertBatch, ProcessedMessageBatchWriter
@@ -98,6 +101,11 @@ def __init__(
         self.__output_block_size = output_block_size
         self.__initialize_parallel_transform = initialize_parallel_transform
         self.__health_check_file = health_check_file
+        self.__pool = (
+            MultiprocessingPool(self.__processes, self.__initialize_parallel_transform)
+            if self.__processes
+            else None
+        )
 
     def __should_accept(self, message: Message[KafkaPayload]) -> bool:
         assert self.__prefilter is not None
@@ -145,18 +153,17 @@ def flush_batch(
         transform_function = self.__process_message
 
         strategy: ProcessingStrategy[Union[FilteredPayload, KafkaPayload]]
-        if self.__processes is None:
+        if self.__pool is None:
             strategy = RunTask(transform_function, collect)
         else:
             strategy = RunTaskWithMultiprocessing(
                 transform_function,
                 collect,
-                self.__processes,
                 max_batch_size=self.__max_batch_size,
                 max_batch_time=self.__max_batch_time,
+                pool=self.__pool,
                 input_block_size=self.__input_block_size,
                 output_block_size=self.__output_block_size,
-                initializer=self.__initialize_parallel_transform,
             )
 
         if self.__prefilter is not None:
@@ -173,3 +180,7 @@
             strategy = Healthcheck(self.__health_check_file, strategy)
 
         return strategy
+
+    def shutdown(self) -> None:
+        if self.__pool:
+            self.__pool.close()
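
The pool-reuse pattern that both factory changes above adopt, reduced to a
minimal standalone sketch against the arroyo 2.15.0 API visible in the diff.
The factory, transform function, and batch settings below are illustrative
assumptions rather than Snuba code; only MultiprocessingPool, the pool=
argument, and pool.close() come from the API this patch migrates to:

    from typing import Mapping

    from arroyo.backends.kafka import KafkaPayload
    from arroyo.processing.strategies.abstract import (
        ProcessingStrategy,
        ProcessingStrategyFactory,
    )
    from arroyo.processing.strategies.commit import CommitOffsets
    from arroyo.processing.strategies.run_task_with_multiprocessing import (
        MultiprocessingPool,
        RunTaskWithMultiprocessing,
    )
    from arroyo.types import Commit, Message, Partition


    def transform(message: Message[KafkaPayload]) -> KafkaPayload:
        # Hypothetical CPU-bound work that runs in the pool's worker
        # processes.
        return message.payload


    class PooledFactory(ProcessingStrategyFactory[KafkaPayload]):
        """Hypothetical factory showing the pool-reuse pattern."""

        def __init__(self, num_processes: int) -> None:
            # Create the pool once, when the factory is built. Under the
            # pre-2.15.0 API, the strategy forked a fresh pool inside every
            # create_with_partitions call, i.e. on every rebalance.
            self.__pool = MultiprocessingPool(num_processes)

        def create_with_partitions(
            self,
            commit: Commit,
            partitions: Mapping[Partition, int],
        ) -> ProcessingStrategy[KafkaPayload]:
            # The strategy is still rebuilt per partition assignment, but
            # pool= lets it reattach to the long-lived workers instead of
            # re-forking them.
            return RunTaskWithMultiprocessing(
                transform,
                CommitOffsets(commit),
                max_batch_size=500,  # illustrative values
                max_batch_time=1.0,
                pool=self.__pool,
            )

        def shutdown(self) -> None:
            # Mirrors the shutdown() hooks the patch adds: release the
            # worker processes when the consumer exits.
            self.__pool.close()

As in both Snuba factories, the pool is only created when a process count is
configured, so the single-process RunTask path is unchanged; the PR pairs
this with the higher --join-timeout default (20s, up from 5) to test whether
join time correlates with the consumer issues described in the commit
message.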