From c80f3097c0fd89fca84a205af36fdad39ad6a510 Mon Sep 17 00:00:00 2001 From: Lyn Date: Tue, 29 Aug 2023 09:04:49 -0700 Subject: [PATCH] fix: Ensure carried over message is in buffer Since `_run_once` can early return (https://github.com/getsentry/arroyo/blob/6286c7921978065edeb5ce72d829031f952d2e5e/arroyo/processing/processor.py#L369) it was possible that a message was never placed in `self.buffered_messages`. If we try to retreive it later, it can crash the consumer. This is suspected to be the cause of the `Invalid message not found in buffer` messages we saw in prod. --- arroyo/processing/processor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 0fe77a1c..38cf4d69 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -377,8 +377,7 @@ def _run_once(self) -> None: message = ( Message(self.__message) if self.__message is not None else None ) - if not message_carried_over: - self.__buffered_messages.append(self.__message) + self.__buffered_messages.append(self.__message) self.__processing_strategy.submit(message) self.__metrics_buffer.incr_timing(