diff --git a/requirements.txt b/requirements.txt index 8b88e03c2b..e527278460 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,7 +22,7 @@ pytest-watch==4.2.0 python-dateutil==2.8.2 python-rapidjson==1.8 redis==4.3.4 -sentry-arroyo==2.15.0 +sentry-arroyo==2.14.25 sentry-kafka-schemas==0.1.37 sentry-redis-tools==0.1.7 sentry-relay==0.8.27 diff --git a/snuba/cli/consumer.py b/snuba/cli/consumer.py index bb3ee28c43..1cc8e96f58 100644 --- a/snuba/cli/consumer.py +++ b/snuba/cli/consumer.py @@ -136,7 +136,7 @@ "--output-block-size", type=int, ) -@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=20) +@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=5) @click.option( "--enforce-schema", type=bool, @@ -201,6 +201,7 @@ def consumer( group_instance_id: Optional[str], skip_write: bool, ) -> None: + setup_logging(log_level) setup_sentry() diff --git a/snuba/consumers/consumer.py b/snuba/consumers/consumer.py index 34b806b9b6..9ecf400e2d 100644 --- a/snuba/consumers/consumer.py +++ b/snuba/consumers/consumer.py @@ -39,9 +39,6 @@ RunTaskInThreads, RunTaskWithMultiprocessing, ) -from arroyo.processing.strategies.run_task_with_multiprocessing import ( - MultiprocessingPool, -) from arroyo.types import ( BaseValue, BrokerValue, @@ -392,6 +389,7 @@ def build_batch_writer( commit_log_config: Optional[CommitLogConfig] = None, slice_id: Optional[int] = None, ) -> Callable[[], ProcessedMessageBatchWriter]: + assert not (replacements_producer is None) ^ (replacements_topic is None) supports_replacements = replacements_producer is not None @@ -802,11 +800,6 @@ def __init__( replacements, slice_id, ) - self.__pool = ( - MultiprocessingPool(self.__processes, initialize_parallel_transform) - if self.__processes is not None - else None - ) def create_with_partitions( self, @@ -840,17 +833,18 @@ def flush_batch( Union[FilteredPayload, MultistorageKafkaPayload] ] - if self.__pool is None: + if self.__processes is None: inner_strategy = RunTask(transform_function, collect) else: inner_strategy = RunTaskWithMultiprocessing( transform_function, collect, + self.__processes, max_batch_size=self.__max_batch_size, max_batch_time=self.__max_batch_time, - pool=self.__pool, input_block_size=self.__input_block_size, output_block_size=self.__output_block_size, + initializer=self.__initialize_parallel_transform, ) return RunTask( @@ -860,7 +854,3 @@ def flush_batch( inner_strategy, ), ) - - def shutdown(self) -> None: - if self.__pool: - self.__pool.close() diff --git a/snuba/consumers/strategy_factory.py b/snuba/consumers/strategy_factory.py index 083f10baa4..2a381c6160 100644 --- a/snuba/consumers/strategy_factory.py +++ b/snuba/consumers/strategy_factory.py @@ -14,9 +14,6 @@ ) from arroyo.processing.strategies.commit import CommitOffsets from arroyo.processing.strategies.healthcheck import Healthcheck -from arroyo.processing.strategies.run_task_with_multiprocessing import ( - MultiprocessingPool, -) from arroyo.types import BaseValue, Commit, FilteredPayload, Message, Partition from snuba.consumers.consumer import BytesInsertBatch, ProcessedMessageBatchWriter @@ -101,11 +98,6 @@ def __init__( self.__output_block_size = output_block_size self.__initialize_parallel_transform = initialize_parallel_transform self.__health_check_file = health_check_file - self.__pool = ( - MultiprocessingPool(self.__processes, self.__initialize_parallel_transform) - if self.__processes - else None - ) def __should_accept(self, message: Message[KafkaPayload]) -> bool: assert self.__prefilter is not None @@ -153,17 +145,18 @@ def flush_batch( transform_function = self.__process_message strategy: ProcessingStrategy[Union[FilteredPayload, KafkaPayload]] - if self.__pool is None: + if self.__processes is None: strategy = RunTask(transform_function, collect) else: strategy = RunTaskWithMultiprocessing( transform_function, collect, + self.__processes, max_batch_size=self.__max_batch_size, max_batch_time=self.__max_batch_time, - pool=self.__pool, input_block_size=self.__input_block_size, output_block_size=self.__output_block_size, + initializer=self.__initialize_parallel_transform, ) if self.__prefilter is not None: @@ -180,7 +173,3 @@ def flush_batch( strategy = Healthcheck(self.__health_check_file, strategy) return strategy - - def shutdown(self) -> None: - if self.__pool: - self.__pool.close()