diff --git a/snuba/consumers/dlq.py b/snuba/consumers/dlq.py index 476130d39e..8b1409050b 100644 --- a/snuba/consumers/dlq.py +++ b/snuba/consumers/dlq.py @@ -8,6 +8,7 @@ from typing import Optional, TypeVar import rapidjson +from arroyo.dlq import InvalidMessage from arroyo.processing.strategies.abstract import ProcessingStrategy from arroyo.types import Message @@ -150,7 +151,12 @@ def poll(self) -> None: def submit(self, message: Message[TPayload]) -> None: if self.__processed_messages < self.__num_messages_to_process: self.__last_message_time = time.time() - self.__next_step.submit(message) + + try: + self.__next_step.submit(message) + except InvalidMessage: + self.__processed_messages += 1 + raise self.__processed_messages += 1 def close(self) -> None: