Skip to content

Commit

Permalink
fix(dlq): Actually consume the right topic (#4401)
Browse files Browse the repository at this point in the history
Oops
  • Loading branch information
lynnagara committed Jun 22, 2023
1 parent 2986fda commit 39c86e1
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions snuba/consumers/consumer_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(
max_batch_time_ms: int,
metrics: MetricsBackend,
slice_id: Optional[int],
join_timeout: Optional[int],
join_timeout: Optional[float],
profile_path: Optional[str] = None,
max_poll_interval_ms: Optional[int] = None,
) -> None:
Expand Down Expand Up @@ -146,6 +146,7 @@ def __init__(
def __build_consumer(
self,
strategy_factory: ProcessingStrategyFactory[KafkaPayload],
input_topic: Topic,
dlq_policy: Optional[DlqPolicy[KafkaPayload]],
) -> StreamProcessor[KafkaPayload]:

Expand Down Expand Up @@ -179,7 +180,7 @@ def log_general_error(e: KafkaError) -> None:

return StreamProcessor(
consumer,
self.raw_topic,
input_topic,
strategy_factory,
IMMEDIATE,
dlq_policy=dlq_policy,
Expand Down Expand Up @@ -299,7 +300,9 @@ def build_base_consumer(self) -> StreamProcessor[KafkaPayload]:
Builds the consumer.
"""
return self.__build_consumer(
self.build_streaming_strategy_factory(), self.__build_default_dlq_policy()
self.build_streaming_strategy_factory(),
self.raw_topic,
self.__build_default_dlq_policy(),
)

def build_dlq_consumer(
Expand All @@ -325,8 +328,13 @@ def build_dlq_consumer(
else:
raise ValueError("Invalid DLQ policy")

dlq_topic = self.__consumer_config.dlq_topic
assert dlq_topic is not None

return self.__build_consumer(
self.build_dlq_strategy_factory(instruction), dlq_policy
self.build_dlq_strategy_factory(instruction),
Topic(dlq_topic.physical_topic_name),
dlq_policy,
)

def __build_default_dlq_policy(self) -> Optional[DlqPolicy[KafkaPayload]]:
Expand Down

0 comments on commit 39c86e1

Please sign in to comment.