diff --git a/requirements.txt b/requirements.txt index e3226d7262..1fc85a7ea9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,7 +23,7 @@ python-dateutil==2.8.2 python-rapidjson==1.8 pytz==2022.2.1 redis==4.3.4 -sentry-arroyo==2.13.0 +sentry-arroyo==2.14.0 sentry-kafka-schemas==0.1.14 sentry-redis-tools==0.1.6 sentry-relay==0.8.21 diff --git a/snuba/consumers/consumer.py b/snuba/consumers/consumer.py index f8215721e2..b67828b858 100644 --- a/snuba/consumers/consumer.py +++ b/snuba/consumers/consumer.py @@ -32,12 +32,12 @@ from arroyo.processing.strategies import ( CommitOffsets, FilterStep, - ParallelTransformStep, ProcessingStrategy, ProcessingStrategyFactory, Reduce, + RunTask, RunTaskInThreads, - TransformStep, + RunTaskWithMultiprocessing, ) from arroyo.types import ( BaseValue, @@ -835,11 +835,11 @@ def flush_batch( ] if self.__processes is None: - inner_strategy = TransformStep(transform_function, collect) + inner_strategy = RunTask(transform_function, collect) else: assert self.__input_block_size is not None assert self.__output_block_size is not None - inner_strategy = ParallelTransformStep( + inner_strategy = RunTaskWithMultiprocessing( transform_function, collect, self.__processes, @@ -850,7 +850,7 @@ def flush_batch( initializer=self.__initialize_parallel_transform, ) - return TransformStep( + return RunTask( partial(find_destination_storages, self.__storages), FilterStep( has_destination_storages, diff --git a/snuba/consumers/strategy_factory.py b/snuba/consumers/strategy_factory.py index 4886080edb..913f04f3eb 100644 --- a/snuba/consumers/strategy_factory.py +++ b/snuba/consumers/strategy_factory.py @@ -8,10 +8,11 @@ ProcessingStrategy, ProcessingStrategyFactory, Reduce, + RunTask, RunTaskInThreads, + RunTaskWithMultiprocessing, ) from arroyo.processing.strategies.commit import CommitOffsets -from arroyo.processing.strategies.transform import ParallelTransformStep, TransformStep from arroyo.types import BaseValue, Commit, FilteredPayload, Message, Partition from snuba.consumers.consumer import BytesInsertBatch, ProcessedMessageBatchWriter @@ -136,11 +137,11 @@ def flush_batch( strategy: ProcessingStrategy[Union[FilteredPayload, KafkaPayload]] if self.__processes is None: - strategy = TransformStep(transform_function, collect) + strategy = RunTask(transform_function, collect) else: assert self.__input_block_size is not None assert self.__output_block_size is not None - strategy = ParallelTransformStep( + strategy = RunTaskWithMultiprocessing( transform_function, collect, self.__processes,