diff --git a/snuba/consumers/consumer_builder.py b/snuba/consumers/consumer_builder.py index be6285de46..0f95a0c309 100644 --- a/snuba/consumers/consumer_builder.py +++ b/snuba/consumers/consumer_builder.py @@ -70,7 +70,7 @@ def __init__( max_batch_time_ms: int, metrics: MetricsBackend, slice_id: Optional[int], - join_timeout: Optional[int], + join_timeout: Optional[float], profile_path: Optional[str] = None, max_poll_interval_ms: Optional[int] = None, ) -> None: @@ -146,6 +146,7 @@ def __init__( def __build_consumer( self, strategy_factory: ProcessingStrategyFactory[KafkaPayload], + input_topic: Topic, dlq_policy: Optional[DlqPolicy[KafkaPayload]], ) -> StreamProcessor[KafkaPayload]: @@ -179,7 +180,7 @@ def log_general_error(e: KafkaError) -> None: return StreamProcessor( consumer, - self.raw_topic, + input_topic, strategy_factory, IMMEDIATE, dlq_policy=dlq_policy, @@ -299,7 +300,9 @@ def build_base_consumer(self) -> StreamProcessor[KafkaPayload]: Builds the consumer. """ return self.__build_consumer( - self.build_streaming_strategy_factory(), self.__build_default_dlq_policy() + self.build_streaming_strategy_factory(), + self.raw_topic, + self.__build_default_dlq_policy(), ) def build_dlq_consumer( @@ -325,8 +328,13 @@ def build_dlq_consumer( else: raise ValueError("Invalid DLQ policy") + dlq_topic = self.__consumer_config.dlq_topic + assert dlq_topic is not None + return self.__build_consumer( - self.build_dlq_strategy_factory(instruction), dlq_policy + self.build_dlq_strategy_factory(instruction), + Topic(dlq_topic.physical_topic_name), + dlq_policy, ) def __build_default_dlq_policy(self) -> Optional[DlqPolicy[KafkaPayload]]: