diff --git a/src/hexkit/providers/akafka/provider/eventsub.py b/src/hexkit/providers/akafka/provider/eventsub.py index 2c26cc76..7aeb55d5 100644 --- a/src/hexkit/providers/akafka/provider/eventsub.py +++ b/src/hexkit/providers/akafka/provider/eventsub.py @@ -222,13 +222,11 @@ async def construct( topics = translator.topics_of_interest if config.kafka_enable_dlq: - dlq_topics = [f"{topic}-{config.service_name}-dlq" for topic in topics] - topics.extend(dlq_topics) + topics.append(f"{config.service_name}-retry") if dlq_publisher is None: error = ValueError("A publisher is required when the DLQ is enabled.") logging.error(error) raise error - topics.append(f"{config.service_name}-retry") consumer = kafka_consumer_cls( *topics, @@ -290,10 +288,6 @@ def __init__( self._enable_dlq = config.kafka_enable_dlq self._retry_backoff = config.kafka_retry_backoff - def _get_dlq_topic(self, topic: str) -> str: - """Form the DLQ topic name for a given topic.""" - return topic + self._dlq_suffix - async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception): """Publish the event to the corresponding DLQ topic. @@ -304,7 +298,7 @@ async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception): - `event`: The event to publish to the DLQ. - `exc`: The exception that caused the event to be published to the DLQ. """ - dlq_topic = self._get_dlq_topic(event.topic) + dlq_topic = event.topic + self._dlq_suffix logging.debug("About to publish an event to DLQ topic '%s'", dlq_topic) await self._dlq_publisher.publish( # type: ignore payload=event.payload, @@ -433,7 +427,7 @@ def _extract_info(self, event: ConsumerEvent) -> ExtractedEventInfo: def _validate_extracted_info(self, event: ExtractedEventInfo): """Extract and validate the event, returning the correlation ID and the extracted info.""" - dlq_topic = self._get_dlq_topic(event.topic) + dlq_topic = event.topic + self._dlq_suffix correlation_id = event.headers.get("correlation_id", "") errors = [] if not event.type_: @@ -482,7 +476,7 @@ async def _consume_event(self, event: ConsumerEvent) -> None: await self._handle_consumption(event=event_info) except Exception: # Errors only bubble up here if the DLQ isn't used - dlq_topic = self._get_dlq_topic(event_info.topic) + dlq_topic = event_info.topic + self._dlq_suffix logging.critical( "An error occurred while processing event of type '%s': %s. It was NOT" " placed in the DLQ topic (%s)", @@ -671,6 +665,10 @@ def __init__( self._publisher = dlq_publisher self._dlq_topic = dlq_topic + # In KafkaEventSubscriber, we get the service name from the config. However, + # the service name in effect here probably differs -- e.g., a DLQ-specific + # service might be running the KafkaDLQSubscriber. So we extract the service + # name from the DLQ topic name instead. service_name = service_name_from_dlq_topic(dlq_topic) self._retry_topic = service_name + "-retry" self._process_dlq_event = process_dlq_event