Skip to content

Commit

Permalink
Dynamically define dlq and retry topics (GSI-1175) (#143)
Browse files Browse the repository at this point in the history
* Dynamically define dlq and retry topics

* Remove function for forming dlq topic name (it's simple)

Remove accidental subscription to dlq topics in main class (tests passed earlier because the names had '-' instead of '.', meaning the topics were not touched in test)

* Use consistent simple concat for retry topic names

* Define constants for test dlq and retry topics

* Limit splits to get accurate service and topic names

Parametrize test case
  • Loading branch information
TheByronHimes authored Nov 22, 2024
1 parent 5cec4f8 commit da9f023
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 165 deletions.
34 changes: 2 additions & 32 deletions src/hexkit/providers/akafka/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from typing import Literal

from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, model_validator
from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr
from pydantic_settings import BaseSettings


Expand Down Expand Up @@ -82,20 +82,6 @@ class KafkaConfig(BaseSettings):
+ " services that have a need to send/receive larger messages should set this.",
examples=[1024 * 1024, 16 * 1024 * 1024],
)
kafka_dlq_topic: str = Field(
default="",
description="The name of the service-specific topic used for the dead letter queue.",
examples=["dcs-dlq", "ifrs-dlq", "mass-dlq"],
title="Kafka DLQ Topic",
)
kafka_retry_topic: str = Field(
default="",
description=(
"The name of the service-specific topic used to retry previously failed events."
),
title="Kafka Retry Topic",
examples=["dcs-dlq-retry", "ifrs-dlq-retry", "mass-dlq-retry"],
)
kafka_max_retries: NonNegativeInt = Field(
default=0,
description=(
Expand All @@ -111,8 +97,7 @@ class KafkaConfig(BaseSettings):
"A flag to toggle the dead letter queue. If set to False, the service will"
+ " crash upon exhausting retries instead of publishing events to the DLQ."
+ " If set to True, the service will publish events to the DLQ topic after"
+ " exhausting all retries, and both `kafka_dlq_topic` and"
+ " `kafka_retry_topic` must be set."
+ " exhausting all retries"
),
title="Kafka Enable DLQ",
examples=[True, False],
Expand All @@ -126,18 +111,3 @@ class KafkaConfig(BaseSettings):
title="Kafka Retry Backoff",
examples=[0, 1, 2, 3, 5],
)

@model_validator(mode="after")
def validate_retry_topic(self):
"""Ensure that the retry topic is not the same as the DLQ topic."""
if self.kafka_retry_topic and self.kafka_retry_topic == self.kafka_dlq_topic:
raise ValueError(
"kafka_retry_topic and kafka_dlq_topic cannot be the same."
)
if self.kafka_enable_dlq and not (
self.kafka_dlq_topic and self.kafka_retry_topic
):
raise ValueError(
"Both kafka_dlq_topic and kafka_retry_topic must be set when the DLQ is enabled."
)
return self
80 changes: 52 additions & 28 deletions src/hexkit/providers/akafka/provider/eventsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ def encoded_headers(self) -> list[tuple[str, bytes]]:
return [(name, value.encode("ascii")) for name, value in self.headers.items()]


def service_name_from_dlq_topic(dlq_topic: str) -> str:
"""Extract the service name from a DLQ topic name."""
return dlq_topic.rsplit(".", 1)[1].removesuffix("-dlq")


def get_event_label(event: ConsumerEvent) -> str:
"""Make a label that identifies an event."""
return (
Expand Down Expand Up @@ -217,11 +222,11 @@ async def construct(
topics = translator.topics_of_interest

if config.kafka_enable_dlq:
topics.append(config.service_name + "-retry")
if dlq_publisher is None:
error = ValueError("A publisher is required when the DLQ is enabled.")
logging.error(error)
raise error
topics.append(config.kafka_retry_topic)

consumer = kafka_consumer_cls(
*topics,
Expand Down Expand Up @@ -277,14 +282,14 @@ def __init__(
self._translator = translator
self._types_whitelist = translator.types_of_interest
self._dlq_publisher = dlq_publisher
self._dlq_topic = config.kafka_dlq_topic
self._retry_topic = config.kafka_retry_topic
self._dlq_suffix = f".{config.service_name}-dlq"
self._retry_topic = config.service_name + "-retry"
self._max_retries = config.kafka_max_retries
self._enable_dlq = config.kafka_enable_dlq
self._retry_backoff = config.kafka_retry_backoff

async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception):
"""Publish the event to the DLQ topic.
"""Publish the event to the corresponding DLQ topic.
The exception instance is included in the headers, but is split into the class
name and the string representation of the exception.
Expand All @@ -293,19 +298,19 @@ async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception):
- `event`: The event to publish to the DLQ.
- `exc`: The exception that caused the event to be published to the DLQ.
"""
logging.debug("About to publish an event to DLQ topic '%s'", self._dlq_topic)
dlq_topic = event.topic + self._dlq_suffix
logging.debug("About to publish an event to DLQ topic '%s'", dlq_topic)
await self._dlq_publisher.publish( # type: ignore
payload=event.payload,
type_=event.type_,
topic=self._dlq_topic,
topic=dlq_topic,
key=event.key,
headers={
ORIGINAL_TOPIC_FIELD: event.topic,
EXC_CLASS_FIELD: exc.__class__.__name__,
EXC_MSG_FIELD: str(exc),
},
)
logging.info("Published event to DLQ topic '%s'", self._dlq_topic)
logging.info("Published event to DLQ topic '%s'", dlq_topic)

async def _retry_event(self, *, event: ExtractedEventInfo, retries_left: int):
"""Retry the event until the maximum number of retries is reached.
Expand Down Expand Up @@ -359,9 +364,10 @@ async def _handle_consumption(self, *, event: ExtractedEventInfo):
If the event fails:
1. Retry until retries are exhausted, if retries are configured.
2. Publish the event to the DLQ topic if the DLQ is enabled. Done afterward.
2. Publish event to DLQ topic if the DLQ is enabled. Done afterward.
or
3. Allow failure with unhandled error if DLQ is not configured.
3. Allow failure with unhandled error if the DLQ is not enabled. This is the
pre-DLQ behavior.
"""
try:
await self._translator.consume(
Expand Down Expand Up @@ -400,15 +406,28 @@ def _extract_info(self, event: ConsumerEvent) -> ExtractedEventInfo:
"""Validate the event, returning the extracted info."""
event_info = ExtractedEventInfo(event)
if event_info.topic == self._retry_topic:
# The event is being consumed by from the retry topic, so we expect the
# original topic to be in the headers.
event_info.topic = event_info.headers.get(ORIGINAL_TOPIC_FIELD, "")
logging.info(
"Received previously failed event from topic '%s' for retry.",
event_info.topic,
)
elif event_info.topic.endswith(self._dlq_suffix):
# The event is being consumed from a DLQ topic, so we remove the DLQ suffix
# to produce the original topic name.
original_topic = event_info.topic.removesuffix(self._dlq_suffix)
logging.info(
"Received event from DLQ topic '%s' for processing. Original topic: '%s'",
event_info.topic,
original_topic,
)
event_info.topic = original_topic
return event_info

def _validate_extracted_info(self, event: ExtractedEventInfo):
"""Extract and validate the event, returning the correlation ID and the extracted info."""
dlq_topic = event.topic + self._dlq_suffix
correlation_id = event.headers.get("correlation_id", "")
errors = []
if not event.type_:
Expand All @@ -417,10 +436,10 @@ def _validate_extracted_info(self, event: ExtractedEventInfo):
errors.append(f"event type '{event.type_}' is not in the whitelist")
if not correlation_id:
errors.append("correlation_id is empty")
if event.topic in (self._retry_topic, self._dlq_topic):
if event.topic in (self._retry_topic, dlq_topic):
errors.append(
f"original_topic header cannot be {self._retry_topic} or"
+ f" {self._dlq_topic}. Value: '{event.topic}'"
+ f" {dlq_topic}. Value: '{event.topic}'"
)
elif not event.topic:
errors.append(
Expand Down Expand Up @@ -457,12 +476,13 @@ async def _consume_event(self, event: ConsumerEvent) -> None:
await self._handle_consumption(event=event_info)
except Exception:
# Errors only bubble up here if the DLQ isn't used
dlq_topic = event_info.topic + self._dlq_suffix
logging.critical(
"An error occurred while processing event of type '%s': %s. It was NOT"
" placed in the DLQ topic (%s)",
event_info.type_,
event_label,
self._dlq_topic if self._enable_dlq else "DLQ is disabled",
dlq_topic if self._enable_dlq else "DLQ is disabled",
)
raise
else:
Expand Down Expand Up @@ -514,7 +534,6 @@ def validate_dlq_headers(event: ConsumerEvent) -> None:
expected_headers = [
"type",
"correlation_id",
ORIGINAL_TOPIC_FIELD,
EXC_CLASS_FIELD,
EXC_MSG_FIELD,
]
Expand Down Expand Up @@ -544,7 +563,7 @@ async def process_dlq_event(event: ConsumerEvent) -> Optional[ExtractedEventInfo


class KafkaDLQSubscriber(InboundProviderBase):
"""A kafka event subscriber that subscribes to the configured DLQ topic and either
"""A kafka event subscriber that subscribes to the specified DLQ topic and either
discards each event or publishes it to the retry topic as instructed.
Further processing before requeuing is provided by a callable adhering to the
DLQEventProcessor definition.
Expand All @@ -556,6 +575,7 @@ async def construct(
cls,
*,
config: KafkaConfig,
dlq_topic: str,
dlq_publisher: EventPublisherProtocol,
process_dlq_event: DLQEventProcessor = process_dlq_event,
kafka_consumer_cls: type[KafkaConsumerCompatible] = AIOKafkaConsumer,
Expand All @@ -566,10 +586,13 @@ async def construct(
Args:
- `config`:
Config parameters needed for connecting to Apache Kafka.
- `dlq_topic`:
The name of the DLQ topic to subscribe to. Has the format
"{original_topic}.{service_name}-dlq".
- `dlq_publisher`:
A running instance of a publishing provider that implements the
EventPublisherProtocol, such as KafkaEventPublisher. It is used to publish
events to the configured retry topic.
events to the retry topic.
- `kafka_consumer_cls`:
Overwrite the used Kafka consumer class. Only intended for unit testing.
- `process_dlq_event`:
Expand All @@ -585,7 +608,7 @@ async def construct(
)

consumer = kafka_consumer_cls(
config.kafka_dlq_topic,
dlq_topic,
bootstrap_servers=",".join(config.kafka_servers),
security_protocol=config.kafka_security_protocol,
ssl_context=generate_ssl_context(config),
Expand All @@ -603,10 +626,9 @@ async def construct(
await consumer.start()
try:
yield cls(
dlq_topic=config.kafka_dlq_topic,
retry_topic=config.kafka_retry_topic,
consumer=consumer,
dlq_topic=dlq_topic,
dlq_publisher=dlq_publisher,
consumer=consumer,
process_dlq_event=process_dlq_event,
)
finally:
Expand All @@ -616,7 +638,6 @@ def __init__(
self,
*,
dlq_topic: str,
retry_topic: str,
dlq_publisher: EventPublisherProtocol,
consumer: KafkaConsumerCompatible,
process_dlq_event: DLQEventProcessor,
Expand All @@ -630,10 +651,8 @@ def __init__(
A running instance of a publishing provider that implements the
EventPublisherProtocol, such as KafkaEventPublisher.
- `dlq_topic`:
The name of the topic used to store failed events, to which the
KafkaDLQSubscriber subscribes.
- `retry_topic`:
The name of the topic used to requeue failed events.
The name of the DLQ topic to subscribe to. Has the format
"{original_topic}.{service_name}-dlq".
- `process_dlq_event`:
An async callable adhering to the DLQEventProcessor definition that provides
validation and processing for events from the DLQ. It should return _either_
Expand All @@ -645,21 +664,26 @@ def __init__(
self._consumer = consumer
self._publisher = dlq_publisher
self._dlq_topic = dlq_topic
self._retry_topic = retry_topic

# In KafkaEventSubscriber, we get the service name from the config. However,
# the service name in effect here probably differs -- e.g., a DLQ-specific
# service might be running the KafkaDLQSubscriber. So we extract the service
# name from the DLQ topic name instead.
service_name = service_name_from_dlq_topic(dlq_topic)
self._retry_topic = service_name + "-retry"
self._process_dlq_event = process_dlq_event

async def _publish_to_retry(self, *, event: ExtractedEventInfo) -> None:
"""Publish the event to the retry topic."""
correlation_id = event.headers["correlation_id"]
original_topic = event.headers[ORIGINAL_TOPIC_FIELD]

async with set_correlation_id(correlation_id):
await self._publisher.publish(
payload=event.payload,
type_=event.type_,
key=event.key,
topic=self._retry_topic,
headers={ORIGINAL_TOPIC_FIELD: original_topic},
headers={ORIGINAL_TOPIC_FIELD: self._dlq_topic.rsplit(".", 1)[0]},
)
logging.info(
"Published an event with type '%s' to the retry topic '%s'",
Expand Down
Loading

0 comments on commit da9f023

Please sign in to comment.