Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consumer): Add transactions consumer SLO #4442

Merged
merged 10 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion snuba/datasets/processors/transactions_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
InsertBatch,
ProcessedMessage,
_as_dict_safe,
_collapse_uint32,
_ensure_valid_date,
_ensure_valid_ip,
_unicodify,
Expand Down Expand Up @@ -141,6 +142,7 @@ def _process_base_event_values(
metrics.increment("group_ids_exceeded_limit")

processed["group_ids"] = group_ids[:GROUP_IDS_LIMIT]

return processed

def _process_tags(
Expand Down Expand Up @@ -472,4 +474,10 @@ def process_message(
# the following operation modifies the event_dict and is therefore *not* order-independent
self._process_contexts_and_user(processed, event_dict)

return InsertBatch([processed], None)
try:
raw_received = _collapse_uint32(int(event_dict["data"]["received"]))
assert raw_received is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If received is not there, won't you get an AssertionError that isn't being caught?

Copy link
Member Author

@ayirr7 ayirr7 Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but wouldn't the first line (which throws KeyError) be thrown and caught first? So we go into the except clause?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I guess the AssertionError could be even easier, since it'd be guaranteed that it's being thrown at that line

received = datetime.utcfromtimestamp(raw_received)
return InsertBatch([processed], received)
except Exception:
Copy link

@hubertsentry hubertsentry Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would only check if it's KeyError, Exception can be wild range of exceptions

except KeyError:

Also, maybe looking into https://docs.python.org/3/tutorial/errors.html#exception-chaining

raise KeyError("Missing received timestamp field in transaction")
Copy link
Member

@lynnagara lynnagara Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't the point of the try/except to fall back to the old behavior in case we hit the KeyError rather than re-raising it?

Copy link
Member Author

@ayirr7 ayirr7 Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear: today, what happens if there is a payload missing the received field is that the MessageProcessor will just drop it - is this correct?

So I guess we can fall back to the old behavior (so that we don't drop the message), but we also want to see the error show up in Sentry so we can know if there were any payloads missing the field. What's the right way of getting that error to show up?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the point was to log to sentry without throwing the exception, i.e just manually call logger.exception

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright I can do that then, thanks

18 changes: 12 additions & 6 deletions tests/datasets/test_transaction_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional, Sequence, Tuple
from unittest.mock import ANY

import pytest

Expand Down Expand Up @@ -45,6 +46,7 @@ class TransactionEvent:
has_app_ctx: bool = True
profile_id: Optional[str] = None
replay_id: Optional[str] = None
received: Optional[float] = None

def get_app_context(self) -> Optional[Mapping[str, str]]:
if self.has_app_ctx:
Expand Down Expand Up @@ -133,6 +135,7 @@ def serialize(self) -> Tuple[int, str, Mapping[str, Any]]:
"datetime": "2019-08-08T22:29:53.917000Z",
"timestamp": self.timestamp,
"start_timestamp": self.start_timestamp,
"received": self.received,
"measurements": {
"lcp": {"value": 32.129},
"lcp.elementSize": {"value": 4242},
Expand Down Expand Up @@ -305,6 +308,9 @@ def __get_transaction_event(self) -> TransactionEvent:
op="navigation",
timestamp=finish,
start_timestamp=start,
received=(
datetime.now(tz=timezone.utc) - timedelta(seconds=15)
).timestamp(),
platform="python",
dist="",
user_name="me",
Expand Down Expand Up @@ -364,7 +370,7 @@ def test_base_process(self) -> None:
)
assert TransactionsMessageProcessor().process_message(
message.serialize(), meta
) == InsertBatch([message.build_result(meta)], None)
) == InsertBatch([message.build_result(meta)], ANY)
settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context

def test_too_many_spans(self) -> None:
Expand All @@ -389,7 +395,7 @@ def test_too_many_spans(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)
settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context

def test_missing_transaction_source(self) -> None:
Expand Down Expand Up @@ -433,7 +439,7 @@ def test_app_ctx_none(self) -> None:
)
assert TransactionsMessageProcessor().process_message(
message.serialize(), meta
) == InsertBatch([message.build_result(meta)], None)
) == InsertBatch([message.build_result(meta)], ANY)
settings.TRANSACT_SKIP_CONTEXT_STORE = old_skip_context

def test_replay_id_as_tag(self) -> None:
Expand Down Expand Up @@ -463,7 +469,7 @@ def test_replay_id_as_tag(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)

def test_replay_id_as_tag_and_context(self) -> None:
"""
Expand Down Expand Up @@ -493,7 +499,7 @@ def test_replay_id_as_tag_and_context(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)

def test_replay_id_as_invalid_tag(self) -> None:
"""
Expand All @@ -520,4 +526,4 @@ def test_replay_id_as_invalid_tag(self) -> None:

assert TransactionsMessageProcessor().process_message(
payload, meta
) == InsertBatch([result], None)
) == InsertBatch([result], ANY)
Loading