diff --git a/tests/unit/test_dlqsub.py b/tests/unit/test_dlqsub.py index fe8c006c..524501b3 100644 --- a/tests/unit/test_dlqsub.py +++ b/tests/unit/test_dlqsub.py @@ -52,8 +52,11 @@ caplog_debug_fixture, # noqa: F401 ) +DEFAULT_SERVICE_NAME = "test_publisher" # see KafkaConfig instance in akafka.testutils TEST_TOPIC = "test-topic" TEST_TYPE = "test_type" +TEST_DLQ_TOPIC = "test-topic.test_publisher-dlq" +TEST_RETRY_TOPIC = "test_publisher-retry" TEST_EVENT = ExtractedEventInfo( payload={"key": "value"}, type_=TEST_TYPE, @@ -191,7 +194,7 @@ def make_config( ) -> KafkaConfig: """Convenience method to merge kafka fixture config with provided DLQ values.""" return KafkaConfig( - service_name=getattr(kafka_config, "service_name", "test"), + service_name=getattr(kafka_config, "service_name", DEFAULT_SERVICE_NAME), service_instance_id=getattr(kafka_config, "service_instance_id", "test"), kafka_servers=getattr(kafka_config, "kafka_servers", ["localhost:9092"]), kafka_max_retries=max_retries, @@ -238,10 +241,8 @@ async def test_original_topic_is_preserved(kafka: KafkaFixture): await event_subscriber.run(forever=False) # Run the DLQ subscriber, telling it to publish the event to the retry topic - service_name = config.service_name - dlq_topic = f"test-topic.{service_name}-dlq" async with KafkaDLQSubscriber.construct( - config=config, dlq_topic=dlq_topic, dlq_publisher=kafka.publisher + config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=kafka.publisher ) as dlq_subscriber: await dlq_subscriber.run() @@ -342,10 +343,9 @@ async def test_retries_exhausted( assert_not_logged("WARNING", retry_log, caplog_debug.records) # Put together the expected event with the original topic field appended - dlq_topic = f"test-topic.{config.service_name}-dlq" failed_event = ExtractedEventInfo( type_=TEST_EVENT.type_, - topic=dlq_topic, + topic=TEST_DLQ_TOPIC, key=TEST_EVENT.key, payload=TEST_EVENT.payload, headers={ @@ -360,7 +360,9 @@ async def test_retries_exhausted( assert dummy_publisher.published == expected_published if enable_dlq: assert_logged( - "INFO", f"Published event to DLQ topic '{dlq_topic}'", caplog_debug.records + "INFO", + f"Published event to DLQ topic '{TEST_DLQ_TOPIC}'", + caplog_debug.records, ) else: parsed_log = assert_logged( @@ -380,11 +382,10 @@ async def test_send_to_retry(kafka: KafkaFixture, caplog_debug): """ config = make_config(kafka.config) - dlq_topic = f"test-topic.{config.service_name}-dlq" event_to_put_in_dlq = ExtractedEventInfo( payload=TEST_EVENT.payload, type_=TEST_TYPE, - topic=dlq_topic, + topic=TEST_DLQ_TOPIC, key="123456", headers={ EXC_CLASS_FIELD: "RuntimeError", @@ -397,20 +398,19 @@ async def test_send_to_retry(kafka: KafkaFixture, caplog_debug): # Set up dummies and consume the event with the DLQ Subscriber dummy_publisher = DummyPublisher() async with KafkaDLQSubscriber.construct( - config=config, dlq_topic=dlq_topic, dlq_publisher=dummy_publisher + config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher ) as dlq_subscriber: assert not dummy_publisher.published await dlq_subscriber.run(ignore=False) - retry_topic = config.service_name + "-retry" assert_logged( "INFO", - f"Published an event with type 'test_type' to the retry topic '{retry_topic}'", + f"Published an event with type 'test_type' to the retry topic '{TEST_RETRY_TOPIC}'", caplog_debug.records, ) # Verify that the event was sent to the RETRY topic - event_to_put_in_dlq.topic = retry_topic + event_to_put_in_dlq.topic = TEST_RETRY_TOPIC # The exc_... headers are not supposed to be in the retry event, but the original # topic should be! @@ -425,11 +425,10 @@ async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug) """ config = make_config(kafka.config) - retry_topic = config.service_name + "-retry" event = ExtractedEventInfo( payload={"test_id": "123456"}, type_=TEST_TYPE, - topic=retry_topic, + topic=TEST_RETRY_TOPIC, key="key", ) @@ -455,7 +454,7 @@ async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug) parse=False, ) assert parsed_log.startswith( - f"Ignored event of type 'test_type': {retry_topic}" + f"Ignored event of type 'test_type': {TEST_RETRY_TOPIC}" ) assert parsed_log.endswith("errors: topic is empty") @@ -465,12 +464,11 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug): """Test what happens when a DLQ Subscriber is instructed to ignore an event.""" config = make_config(kafka.config) - dlq_topic = f"test-topic.{config.service_name}-dlq" # make an event without the original_topic field in the header event = ExtractedEventInfo( payload={"test_id": "123456"}, type_=TEST_TYPE, - topic=dlq_topic, + topic=TEST_DLQ_TOPIC, key="key", ) @@ -481,7 +479,7 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug): # Set up dummies and consume the event with the DLQ Subscriber dummy_publisher = DummyPublisher() async with KafkaDLQSubscriber.construct( - config=config, dlq_topic=dlq_topic, dlq_publisher=dummy_publisher + config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher ) as dlq_subscriber: assert not dummy_publisher.published await dlq_subscriber.run(ignore=True) @@ -492,7 +490,9 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug): caplog_debug.records, parse=False, ) - assert parsed_log.startswith(f"Ignoring event from DLQ topic '{dlq_topic}': test") + assert parsed_log.startswith( + f"Ignoring event from DLQ topic '{TEST_DLQ_TOPIC}': test" + ) # Assert that the event was not published to the retry topic assert not dummy_publisher.published @@ -564,7 +564,7 @@ async def test_outbox_with_dlq(kafka: KafkaFixture, event_type: str): assert list_to_check == [event] if event_type == "upserted" else [event.key] # Consume event from the DLQ topic, publish to retry topic - dlq_topic = f"{translator.event_topic}.{config.service_name}-dlq" + dlq_topic = f"users.{config.service_name}-dlq" async with KafkaDLQSubscriber.construct( config=config, dlq_topic=dlq_topic, dlq_publisher=kafka.publisher ) as dlq_subscriber: @@ -611,11 +611,10 @@ async def test_default_dlq_processor( """ config = make_config(kafka.config) - dlq_topic = f"test-topic.{config.service_name}-dlq" dlq_test_event = ExtractedEventInfo( payload=TEST_EVENT.payload, type_=TEST_EVENT.type_, - topic=dlq_topic, + topic=TEST_DLQ_TOPIC, key=TEST_EVENT.key, ) @@ -626,7 +625,7 @@ async def test_default_dlq_processor( dummy_publisher = DummyPublisher() async with KafkaDLQSubscriber.construct( - config=config, dlq_topic=dlq_topic, dlq_publisher=dummy_publisher + config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher ) as dlq_subscriber: assert not dummy_publisher.published caplog.clear() @@ -663,13 +662,12 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]: config = make_config(kafka.config) # Publish test event directly to DLQ with chosen correlation ID - dlq_topic = f"test-topic.{config.service_name}-dlq" correlation_id = new_correlation_id() async with set_correlation_id(correlation_id): await kafka.publish_event( payload=TEST_EVENT.payload, type_=TEST_EVENT.type_, - topic=dlq_topic, + topic=TEST_DLQ_TOPIC, key=TEST_EVENT.key, ) @@ -677,7 +675,7 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]: custom_processor = CustomDLQProcessor() async with KafkaDLQSubscriber.construct( config=config, - dlq_topic=dlq_topic, + dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=DummyPublisher(), process_dlq_event=custom_processor.process, ) as dlq_subscriber: @@ -692,5 +690,5 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]: assert headers["type"] == TEST_EVENT.type_ assert headers["correlation_id"] == correlation_id assert event.value == TEST_EVENT.payload - assert event.topic == dlq_topic + assert event.topic == TEST_DLQ_TOPIC assert event.key == TEST_EVENT.key