Skip to content

Commit

Permalink
feat: Record errors consumer SLO (#4178)
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara authored Jun 27, 2023
1 parent a77b2f9 commit 2be8944
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion snuba/datasets/processors/errors_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def process_message(
if row is None: # the processor cannot/does not handle this input
return None

return InsertBatch([row], None)
return InsertBatch([row], row["received"])
elif type_ in REPLACEMENT_EVENT_TYPES:
# pass raw events along to republish
return ReplacementBatch(str(event["project_id"]), [message])
Expand Down
23 changes: 12 additions & 11 deletions tests/datasets/test_errors_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Mapping, Sequence
from unittest.mock import ANY
from uuid import UUID

import pytest
Expand Down Expand Up @@ -460,7 +461,7 @@ def test_errors_basic(self) -> None:
}
)
assert processor.process_message(payload, meta) == InsertBatch(
[message.build_result(meta)], None
[message.build_result(meta)], ANY
)

def test_errors_replayid_context(self) -> None:
Expand Down Expand Up @@ -498,7 +499,7 @@ def test_errors_replayid_context(self) -> None:
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)

assert self.processor.process_message(payload, meta) == InsertBatch(
[message.build_result(meta)], None
[message.build_result(meta)], ANY
)

def test_errors_replayid_tag(self) -> None:
Expand Down Expand Up @@ -542,7 +543,7 @@ def test_errors_replayid_tag(self) -> None:
result["tags.key"].insert(4, "replayId")
result["tags.value"].insert(4, replay_id.hex)
assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

def test_errors_replayid_tag_and_context(self) -> None:
Expand Down Expand Up @@ -585,7 +586,7 @@ def test_errors_replayid_tag_and_context(self) -> None:
result = message.build_result(meta)
result["replay_id"] = str(replay_id)
assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

def test_errors_replayid_invalid_tag(self) -> None:
Expand Down Expand Up @@ -629,7 +630,7 @@ def test_errors_replayid_invalid_tag(self) -> None:
result["tags.key"].insert(4, "replayId")
result["tags.value"].insert(4, invalid_replay_id)
assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

def test_exception_main_thread_true(self) -> None:
Expand Down Expand Up @@ -683,7 +684,7 @@ def test_exception_main_thread_true(self) -> None:
result["exception_main_thread"] = True

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

def test_exception_main_thread_false(self) -> None:
Expand Down Expand Up @@ -737,7 +738,7 @@ def test_exception_main_thread_false(self) -> None:
result["exception_main_thread"] = False

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

def test_trace_sampled(self) -> None:
Expand Down Expand Up @@ -777,7 +778,7 @@ def test_trace_sampled(self) -> None:
result["trace_sampled"] = True

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

# verify processing trace.sampled=None works as it did before
Expand All @@ -788,7 +789,7 @@ def test_trace_sampled(self) -> None:
result2 = message.build_result(meta)

assert self.processor.process_message(payload, meta) == InsertBatch(
[result2], None
[result2], ANY
)

def test_errors_processed(self) -> None:
Expand Down Expand Up @@ -828,7 +829,7 @@ def test_errors_processed(self) -> None:
result["num_processing_errors"] = 3

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

# ensure old behavior where data.errors=None won't set 'num_processing_errors'
Expand All @@ -839,5 +840,5 @@ def test_errors_processed(self) -> None:
result = message.build_result(meta)

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], None
[result], ANY
)

0 comments on commit 2be8944

Please sign in to comment.