Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consumer): Add transactions consumer SLO #4442

Merged
merged 10 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion snuba/datasets/processors/transactions_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
InsertBatch,
ProcessedMessage,
_as_dict_safe,
_collapse_uint32,
_ensure_valid_date,
_ensure_valid_ip,
_unicodify,
Expand Down Expand Up @@ -141,6 +142,7 @@ def _process_base_event_values(
metrics.increment("group_ids_exceeded_limit")

processed["group_ids"] = group_ids[:GROUP_IDS_LIMIT]

return processed

def _process_tags(
Expand Down Expand Up @@ -472,4 +474,11 @@ def process_message(
# the following operation modifies the event_dict and is therefore *not* order-independent
self._process_contexts_and_user(processed, event_dict)

return InsertBatch([processed], None)
try:
raw_received = _collapse_uint32(int(event_dict["data"]["received"]))
assert raw_received is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If received is not there, won't you get an AssertionError that isn't being caught?

Copy link
Member Author

@ayirr7 ayirr7 Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but wouldn't the first line (which throws KeyError) be thrown and caught first? So we go into the except clause?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I guess the AssertionError could be even easier, since it'd be guaranteed that it's being thrown at that line

received = datetime.utcfromtimestamp(raw_received)
return InsertBatch([processed], received)
except KeyError as err:
logger.exception("Missing required field received in payload", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.exception("Missing required field received in payload", err)
logger.exception(e)

return InsertBatch([processed], None)
18 changes: 12 additions & 6 deletions tests/datasets/test_transaction_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional, Sequence, Tuple
from unittest.mock import ANY

import pytest

Expand Down Expand Up @@ -45,6 +46,7 @@ class TransactionEvent:
has_app_ctx: bool = True
profile_id: Optional[str] = None
replay_id: Optional[str] = None
received: Optional[float] = None

def get_app_context(self) -> Optional[Mapping[str, str]]:
if self.has_app_ctx:
Expand Down Expand Up @@ -133,6 +135,7 @@ def serialize(self) -> Tuple[int, str, Mapping[str, Any]]:
"datetime": "2019-08-08T22:29:53.917000Z",
"timestamp": self.timestamp,
"start_timestamp": self.start_timestamp,
"received": self.received,
"measurements": {
"lcp": {"value": 32.129},
"lcp.elementSize": {"value": 4242},
Expand Down Expand Up @@ -305,6 +308,9 @@ def __get_transaction_event(self) -> TransactionEvent:
op="navigation",
timestamp=finish,
start_timestamp=start,
received=(
datetime.now(tz=timezone.utc) - timedelta(seconds=15)
).timestamp(),
platform="python",
dist="",
user_name="me",
Expand Down Expand Up @@ -364,7 +370,7 @@ def test_base_process(self) -> None:
)
assert TransactionsMessageProcessor().process_message(
message.serialize(), meta
) == InsertBatch([message.build_result(meta)], None)
) == InsertBatch([message.build_result(meta)], ANY)
settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context

def test_too_many_spans(self) -> None:
Expand All @@ -389,7 +395,7 @@ def test_too_many_spans(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)
settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context

def test_missing_transaction_source(self) -> None:
Expand Down Expand Up @@ -433,7 +439,7 @@ def test_app_ctx_none(self) -> None:
)
assert TransactionsMessageProcessor().process_message(
message.serialize(), meta
) == InsertBatch([message.build_result(meta)], None)
) == InsertBatch([message.build_result(meta)], ANY)
settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context

def test_replay_id_as_tag(self) -> None:
Expand Down Expand Up @@ -463,7 +469,7 @@ def test_replay_id_as_tag(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)

def test_replay_id_as_tag_and_context(self) -> None:
"""
Expand Down Expand Up @@ -493,7 +499,7 @@ def test_replay_id_as_tag_and_context(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)

def test_replay_id_as_invalid_tag(self) -> None:
"""
Expand All @@ -520,4 +526,4 @@ def test_replay_id_as_invalid_tag(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)
Loading