From 55b6c9f10e98870e326a24463d42a2f15003ccac Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Fri, 16 Jun 2023 12:56:44 -0700 Subject: [PATCH] feat: Arroyo 2.12.0 (#4362) Applying the basic retry policy with 3 retries everywhere now --- requirements.txt | 2 +- snuba/consumers/consumer_builder.py | 20 +------------------- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/requirements.txt b/requirements.txt index 6d7a855ab5..5228ad9f8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,7 +24,7 @@ python-dateutil==2.8.2 python-rapidjson==1.8 pytz==2022.2.1 redis==4.3.4 -sentry-arroyo==2.11.6 +sentry-arroyo==2.12.0 sentry-kafka-schemas==0.1.10 sentry-redis-tools==0.1.6 sentry-relay==0.8.21 diff --git a/snuba/consumers/consumer_builder.py b/snuba/consumers/consumer_builder.py index 7bb45fa404..1d3687ed5b 100644 --- a/snuba/consumers/consumer_builder.py +++ b/snuba/consumers/consumer_builder.py @@ -16,8 +16,7 @@ from arroyo.processing.strategies import ProcessingStrategyFactory from arroyo.types import Topic from arroyo.utils.profiler import ProcessingStrategyProfilerWrapperFactory -from arroyo.utils.retries import BasicRetryPolicy, RetryPolicy -from confluent_kafka import KafkaError, KafkaException, Producer +from confluent_kafka import KafkaError, Producer from sentry_sdk.api import configure_scope from snuba.consumers.consumer import ( @@ -72,7 +71,6 @@ def __init__( metrics: MetricsBackend, slice_id: Optional[int], join_timeout: Optional[int], - commit_retry_policy: Optional[RetryPolicy] = None, profile_path: Optional[str] = None, max_poll_interval_ms: Optional[int] = None, ) -> None: @@ -143,21 +141,6 @@ def __init__( self.__profile_path = profile_path self.max_poll_interval_ms = max_poll_interval_ms - if commit_retry_policy is None: - commit_retry_policy = BasicRetryPolicy( - 3, - 1, - lambda e: isinstance(e, KafkaException) - and e.args[0].code() - in ( - KafkaError.REQUEST_TIMED_OUT, - KafkaError.NOT_COORDINATOR, - KafkaError._WAIT_COORD, - ), - ) - - self.__commit_retry_policy = commit_retry_policy - def __build_consumer( self, strategy_factory: ProcessingStrategyFactory[KafkaPayload], @@ -189,7 +172,6 @@ def log_general_error(e: KafkaError) -> None: consumer = KafkaConsumer( configuration, - commit_retry_policy=self.__commit_retry_policy, ) self.dlq_producer: Optional[KafkaProducer]