diff --git a/snuba/datasets/processors/transactions_processor.py b/snuba/datasets/processors/transactions_processor.py
index 16b24e4f8b..1b3b363869 100644
--- a/snuba/datasets/processors/transactions_processor.py
+++ b/snuba/datasets/processors/transactions_processor.py
@@ -24,6 +24,7 @@
     InsertBatch,
     ProcessedMessage,
     _as_dict_safe,
+    _collapse_uint32,
     _ensure_valid_date,
     _ensure_valid_ip,
     _unicodify,
@@ -141,6 +142,7 @@ def _process_base_event_values(
                 metrics.increment("group_ids_exceeded_limit")
                 processed["group_ids"] = group_ids[:GROUP_IDS_LIMIT]
 
+
         return processed
 
     def _process_tags(
@@ -472,4 +474,11 @@ def process_message(
 
         # the following operation modifies the event_dict and is therefore *not* order-independent
         self._process_contexts_and_user(processed, event_dict)
-        return InsertBatch([processed], None)
+        try:
+            raw_received = _collapse_uint32(int(event_dict["data"]["received"]))
+            assert raw_received is not None
+            received = datetime.utcfromtimestamp(raw_received)
+            return InsertBatch([processed], received)
+        except (KeyError, AssertionError) as err:
+            logger.exception(err)
+            return InsertBatch([processed], None)
diff --git a/tests/datasets/test_transaction_processor.py b/tests/datasets/test_transaction_processor.py
index 7e2007200c..150e92284f 100644
--- a/tests/datasets/test_transaction_processor.py
+++ b/tests/datasets/test_transaction_processor.py
@@ -3,6 +3,7 @@
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from typing import Any, Mapping, Optional, Sequence, Tuple
+from unittest.mock import ANY
 
 import pytest
 
@@ -45,6 +46,7 @@ class TransactionEvent:
     has_app_ctx: bool = True
     profile_id: Optional[str] = None
     replay_id: Optional[str] = None
+    received: Optional[float] = None
 
     def get_app_context(self) -> Optional[Mapping[str, str]]:
         if self.has_app_ctx:
@@ -133,6 +135,7 @@ def serialize(self) -> Tuple[int, str, Mapping[str, Any]]:
                 "datetime": "2019-08-08T22:29:53.917000Z",
                 "timestamp": self.timestamp,
                 "start_timestamp": self.start_timestamp,
+                "received": self.received,
                 "measurements": {
                     "lcp": {"value": 32.129},
                     "lcp.elementSize": {"value": 4242},
@@ -305,6 +308,9 @@ def __get_transaction_event(self) -> TransactionEvent:
             op="navigation",
             timestamp=finish,
             start_timestamp=start,
+            received=(
+                datetime.now(tz=timezone.utc) - timedelta(seconds=15)
+            ).timestamp(),
             platform="python",
             dist="",
             user_name="me",
@@ -364,7 +370,7 @@ def test_base_process(self) -> None:
         )
         assert TransactionsMessageProcessor().process_message(
             message.serialize(), meta
-        ) == InsertBatch([message.build_result(meta)], None)
+        ) == InsertBatch([message.build_result(meta)], ANY)
         settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context
 
     def test_too_many_spans(self) -> None:
@@ -389,7 +395,7 @@ def test_too_many_spans(self) -> None:
 
         assert TransactionsMessageProcessor().process_message(
             payload, meta
-        ) == InsertBatch([result], None)
+        ) == InsertBatch([result], ANY)
         settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context
 
     def test_missing_transaction_source(self) -> None:
@@ -433,7 +439,7 @@ def test_app_ctx_none(self) -> None:
         )
         assert TransactionsMessageProcessor().process_message(
             message.serialize(), meta
-        ) == InsertBatch([message.build_result(meta)], None)
+        ) == InsertBatch([message.build_result(meta)], ANY)
         settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context
 
     def test_replay_id_as_tag(self) -> None:
@@ -463,7 +469,7 @@ def test_replay_id_as_tag(self) -> None:
 
         assert TransactionsMessageProcessor().process_message(
             payload, meta
-        ) == InsertBatch([result], None)
+        ) == InsertBatch([result], ANY)
 
     def test_replay_id_as_tag_and_context(self) -> None:
         """
@@ -493,7 +499,7 @@ def test_replay_id_as_tag_and_context(self) -> None:
 
         assert TransactionsMessageProcessor().process_message(
             payload, meta
-        ) == InsertBatch([result], None)
+        ) == InsertBatch([result], ANY)
 
     def test_replay_id_as_invalid_tag(self) -> None:
         """
@@ -520,4 +526,4 @@ def test_replay_id_as_invalid_tag(self) -> None:
 
         assert TransactionsMessageProcessor().process_message(
            payload, meta
-        ) == InsertBatch([result], None)
+        ) == InsertBatch([result], ANY)