Skip to content

Commit

Permalink
Define constants for test dlq and retry topics
Browse files Browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Nov 21, 2024
1 parent e598444 commit 8bb8d1d
Showing 1 changed file with 26 additions and 28 deletions.
54 changes: 26 additions & 28 deletions tests/unit/test_dlqsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@
caplog_debug_fixture, # noqa: F401
)

DEFAULT_SERVICE_NAME = "test_publisher" # see KafkaConfig instance in akafka.testutils
TEST_TOPIC = "test-topic"
TEST_TYPE = "test_type"
TEST_DLQ_TOPIC = "test-topic.test_publisher-dlq"
TEST_RETRY_TOPIC = "test_publisher-retry"
TEST_EVENT = ExtractedEventInfo(
payload={"key": "value"},
type_=TEST_TYPE,
Expand Down Expand Up @@ -191,7 +194,7 @@ def make_config(
) -> KafkaConfig:
"""Convenience method to merge kafka fixture config with provided DLQ values."""
return KafkaConfig(
service_name=getattr(kafka_config, "service_name", "test"),
service_name=getattr(kafka_config, "service_name", DEFAULT_SERVICE_NAME),
service_instance_id=getattr(kafka_config, "service_instance_id", "test"),
kafka_servers=getattr(kafka_config, "kafka_servers", ["localhost:9092"]),
kafka_max_retries=max_retries,
Expand Down Expand Up @@ -238,10 +241,8 @@ async def test_original_topic_is_preserved(kafka: KafkaFixture):
await event_subscriber.run(forever=False)

# Run the DLQ subscriber, telling it to publish the event to the retry topic
service_name = config.service_name
dlq_topic = f"test-topic.{service_name}-dlq"
async with KafkaDLQSubscriber.construct(
config=config, dlq_topic=dlq_topic, dlq_publisher=kafka.publisher
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=kafka.publisher
) as dlq_subscriber:
await dlq_subscriber.run()

Expand Down Expand Up @@ -342,10 +343,9 @@ async def test_retries_exhausted(
assert_not_logged("WARNING", retry_log, caplog_debug.records)

# Put together the expected event with the original topic field appended
dlq_topic = f"test-topic.{config.service_name}-dlq"
failed_event = ExtractedEventInfo(
type_=TEST_EVENT.type_,
topic=dlq_topic,
topic=TEST_DLQ_TOPIC,
key=TEST_EVENT.key,
payload=TEST_EVENT.payload,
headers={
Expand All @@ -360,7 +360,9 @@ async def test_retries_exhausted(
assert dummy_publisher.published == expected_published
if enable_dlq:
assert_logged(
"INFO", f"Published event to DLQ topic '{dlq_topic}'", caplog_debug.records
"INFO",
f"Published event to DLQ topic '{TEST_DLQ_TOPIC}'",
caplog_debug.records,
)
else:
parsed_log = assert_logged(
Expand All @@ -380,11 +382,10 @@ async def test_send_to_retry(kafka: KafkaFixture, caplog_debug):
"""
config = make_config(kafka.config)

dlq_topic = f"test-topic.{config.service_name}-dlq"
event_to_put_in_dlq = ExtractedEventInfo(
payload=TEST_EVENT.payload,
type_=TEST_TYPE,
topic=dlq_topic,
topic=TEST_DLQ_TOPIC,
key="123456",
headers={
EXC_CLASS_FIELD: "RuntimeError",
Expand All @@ -397,20 +398,19 @@ async def test_send_to_retry(kafka: KafkaFixture, caplog_debug):
# Set up dummies and consume the event with the DLQ Subscriber
dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
config=config, dlq_topic=dlq_topic, dlq_publisher=dummy_publisher
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_subscriber:
assert not dummy_publisher.published
await dlq_subscriber.run(ignore=False)

retry_topic = config.service_name + "-retry"
assert_logged(
"INFO",
f"Published an event with type 'test_type' to the retry topic '{retry_topic}'",
f"Published an event with type 'test_type' to the retry topic '{TEST_RETRY_TOPIC}'",
caplog_debug.records,
)

# Verify that the event was sent to the RETRY topic
event_to_put_in_dlq.topic = retry_topic
event_to_put_in_dlq.topic = TEST_RETRY_TOPIC

# The exc_... headers are not supposed to be in the retry event, but the original
# topic should be!
Expand All @@ -425,11 +425,10 @@ async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug)
"""
config = make_config(kafka.config)

retry_topic = config.service_name + "-retry"
event = ExtractedEventInfo(
payload={"test_id": "123456"},
type_=TEST_TYPE,
topic=retry_topic,
topic=TEST_RETRY_TOPIC,
key="key",
)

Expand All @@ -455,7 +454,7 @@ async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug)
parse=False,
)
assert parsed_log.startswith(
f"Ignored event of type 'test_type': {retry_topic}"
f"Ignored event of type 'test_type': {TEST_RETRY_TOPIC}"
)
assert parsed_log.endswith("errors: topic is empty")

Expand All @@ -465,12 +464,11 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):
"""Test what happens when a DLQ Subscriber is instructed to ignore an event."""
config = make_config(kafka.config)

dlq_topic = f"test-topic.{config.service_name}-dlq"
# make an event without the original_topic field in the header
event = ExtractedEventInfo(
payload={"test_id": "123456"},
type_=TEST_TYPE,
topic=dlq_topic,
topic=TEST_DLQ_TOPIC,
key="key",
)

Expand All @@ -481,7 +479,7 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):
# Set up dummies and consume the event with the DLQ Subscriber
dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
config=config, dlq_topic=dlq_topic, dlq_publisher=dummy_publisher
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_subscriber:
assert not dummy_publisher.published
await dlq_subscriber.run(ignore=True)
Expand All @@ -492,7 +490,9 @@ async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug):
caplog_debug.records,
parse=False,
)
assert parsed_log.startswith(f"Ignoring event from DLQ topic '{dlq_topic}': test")
assert parsed_log.startswith(
f"Ignoring event from DLQ topic '{TEST_DLQ_TOPIC}': test"
)

# Assert that the event was not published to the retry topic
assert not dummy_publisher.published
Expand Down Expand Up @@ -564,7 +564,7 @@ async def test_outbox_with_dlq(kafka: KafkaFixture, event_type: str):
assert list_to_check == [event] if event_type == "upserted" else [event.key]

# Consume event from the DLQ topic, publish to retry topic
dlq_topic = f"{translator.event_topic}.{config.service_name}-dlq"
dlq_topic = f"users.{config.service_name}-dlq"
async with KafkaDLQSubscriber.construct(
config=config, dlq_topic=dlq_topic, dlq_publisher=kafka.publisher
) as dlq_subscriber:
Expand Down Expand Up @@ -611,11 +611,10 @@ async def test_default_dlq_processor(
"""
config = make_config(kafka.config)

dlq_topic = f"test-topic.{config.service_name}-dlq"
dlq_test_event = ExtractedEventInfo(
payload=TEST_EVENT.payload,
type_=TEST_EVENT.type_,
topic=dlq_topic,
topic=TEST_DLQ_TOPIC,
key=TEST_EVENT.key,
)

Expand All @@ -626,7 +625,7 @@ async def test_default_dlq_processor(

dummy_publisher = DummyPublisher()
async with KafkaDLQSubscriber.construct(
config=config, dlq_topic=dlq_topic, dlq_publisher=dummy_publisher
config=config, dlq_topic=TEST_DLQ_TOPIC, dlq_publisher=dummy_publisher
) as dlq_subscriber:
assert not dummy_publisher.published
caplog.clear()
Expand Down Expand Up @@ -663,21 +662,20 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
config = make_config(kafka.config)

# Publish test event directly to DLQ with chosen correlation ID
dlq_topic = f"test-topic.{config.service_name}-dlq"
correlation_id = new_correlation_id()
async with set_correlation_id(correlation_id):
await kafka.publish_event(
payload=TEST_EVENT.payload,
type_=TEST_EVENT.type_,
topic=dlq_topic,
topic=TEST_DLQ_TOPIC,
key=TEST_EVENT.key,
)

# Create custom processor instance and consume with the KafkaDLQSubscriber
custom_processor = CustomDLQProcessor()
async with KafkaDLQSubscriber.construct(
config=config,
dlq_topic=dlq_topic,
dlq_topic=TEST_DLQ_TOPIC,
dlq_publisher=DummyPublisher(),
process_dlq_event=custom_processor.process,
) as dlq_subscriber:
Expand All @@ -692,5 +690,5 @@ async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
assert headers["type"] == TEST_EVENT.type_
assert headers["correlation_id"] == correlation_id
assert event.value == TEST_EVENT.payload
assert event.topic == dlq_topic
assert event.topic == TEST_DLQ_TOPIC
assert event.key == TEST_EVENT.key

0 comments on commit 8bb8d1d

Please sign in to comment.