diff --git a/snuba/datasets/processors/errors_processor.py b/snuba/datasets/processors/errors_processor.py index c336908b56..e4137ff6cd 100644 --- a/snuba/datasets/processors/errors_processor.py +++ b/snuba/datasets/processors/errors_processor.py @@ -73,7 +73,7 @@ def process_message( if row is None: # the processor cannot/does not handle this input return None - return InsertBatch([row], None) + return InsertBatch([row], row["received"]) elif type_ in REPLACEMENT_EVENT_TYPES: # pass raw events along to republish return ReplacementBatch(str(event["project_id"]), [message]) diff --git a/tests/datasets/test_errors_processor.py b/tests/datasets/test_errors_processor.py index dec193674c..de87c60e0f 100644 --- a/tests/datasets/test_errors_processor.py +++ b/tests/datasets/test_errors_processor.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any, Mapping, Sequence +from unittest.mock import ANY from uuid import UUID import pytest @@ -460,7 +461,7 @@ def test_errors_basic(self) -> None: } ) assert processor.process_message(payload, meta) == InsertBatch( - [message.build_result(meta)], None + [message.build_result(meta)], ANY ) def test_errors_replayid_context(self) -> None: @@ -498,7 +499,7 @@ def test_errors_replayid_context(self) -> None: meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp) assert self.processor.process_message(payload, meta) == InsertBatch( - [message.build_result(meta)], None + [message.build_result(meta)], ANY ) def test_errors_replayid_tag(self) -> None: @@ -542,7 +543,7 @@ def test_errors_replayid_tag(self) -> None: result["tags.key"].insert(4, "replayId") result["tags.value"].insert(4, replay_id.hex) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_errors_replayid_tag_and_context(self) -> None: @@ -585,7 +586,7 @@ def test_errors_replayid_tag_and_context(self) -> None: result = message.build_result(meta) result["replay_id"] = str(replay_id) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_errors_replayid_invalid_tag(self) -> None: @@ -629,7 +630,7 @@ def test_errors_replayid_invalid_tag(self) -> None: result["tags.key"].insert(4, "replayId") result["tags.value"].insert(4, invalid_replay_id) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_exception_main_thread_true(self) -> None: @@ -683,7 +684,7 @@ def test_exception_main_thread_true(self) -> None: result["exception_main_thread"] = True assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_exception_main_thread_false(self) -> None: @@ -737,7 +738,7 @@ def test_exception_main_thread_false(self) -> None: result["exception_main_thread"] = False assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) def test_trace_sampled(self) -> None: @@ -777,7 +778,7 @@ def test_trace_sampled(self) -> None: result["trace_sampled"] = True assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) # verify processing trace.sampled=None works as it did before @@ -788,7 +789,7 @@ def test_trace_sampled(self) -> None: result2 = message.build_result(meta) assert self.processor.process_message(payload, meta) == InsertBatch( - [result2], None + [result2], ANY ) def test_errors_processed(self) -> None: @@ -828,7 +829,7 @@ def test_errors_processed(self) -> None: result["num_processing_errors"] = 3 assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY ) # ensure old behavior where data.errors=None won't set 'num_processing_errors' @@ -839,5 +840,5 @@ def test_errors_processed(self) -> None: result = message.build_result(meta) assert self.processor.process_message(payload, meta) == InsertBatch( - [result], None + [result], ANY )