Skip to content

Commit

Permalink
feat(dlq-test): Add temporary code to fail message processing (#4408)
Browse files Browse the repository at this point in the history
This is temporary code that should be reverted. It adds the ability
to reject messages from the querylog processor and DLQ them based
on a reject rate which can be set via runtime config.

We will try replaying them using the new DLQ replay mechanism.
  • Loading branch information
lynnagara authored Jun 23, 2023
1 parent ce84708 commit ffa2fcb
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion snuba/datasets/processors/querylog_processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import random
import uuid
from typing import Any, Mapping, MutableMapping, Optional, Sequence, Union

Expand All @@ -11,7 +12,7 @@
QueryMetadata,
)

from snuba import environment
from snuba import environment, state
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.processors import DatasetMessageProcessor
from snuba.processor import InsertBatch, ProcessedMessage
Expand Down Expand Up @@ -150,6 +151,12 @@ def _remove_invalid_data(self, processed: dict[str, Any]) -> None:
def process_message(
self, message: Querylog, metadata: KafkaMessageMetadata
) -> Optional[ProcessedMessage]:
# XXX: Temporary code for the DLQ test.
reject_rate = state.get_config("querylog_reject_rate", 0.0)
assert isinstance(reject_rate, float)
if random.random() < reject_rate:
raise ValueError("This message is rejected on purpose.")

processed = {
"request_id": str(uuid.UUID(message["request"]["id"])),
"request_body": self.__to_json_string(message["request"]["body"]),
Expand Down

0 comments on commit ffa2fcb

Please sign in to comment.