diff --git a/src/hexkit/providers/akafka/config.py b/src/hexkit/providers/akafka/config.py
index d88353f4..8c1bb863 100644
--- a/src/hexkit/providers/akafka/config.py
+++ b/src/hexkit/providers/akafka/config.py
@@ -18,7 +18,7 @@
 from typing import Literal
 
-from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, model_validator
+from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr
 from pydantic_settings import BaseSettings
 
@@ -82,20 +82,6 @@ class KafkaConfig(BaseSettings):
         + " services that have a need to send/receive larger messages should set this.",
         examples=[1024 * 1024, 16 * 1024 * 1024],
     )
-    kafka_dlq_topic: str = Field(
-        default="",
-        description="The name of the service-specific topic used for the dead letter queue.",
-        examples=["dcs-dlq", "ifrs-dlq", "mass-dlq"],
-        title="Kafka DLQ Topic",
-    )
-    kafka_retry_topic: str = Field(
-        default="",
-        description=(
-            "The name of the service-specific topic used to retry previously failed events."
-        ),
-        title="Kafka Retry Topic",
-        examples=["dcs-dlq-retry", "ifrs-dlq-retry", "mass-dlq-retry"],
-    )
     kafka_max_retries: NonNegativeInt = Field(
         default=0,
         description=(
@@ -111,8 +97,7 @@ class KafkaConfig(BaseSettings):
             "A flag to toggle the dead letter queue. If set to False, the service will"
             + " crash upon exhausting retries instead of publishing events to the DLQ."
             + " If set to True, the service will publish events to the DLQ topic after"
-            + " exhausting all retries, and both `kafka_dlq_topic` and"
-            + " `kafka_retry_topic` must be set."
+            + " exhausting all retries."
         ),
         title="Kafka Enable DLQ",
         examples=[True, False],
@@ -126,18 +111,3 @@ class KafkaConfig(BaseSettings):
         title="Kafka Retry Backoff",
         examples=[0, 1, 2, 3, 5],
     )
-
-    @model_validator(mode="after")
-    def validate_retry_topic(self):
-        """Ensure that the retry topic is not the same as the DLQ topic."""
-        if self.kafka_retry_topic and self.kafka_retry_topic == self.kafka_dlq_topic:
-            raise ValueError(
-                "kafka_retry_topic and kafka_dlq_topic cannot be the same."
-            )
-        if self.kafka_enable_dlq and not (
-            self.kafka_dlq_topic and self.kafka_retry_topic
-        ):
-            raise ValueError(
-                "Both kafka_dlq_topic and kafka_retry_topic must be set when the DLQ is enabled."
-            )
-        return self
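
With the topic fields and their validator gone, the DLQ is toggled by flag alone and the topic names are derived from `service_name` (see the eventsub.py changes below). A minimal sketch of the remaining config surface, with illustrative values and the import path assumed from this repo's layout:

    from hexkit.providers.akafka import KafkaConfig

    config = KafkaConfig(
        service_name="dcs",               # DLQ topics become "<original_topic>.dcs-dlq"
        service_instance_id="001",
        kafka_servers=["localhost:9092"],
        kafka_enable_dlq=True,            # publish to the DLQ once retries are exhausted
        kafka_max_retries=2,              # NonNegativeInt: negative values still fail validation
        kafka_retry_backoff=1,
    )
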
diff --git a/src/hexkit/providers/akafka/provider/eventsub.py b/src/hexkit/providers/akafka/provider/eventsub.py
index ef6336af..f14ca193 100644
--- a/src/hexkit/providers/akafka/provider/eventsub.py
+++ b/src/hexkit/providers/akafka/provider/eventsub.py
@@ -87,6 +87,11 @@ def encoded_headers(self) -> list[tuple[str, bytes]]:
     return [(name, value.encode("ascii")) for name, value in self.headers.items()]
 
 
+def service_name_from_dlq_topic(dlq_topic: str) -> str:
+    """Extract the service name from a DLQ topic name."""
+    return dlq_topic.rsplit(".", 1)[1].removesuffix("-dlq")
+
+
 def get_event_label(event: ConsumerEvent) -> str:
     """Make a label that identifies an event."""
     return (
@@ -217,11 +222,11 @@ async def construct(
         topics = translator.topics_of_interest
 
         if config.kafka_enable_dlq:
+            topics.append(config.service_name + "-retry")
             if dlq_publisher is None:
                 error = ValueError("A publisher is required when the DLQ is enabled.")
                 logging.error(error)
                 raise error
-            topics.append(config.kafka_retry_topic)
 
         consumer = kafka_consumer_cls(
             *topics,
@@ -277,14 +282,14 @@ def __init__(
         self._translator = translator
         self._types_whitelist = translator.types_of_interest
         self._dlq_publisher = dlq_publisher
-        self._dlq_topic = config.kafka_dlq_topic
-        self._retry_topic = config.kafka_retry_topic
+        self._dlq_suffix = f".{config.service_name}-dlq"
+        self._retry_topic = config.service_name + "-retry"
         self._max_retries = config.kafka_max_retries
         self._enable_dlq = config.kafka_enable_dlq
         self._retry_backoff = config.kafka_retry_backoff
 
     async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception):
-        """Publish the event to the DLQ topic.
+        """Publish the event to the corresponding DLQ topic.
 
         The exception instance is included in the headers, but is split into the
         class name and the string representation of the exception.
@@ -293,19 +298,19 @@ async def _publish_to_dlq(self, *, event: ExtractedEventInfo, exc: Exception):
         - `event`: The event to publish to the DLQ.
         - `exc`: The exception that caused the event to be published to the DLQ.
         """
-        logging.debug("About to publish an event to DLQ topic '%s'", self._dlq_topic)
+        dlq_topic = event.topic + self._dlq_suffix
+        logging.debug("About to publish an event to DLQ topic '%s'", dlq_topic)
         await self._dlq_publisher.publish(  # type: ignore
             payload=event.payload,
             type_=event.type_,
-            topic=self._dlq_topic,
+            topic=dlq_topic,
             key=event.key,
             headers={
-                ORIGINAL_TOPIC_FIELD: event.topic,
                 EXC_CLASS_FIELD: exc.__class__.__name__,
                 EXC_MSG_FIELD: str(exc),
             },
         )
-        logging.info("Published event to DLQ topic '%s'", self._dlq_topic)
+        logging.info("Published event to DLQ topic '%s'", dlq_topic)
 
     async def _retry_event(self, *, event: ExtractedEventInfo, retries_left: int):
         """Retry the event until the maximum number of retries is reached.
@@ -359,9 +364,10 @@ async def _handle_consumption(self, *, event: ExtractedEventInfo):
 
         If the event fails:
         1. Retry until retries are exhausted, if retries are configured.
-        2. Publish the event to the DLQ topic if the DLQ is enabled. Done afterward.
+        2. Publish event to DLQ topic if the DLQ is enabled. Done afterward.
            or
-        3. Allow failure with unhandled error if DLQ is not configured.
+        3. Allow failure with unhandled error if the DLQ is not enabled. This is the
+           pre-DLQ behavior.
         """
         try:
             await self._translator.consume(
@@ -400,15 +406,28 @@ def _extract_info(self, event: ConsumerEvent) -> ExtractedEventInfo:
         """Validate the event, returning the extracted info."""
         event_info = ExtractedEventInfo(event)
         if event_info.topic == self._retry_topic:
+            # The event is being consumed from the retry topic, so we expect the
+            # original topic to be in the headers.
             event_info.topic = event_info.headers.get(ORIGINAL_TOPIC_FIELD, "")
             logging.info(
                 "Received previously failed event from topic '%s' for retry.",
                 event_info.topic,
             )
+        elif event_info.topic.endswith(self._dlq_suffix):
+            # The event is being consumed from a DLQ topic, so we remove the DLQ suffix
+            # to produce the original topic name.
+            original_topic = event_info.topic.removesuffix(self._dlq_suffix)
+            logging.info(
+                "Received event from DLQ topic '%s' for processing. Original topic: '%s'",
+                event_info.topic,
+                original_topic,
+            )
+            event_info.topic = original_topic
         return event_info
 
     def _validate_extracted_info(self, event: ExtractedEventInfo):
         """Extract and validate the event, returning the correlation ID and the
         extracted info."""
+        dlq_topic = event.topic + self._dlq_suffix
         correlation_id = event.headers.get("correlation_id", "")
         errors = []
         if not event.type_:
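
Taken together, the new helper and these two `_extract_info` branches implement a simple naming scheme. A sketch with illustrative names (the service name "dcs" is borrowed from the old config examples and is not part of the patch):

    original_topic = "users"
    service_name = "dcs"

    dlq_topic = f"{original_topic}.{service_name}-dlq"    # "users.dcs-dlq"
    retry_topic = f"{service_name}-retry"                 # "dcs-retry"

    # Round trips used by the provider code:
    assert service_name_from_dlq_topic(dlq_topic) == service_name
    assert dlq_topic.removesuffix(f".{service_name}-dlq") == original_topic
    assert dlq_topic.rsplit(".", 1)[0] == original_topic  # used by _publish_to_retry
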
@@ -417,10 +436,10 @@ def _validate_extracted_info(self, event: ExtractedEventInfo):
             errors.append(f"event type '{event.type_}' is not in the whitelist")
         if not correlation_id:
             errors.append("correlation_id is empty")
-        if event.topic in (self._retry_topic, self._dlq_topic):
+        if event.topic in (self._retry_topic, dlq_topic):
             errors.append(
                 f"original_topic header cannot be {self._retry_topic} or"
-                + f" {self._dlq_topic}. Value: '{event.topic}'"
+                + f" {dlq_topic}. Value: '{event.topic}'"
             )
         elif not event.topic:
             errors.append(
@@ -457,12 +476,13 @@ async def _consume_event(self, event: ConsumerEvent) -> None:
             await self._handle_consumption(event=event_info)
         except Exception:
             # Errors only bubble up here if the DLQ isn't used
+            dlq_topic = event_info.topic + self._dlq_suffix
             logging.critical(
                 "An error occurred while processing event of type '%s': %s. It was NOT"
                 " placed in the DLQ topic (%s)",
                 event_info.type_,
                 event_label,
-                self._dlq_topic if self._enable_dlq else "DLQ is disabled",
+                dlq_topic if self._enable_dlq else "DLQ is disabled",
             )
             raise
         else:
@@ -514,7 +534,6 @@ def validate_dlq_headers(event: ConsumerEvent) -> None:
     expected_headers = [
         "type",
         "correlation_id",
-        ORIGINAL_TOPIC_FIELD,
         EXC_CLASS_FIELD,
         EXC_MSG_FIELD,
     ]
@@ -544,7 +563,7 @@ async def process_dlq_event(event: ConsumerEvent) -> Optional[ExtractedEventInfo
 
 
 class KafkaDLQSubscriber(InboundProviderBase):
-    """A kafka event subscriber that subscribes to the configured DLQ topic and either
+    """A kafka event subscriber that subscribes to the specified DLQ topic and either
     discards each event or publishes it to the retry topic as instructed. Further
     processing before requeuing is provided by a callable adhering to the
     DLQEventProcessor definition.
@@ -556,6 +575,7 @@ async def construct(
         cls,
         *,
         config: KafkaConfig,
+        dlq_topic: str,
         dlq_publisher: EventPublisherProtocol,
         process_dlq_event: DLQEventProcessor = process_dlq_event,
         kafka_consumer_cls: type[KafkaConsumerCompatible] = AIOKafkaConsumer,
@@ -566,10 +586,13 @@
         Args:
         - `config`: Config parameters needed for connecting to Apache Kafka.
+        - `dlq_topic`:
+            The name of the DLQ topic to subscribe to. Has the format
+            "{original_topic}.{service_name}-dlq".
         - `dlq_publisher`:
             A running instance of a publishing provider that implements the
             EventPublisherProtocol, such as KafkaEventPublisher. It is used to publish
-            events to the configured retry topic.
+            events to the retry topic.
         - `kafka_consumer_cls`:
             Overwrite the used Kafka consumer class. Only intended for unit testing.
         - `process_dlq_event`:
@@ -585,7 +608,7 @@ async def construct(
         )
 
         consumer = kafka_consumer_cls(
-            config.kafka_dlq_topic,
+            dlq_topic,
             bootstrap_servers=",".join(config.kafka_servers),
             security_protocol=config.kafka_security_protocol,
             ssl_context=generate_ssl_context(config),
@@ -603,10 +626,9 @@ async def construct(
         await consumer.start()
         try:
             yield cls(
-                dlq_topic=config.kafka_dlq_topic,
-                retry_topic=config.kafka_retry_topic,
-                consumer=consumer,
+                dlq_topic=dlq_topic,
                 dlq_publisher=dlq_publisher,
+                consumer=consumer,
                 process_dlq_event=process_dlq_event,
             )
         finally:
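
Because the DLQ topic no longer comes from config, callers pass it to the subscriber explicitly. A usage sketch mirroring the updated tests further down (topic name illustrative; the publisher is any EventPublisherProtocol implementation, e.g. this package's KafkaEventPublisher):

    async with KafkaEventPublisher.construct(config=config) as publisher:
        async with KafkaDLQSubscriber.construct(
            config=config,
            dlq_topic="users.dcs-dlq",  # "{original_topic}.{service_name}-dlq"
            dlq_publisher=publisher,
        ) as dlq_subscriber:
            # Requeue the next DLQ event to the "dcs-retry" topic;
            # run(ignore=True) would discard it instead.
            await dlq_subscriber.run(ignore=False)
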
@@ -616,7 +638,6 @@ def __init__(
         self,
         *,
         dlq_topic: str,
-        retry_topic: str,
         dlq_publisher: EventPublisherProtocol,
         consumer: KafkaConsumerCompatible,
         process_dlq_event: DLQEventProcessor,
@@ -630,10 +651,8 @@ def __init__(
             A running instance of a publishing provider that implements the
             EventPublisherProtocol, such as KafkaEventPublisher.
         - `dlq_topic`:
-            The name of the topic used to store failed events, to which the
-            KafkaDLQSubscriber subscribes.
-        - `retry_topic`:
-            The name of the topic used to requeue failed events.
+            The name of the DLQ topic to subscribe to. Has the format
+            "{original_topic}.{service_name}-dlq".
         - `process_dlq_event`:
             An async callable adhering to the DLQEventProcessor definition that
             provides validation and processing for events from the DLQ. It should
             return _either_
@@ -645,13 +664,18 @@ def __init__(
         self._consumer = consumer
         self._publisher = dlq_publisher
         self._dlq_topic = dlq_topic
-        self._retry_topic = retry_topic
+
+        # In KafkaEventSubscriber, we get the service name from the config. However,
+        # the service name in effect here probably differs -- e.g., a DLQ-specific
+        # service might be running the KafkaDLQSubscriber. So we extract the service
+        # name from the DLQ topic name instead.
+        service_name = service_name_from_dlq_topic(dlq_topic)
+        self._retry_topic = service_name + "-retry"
         self._process_dlq_event = process_dlq_event
 
     async def _publish_to_retry(self, *, event: ExtractedEventInfo) -> None:
         """Publish the event to the retry topic."""
         correlation_id = event.headers["correlation_id"]
-        original_topic = event.headers[ORIGINAL_TOPIC_FIELD]
 
         async with set_correlation_id(correlation_id):
             await self._publisher.publish(
@@ -659,7 +683,7 @@ async def _publish_to_retry(self, *, event: ExtractedEventInfo) -> None:
                 payload=event.payload,
                 type_=event.type_,
                 key=event.key,
                 topic=self._retry_topic,
-                headers={ORIGINAL_TOPIC_FIELD: original_topic},
+                headers={ORIGINAL_TOPIC_FIELD: self._dlq_topic.rsplit(".", 1)[0]},
             )
         logging.info(
             "Published an event with type '%s' to the retry topic '%s'",
diff --git a/tests/unit/test_dlqsub.py b/tests/unit/test_dlqsub.py
index 18a61484..c50612a5 100644
--- a/tests/unit/test_dlqsub.py
+++ b/tests/unit/test_dlqsub.py
@@ -18,6 +18,7 @@
 from contextlib import nullcontext
 from copy import deepcopy
 from typing import Optional
+from unittest.mock import AsyncMock
 
 import pytest
 from pydantic import BaseModel
@@ -51,10 +52,15 @@
     caplog_debug_fixture,  # noqa: F401
 )
 
+DEFAULT_SERVICE_NAME = "test_publisher"  # see KafkaConfig instance in akafka.testutils
+TEST_TOPIC = "test-topic"
+TEST_TYPE = "test_type"
+TEST_DLQ_TOPIC = "test-topic.test_publisher-dlq"
+TEST_RETRY_TOPIC = "test_publisher-retry"
 TEST_EVENT = ExtractedEventInfo(
     payload={"key": "value"},
-    type_="test_type",
-    topic="test-topic",
+    type_=TEST_TYPE,
+    topic=TEST_TOPIC,
     key="key",
 )
@@ -182,68 +188,35 @@ async def _publish_validated(
 def make_config(
     kafka_config: Optional[KafkaConfig] = None,
     *,
-    retry_topic: str = "retry",
-    dlq_topic: str = "dlq",
     max_retries: int = 0,
     enable_dlq: bool = True,
     retry_backoff: int = 0,
 ) -> KafkaConfig:
     """Convenience method to merge kafka fixture config with provided DLQ values."""
     return KafkaConfig(
-        service_name=getattr(kafka_config, "service_name", "test"),
+        service_name=getattr(kafka_config, "service_name", DEFAULT_SERVICE_NAME),
         service_instance_id=getattr(kafka_config, "service_instance_id", "test"),
         kafka_servers=getattr(kafka_config, "kafka_servers", ["localhost:9092"]),
-        kafka_dlq_topic=dlq_topic,
-        kafka_retry_topic=retry_topic,
         kafka_max_retries=max_retries,
         kafka_enable_dlq=enable_dlq,
         kafka_retry_backoff=retry_backoff,
     )
 
 
-@pytest.mark.parametrize(
-    "retry_topic, dlq_topic, max_retries, enable_dlq, error",
-    [
-        ("retry", "dlq", 0, True, False),
-        ("retry", "dlq", 1, True, False),
-        ("retry", "dlq", -1, True, True),
-        ("retry", "retry", 0, True, True),
-        ("retry", "retry", 0, False, True),
-        ("", "", 0, False, False),
-        ("", "dlq", 0, False, False),
-        ("retry", "dlq", 0, False, False),
-        ("retry", "", 0, True, True),
-        ("", "dlq", 0, True, True),
-        ("", "", 0, True, True),
-    ],
-)
-def test_config_validation(
-    retry_topic: str,
-    dlq_topic: str,
-    max_retries: int,
-    enable_dlq: bool,
-    error: bool,
-):
+@pytest.mark.parametrize("max_retries", [-1, 0, 1])
+def test_config_validation(max_retries: int):
     """Test for config validation. Errors should occur:
 
     1. Anytime max_retries is < 0
-    2. If retry and DLQ topics are the same (non-empty)
-    3. If the DLQ is enabled but the topics are not set (either or both)
     """
-    with pytest.raises(ValueError) if error else nullcontext():
-        make_config(
-            retry_topic=retry_topic,
-            dlq_topic=dlq_topic,
-            max_retries=max_retries,
-            enable_dlq=enable_dlq,
-        )
+    with pytest.raises(ValueError) if max_retries < 0 else nullcontext():
+        make_config(max_retries=max_retries)
 
 
 @pytest.mark.asyncio()
 async def test_original_topic_is_preserved(kafka: KafkaFixture):
-    """Ensure the original topic is preserved when it reaches the DLQ subscriber and
-    when it comes back to the subscriber.
+    """Ensure the original topic is preserved when it comes back to the subscriber.
 
     Consume a failing event, send to DLQ, consume from DLQ, send to Retry, consume
     from Retry, and check the original topic.
@@ -253,11 +226,13 @@ async def test_original_topic_is_preserved(kafka: KafkaFixture):
     # Publish test event
     await kafka.publisher.publish(**vars(TEST_EVENT))
 
-    # Create dummy translator and set it to auto-fail, then run the Retry subscriber
+    # Create dummy translator and set it to auto-fail
     translator = FailSwitchTranslator(
-        topics_of_interest=["test-topic"], types_of_interest=["test_type"], fail=True
+        topics_of_interest=[TEST_TOPIC], types_of_interest=[TEST_TYPE], fail=True
     )
     assert not translator.successes
+
+    # Run the subscriber and expect it to fail, sending the event to the DLQ
     async with KafkaEventSubscriber.construct(
         config=config, translator=translator, dlq_publisher=kafka.publisher
     ) as event_subscriber:
@@ -266,9 +241,9 @@
 
     # Run the DLQ subscriber, telling it to publish the event to the retry topic
     async with KafkaDLQSubscriber.construct(
-        config=config, dlq_publisher=kafka.publisher
-    ) as dlq_sub:
-        await dlq_sub.run()
+        config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=kafka.publisher
+    ) as dlq_subscriber:
+        await dlq_subscriber.run()
 
     # Make sure the translator has nothing in the successes list, then run again
     assert not translator.successes
@@ -286,17 +261,17 @@ async def test_invalid_retries_left(kafka: KafkaFixture, caplog_debug):
     """Ensure that the proper error is raised when retries_left is invalid."""
     config = make_config(kafka.config, max_retries=2)
     translator = FailSwitchTranslator(
-        topics_of_interest=["test-topic"], types_of_interest=["test_type"]
+        topics_of_interest=[TEST_TOPIC], types_of_interest=[TEST_TYPE]
     )
     dummy_publisher = DummyPublisher()
 
     async with KafkaEventSubscriber.construct(
         config=config, translator=translator, dlq_publisher=dummy_publisher
-    ) as retry_sub:
+    ) as event_subscriber:
         with pytest.raises(KafkaEventSubscriber.RetriesLeftError):
-            await retry_sub._retry_event(event=TEST_EVENT, retries_left=-1)
+            await event_subscriber._retry_event(event=TEST_EVENT, retries_left=-1)
         with pytest.raises(KafkaEventSubscriber.RetriesLeftError):
-            await retry_sub._retry_event(event=TEST_EVENT, retries_left=3)
+            await event_subscriber._retry_event(event=TEST_EVENT, retries_left=3)
 
     assert_logged(
         "ERROR",
@@ -318,7 +293,7 @@ async def test_retries_exhausted(
 ):
     """Ensure the event is sent to the DLQ topic when the retries are exhausted if
     the DLQ is enabled. If the DLQ is disabled, then the underlying error should be
-    raised.
+    raised instead.
     """
     config = make_config(
         kafka.config, max_retries=max_retries, enable_dlq=enable_dlq, retry_backoff=1
     )
@@ -330,13 +305,13 @@ async def test_retries_exhausted(
 
     # Set up dummies and consume the event
     dummy_publisher = DummyPublisher()
     translator = FailSwitchTranslator(
-        topics_of_interest=["test-topic"], types_of_interest=["test_type"], fail=True
+        topics_of_interest=[TEST_TOPIC], types_of_interest=[TEST_TYPE], fail=True
     )
     async with KafkaEventSubscriber.construct(
         config=config, translator=translator, dlq_publisher=dummy_publisher
-    ) as retry_sub:
+    ) as event_subscriber:
         with pytest.raises(RuntimeError) if not enable_dlq else nullcontext():
-            await retry_sub.run(forever=False)
+            await event_subscriber.run(forever=False)
 
     # Verify that the event was retried "max_retries" times after initial failure (if any)
     assert translator.failures == [TEST_EVENT] * (max_retries + 1)
@@ -369,11 +344,10 @@ async def test_retries_exhausted(
     # Put together the expected event with the original topic field appended
     failed_event = ExtractedEventInfo(
         type_=TEST_EVENT.type_,
-        topic=config.kafka_dlq_topic,
+        topic=TEST_DLQ_TOPIC,
         key=TEST_EVENT.key,
         payload=TEST_EVENT.payload,
         headers={
-            ORIGINAL_TOPIC_FIELD: "test-topic",
             EXC_CLASS_FIELD: "RuntimeError",
             EXC_MSG_FIELD: "Destined to fail.",
         },
@@ -385,7 +359,9 @@ async def test_retries_exhausted(
     assert dummy_publisher.published == expected_published
     if enable_dlq:
         assert_logged(
-            "INFO", "Published event to DLQ topic 'dlq'", caplog_debug.records
+            "INFO",
+            f"Published event to DLQ topic '{TEST_DLQ_TOPIC}'",
+            caplog_debug.records,
         )
     else:
         parsed_log = assert_logged(
@@ -401,17 +377,16 @@
 @pytest.mark.asyncio()
 async def test_send_to_retry(kafka: KafkaFixture, caplog_debug):
     """Ensure the event is sent to the retry topic when the DLQ subscriber is instructed
-    to do so.
+    to do so. This would occur in whatever service or app is resolving DLQ events.
     """
     config = make_config(kafka.config)
 
     event_to_put_in_dlq = ExtractedEventInfo(
         payload=TEST_EVENT.payload,
-        type_="test_type",
-        topic=config.kafka_dlq_topic,
+        type_=TEST_TYPE,
+        topic=TEST_DLQ_TOPIC,
         key="123456",
         headers={
-            ORIGINAL_TOPIC_FIELD: "test-topic",
             EXC_CLASS_FIELD: "RuntimeError",
             EXC_MSG_FIELD: "Destined to fail.",
         },
@@ -422,22 +397,23 @@ async def test_send_to_retry(kafka: KafkaFixture, caplog_debug):
     # Set up dummies and consume the event with the DLQ Subscriber
     dummy_publisher = DummyPublisher()
     async with KafkaDLQSubscriber.construct(
-        config=config, dlq_publisher=dummy_publisher
-    ) as dlq_sub:
+        config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
+    ) as dlq_subscriber:
         assert not dummy_publisher.published
-        await dlq_sub.run(ignore=False)
+        await dlq_subscriber.run(ignore=False)
 
     assert_logged(
         "INFO",
-        "Published an event with type 'test_type' to the retry topic 'retry'",
+        f"Published an event with type 'test_type' to the retry topic '{TEST_RETRY_TOPIC}'",
         caplog_debug.records,
     )
 
     # Verify that the event was sent to the RETRY topic
-    event_to_put_in_dlq.topic = config.kafka_retry_topic
+    event_to_put_in_dlq.topic = TEST_RETRY_TOPIC
 
-    # The exc_... headers are not supposed to be in the retry event
-    event_to_put_in_dlq.headers = {ORIGINAL_TOPIC_FIELD: "test-topic"}
+    # The exc_... headers are not supposed to be in the retry event, but the original
+    # topic should be!
+    event_to_put_in_dlq.headers = {ORIGINAL_TOPIC_FIELD: TEST_TOPIC}
     assert dummy_publisher.published == [event_to_put_in_dlq]
@@ -450,24 +426,25 @@ async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug)
 
     event = ExtractedEventInfo(
         payload={"test_id": "123456"},
-        type_="test_type",
-        topic=config.kafka_retry_topic,
+        type_=TEST_TYPE,
+        topic=TEST_RETRY_TOPIC,
         key="key",
     )
 
-    # Publish that event directly to RETRY Topic, as if it had already been requeued
+    # Publish that event directly to RETRY Topic, as if it had already been requeued,
     # the original topic header is intentionally not included here
     await kafka.publisher.publish(**vars(event))
 
-    # Set up dummies and consume the event with the DLQ Subscriber
+    # Set up dummies and subscriber
     translator = FailSwitchTranslator(
-        topics_of_interest=["test-topic"], types_of_interest=["test_type"]
+        topics_of_interest=[TEST_TOPIC], types_of_interest=[TEST_TYPE]
     )
     async with KafkaEventSubscriber.construct(
         config=config, translator=translator, dlq_publisher=kafka.publisher
     ) as event_subscriber:
         assert not translator.failures or translator.successes
+
+        # Consume the event with the event subscriber
         await event_subscriber.run(forever=False)
 
     parsed_log = assert_logged(
         "INFO",
@@ -475,7 +452,9 @@ async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug)
         caplog_debug.records,
         parse=False,
     )
-    assert parsed_log.startswith("Ignored event of type 'test_type': retry")
+    assert parsed_log.startswith(
+        f"Ignored event of type 'test_type': {TEST_RETRY_TOPIC}"
+    )
     assert parsed_log.endswith("errors: topic is empty")
@@ -487,22 +466,22 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):
     # make an event without the original_topic field in the header
     event = ExtractedEventInfo(
         payload={"test_id": "123456"},
-        type_="test_type",
-        topic=config.kafka_dlq_topic,
+        type_=TEST_TYPE,
+        topic=TEST_DLQ_TOPIC,
         key="key",
     )
 
     # Publish that event directly to DLQ Topic, as if it had already failed
-    # the original topic header is not included here
+    # the original topic header is not included at this point
    await kafka.publisher.publish(**vars(event))
 
     # Set up dummies and consume the event with the DLQ Subscriber
     dummy_publisher = DummyPublisher()
     async with KafkaDLQSubscriber.construct(
-        config=config, dlq_publisher=dummy_publisher
-    ) as dlq_sub:
+        config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
+    ) as dlq_subscriber:
         assert not dummy_publisher.published
-        await dlq_sub.run(ignore=True)
+        await dlq_subscriber.run(ignore=True)
 
     parsed_log = assert_logged(
         "INFO",
@@ -510,7 +489,9 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):
         caplog_debug.records,
         parse=False,
     )
-    assert parsed_log.startswith("Ignoring event from DLQ topic 'dlq': dlq")
+    assert parsed_log.startswith(
+        f"Ignoring event from DLQ topic '{TEST_DLQ_TOPIC}': test"
+    )
 
     # Assert that the event was not published to the retry topic
     assert not dummy_publisher.published
@@ -527,14 +508,14 @@ async def test_no_retries_no_dlq_original_error(kafka: KafkaFixture, caplog_debu
     await kafka.publisher.publish(**vars(TEST_EVENT))
 
     translator = FailSwitchTranslator(
-        topics_of_interest=["test-topic"], types_of_interest=["test_type"], fail=True
+        topics_of_interest=[TEST_TOPIC], types_of_interest=[TEST_TYPE], fail=True
     )
     async with KafkaEventSubscriber.construct(
         config=config, translator=translator, dlq_publisher=kafka.publisher
-    ) as retry_sub:
+    ) as event_subscriber:
         assert not translator.successes
         with pytest.raises(RuntimeError, match="Destined to fail."):
-            await retry_sub.run(forever=False)
+            await event_subscriber.run(forever=False)
 
     assert not translator.successes
     assert translator.failures == [TEST_EVENT]
@@ -576,22 +557,23 @@ async def test_outbox_with_dlq(kafka: KafkaFixture, event_type: str):
     # Run the outbox subscriber and expect it to fail
     async with KafkaOutboxSubscriber.construct(
         config=config, dlq_publisher=kafka.publisher, translators=[translator]
-    ) as outbox_sub:
+    ) as outbox_subscriber:
         assert not list_to_check
-        await outbox_sub.run(forever=False)
+        await outbox_subscriber.run(forever=False)
         assert list_to_check == [event] if event_type == "upserted" else [event.key]
 
     # Consume event from the DLQ topic, publish to retry topic
+    dlq_topic = f"users.{config.service_name}-dlq"
     async with KafkaDLQSubscriber.construct(
-        config=config, dlq_publisher=kafka.publisher
-    ) as dlq_sub:
-        await dlq_sub.run()
+        config=config, dlq_topic=dlq_topic, dlq_publisher=kafka.publisher
+    ) as dlq_subscriber:
+        await dlq_subscriber.run()
 
     # Retry the event after clearing the list
     list_to_check.clear()
     translator.fail = False
     assert not list_to_check
-    await outbox_sub.run(forever=False)
+    await outbox_subscriber.run(forever=False)
     assert list_to_check == [event] if event_type == "upserted" else [event.key]
@@ -601,12 +583,11 @@ async def test_kafka_event_subcriber_construction(caplog):
     the DLQ is enabled but no provider is used.
     """
     config = make_config()
-    translator = FailSwitchTranslator(
-        topics_of_interest=["test-topic"], types_of_interest=["test_type"]
-    )
 
     with pytest.raises(ValueError):
-        async with KafkaEventSubscriber.construct(config=config, translator=translator):
+        async with KafkaEventSubscriber.construct(
+            config=config, translator=AsyncMock()
+        ):
             assert False
 
     assert_logged(
@@ -632,9 +613,8 @@ async def test_default_dlq_processor(
     dlq_test_event = ExtractedEventInfo(
         payload=TEST_EVENT.payload,
         type_=TEST_EVENT.type_,
-        topic=config.kafka_dlq_topic,
+        topic=TEST_DLQ_TOPIC,
         key=TEST_EVENT.key,
-        headers={ORIGINAL_TOPIC_FIELD: "test-topic" if not validation_error else ""},
     )
 
     # Publish test event directly to DLQ with chosen correlation ID OR ignored
@@ -644,11 +624,11 @@ async def test_default_dlq_processor(
     dummy_publisher = DummyPublisher()
     async with KafkaDLQSubscriber.construct(
-        config=config, dlq_publisher=dummy_publisher
-    ) as dlq_sub:
+        config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
+    ) as dlq_subscriber:
         assert not dummy_publisher.published
         caplog.clear()
-        await dlq_sub.run()
+        await dlq_subscriber.run()
 
     assert dummy_publisher.published == [] if validation_error else [dlq_test_event]
 
     if validation_error:
@@ -686,21 +666,21 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
     await kafka.publish_event(
         payload=TEST_EVENT.payload,
         type_=TEST_EVENT.type_,
-        topic=config.kafka_dlq_topic,
+        topic=TEST_DLQ_TOPIC,
         key=TEST_EVENT.key,
-        headers={ORIGINAL_TOPIC_FIELD: "test-topic"},
     )
 
     # Create custom processor instance and consume with the KafkaDLQSubscriber
     custom_processor = CustomDLQProcessor()
     async with KafkaDLQSubscriber.construct(
         config=config,
+        dlq_topic=TEST_DLQ_TOPIC,
         dlq_publisher=DummyPublisher(),
         process_dlq_event=custom_processor.process,
-    ) as dlq_sub:
+    ) as dlq_subscriber:
         assert not custom_processor.hits
         with pytest.raises(DLQProcessingError) if processing_error else nullcontext():
-            await dlq_sub.run()
+            await dlq_subscriber.run()
 
     # verify that the event was received and processed by the custom processor
     assert len(custom_processor.hits)
@@ -708,7 +688,6 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
     headers = headers_as_dict(event)
     assert headers["type"] == TEST_EVENT.type_
     assert headers["correlation_id"] == correlation_id
-    assert headers[ORIGINAL_TOPIC_FIELD] == "test-topic"
     assert event.value == TEST_EVENT.payload
-    assert event.topic == config.kafka_dlq_topic
+    assert event.topic == TEST_DLQ_TOPIC
     assert event.key == TEST_EVENT.key
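
For completeness: the custom processor exercised by the last test adheres to the `DLQEventProcessor` shape, an async callable that receives a `ConsumerEvent` and returns an `ExtractedEventInfo` to requeue the event, or `None` to discard it. A hypothetical minimal implementation (the function name and discard rule are invented for illustration):

    from typing import Optional

    async def drop_stale_events(event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
        """Requeue everything except events of a (hypothetical) stale type."""
        info = ExtractedEventInfo(event)
        if info.type_ == "stale_type":  # hypothetical discard rule
            return None                 # discard: nothing is requeued
        return info                     # requeue to the service's retry topic

    # Wired in at construction time, mirroring the test above:
    # KafkaDLQSubscriber.construct(..., process_dlq_event=drop_stale_events)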