Skip to content

Commit

Permalink
Remove function for forming dlq topic name (it's simple)
Browse files Browse the repository at this point in the history
Remove accidental subscription to dlq topics in main class (tests passed earlier because the names had '-' instead of '.', meaning the topics were not touched in test)
  • Loading branch information
TheByronHimes committed Nov 21, 2024
1 parent 189a115 commit 943ad5b
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions src/hexkit/providers/akafka/provider/eventsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,11 @@ async def construct(
topics = translator.topics_of_interest

if config.kafka_enable_dlq:
dlq_topics = [f"{topic}-{config.service_name}-dlq" for topic in topics]
topics.extend(dlq_topics)
topics.append(f"{config.service_name}-retry")
if dlq_publisher is None:
error = ValueError("A publisher is required when the DLQ is enabled.")
logging.error(error)
raise error
topics.append(f"{config.service_name}-retry")

consumer = kafka_consumer_cls(
*topics,
Expand Down Expand Up @@ -290,10 +288,6 @@ def __init__(
self._enable_dlq = config.kafka_enable_dlq
self._retry_backoff = config.kafka_retry_backoff

def _get_dlq_topic(self, topic: str) -> str:
"""Form the DLQ topic name for a given topic."""
return topic + self._dlq_suffix

async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception):
"""Publish the event to the corresponding DLQ topic.
Expand All @@ -304,7 +298,7 @@ async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception):
- `event`: The event to publish to the DLQ.
- `exc`: The exception that caused the event to be published to the DLQ.
"""
dlq_topic = self._get_dlq_topic(event.topic)
dlq_topic = event.topic + self._dlq_suffix
logging.debug("About to publish an event to DLQ topic '%s'", dlq_topic)
await self._dlq_publisher.publish( # type: ignore
payload=event.payload,
Expand Down Expand Up @@ -433,7 +427,7 @@ def _extract_info(self, event: ConsumerEvent) -> ExtractedEventInfo:

def _validate_extracted_info(self, event: ExtractedEventInfo):
"""Extract and validate the event, returning the correlation ID and the extracted info."""
dlq_topic = self._get_dlq_topic(event.topic)
dlq_topic = event.topic + self._dlq_suffix
correlation_id = event.headers.get("correlation_id", "")
errors = []
if not event.type_:
Expand Down Expand Up @@ -482,7 +476,7 @@ async def _consume_event(self, event: ConsumerEvent) -> None:
await self._handle_consumption(event=event_info)
except Exception:
# Errors only bubble up here if the DLQ isn't used
dlq_topic = self._get_dlq_topic(event_info.topic)
dlq_topic = event_info.topic + self._dlq_suffix
logging.critical(
"An error occurred while processing event of type '%s': %s. It was NOT"
" placed in the DLQ topic (%s)",
Expand Down Expand Up @@ -671,6 +665,10 @@ def __init__(
self._publisher = dlq_publisher
self._dlq_topic = dlq_topic

# In KafkaEventSubscriber, we get the service name from the config. However,
# the service name in effect here probably differs -- e.g., a DLQ-specific
# service might be running the KafkaDLQSubscriber. So we extract the service
# name from the DLQ topic name instead.
service_name = service_name_from_dlq_topic(dlq_topic)
self._retry_topic = service_name + "-retry"
self._process_dlq_event = process_dlq_event
Expand Down

0 comments on commit 943ad5b

Please sign in to comment.