Skip to content

Commit

Permalink
feat: Add optional received_p99 timestamp to commit log
Browse files Browse the repository at this point in the history
The value from the received field can be used in the future for
subscription scheduling if this is provided. This is better than
the `orig_message_ts` field as `received` is assigned at the very
start of the pipeline when Sentry receives the event (as opposed to
when Snuba gets the event). Switching to this field means any
delays in ingestion will be properly accounted for when determining
the window on which to schedule subscriptions.

This PR also:
- deprecates the legacy decoder since we have fully switched over to the new format
- switches orig_message_ts from datetime to float. Converting between
the two in encode/decode is pointless, and it introduces the possibility of
timezone issues. Simpler to just keep it a unix timestamp everywhere.
  • Loading branch information
lynnagara committed Oct 15, 2023
1 parent 2da7f70 commit 0ddc8f2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 44 deletions.
41 changes: 9 additions & 32 deletions arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import json
from datetime import datetime

from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
from arroyo.types import Partition, Topic
from arroyo.utils.codecs import Codec

# Kept in decode method for backward compatibility. Will be
# remove in a future release of Arroyo
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class CommitCodec(Codec[KafkaPayload, Commit]):
def encode(self, value: Commit) -> KafkaPayload:
Expand All @@ -18,7 +13,8 @@ def encode(self, value: Commit) -> KafkaPayload:
payload = json.dumps(
{
"offset": value.offset,
"orig_message_ts": datetime.timestamp(value.orig_message_ts),
"orig_message_ts": value.orig_message_ts,
"received_p99": value.received_p99,
}
).encode("utf-8")

Expand All @@ -30,28 +26,6 @@ def encode(self, value: Commit) -> KafkaPayload:
[],
)

def decode_legacy(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
raise TypeError("payload key must be a bytes object")

val = value.value
if not isinstance(val, bytes):
raise TypeError("payload value must be a bytes object")

headers = {k: v for (k, v) in value.headers}
orig_message_ts = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)
topic_name, partition_index, group = key.decode("utf-8").split(":", 3)
offset = int(val.decode("utf-8"))
return Commit(
group,
Partition(Topic(topic_name), int(partition_index)),
offset,
orig_message_ts,
)

def decode(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
Expand All @@ -63,12 +37,14 @@ def decode(self, value: KafkaPayload) -> Commit:

payload = val.decode("utf-8")

if payload.isnumeric():
return self.decode_legacy(value)

decoded = json.loads(payload)
offset = decoded["offset"]
orig_message_ts = datetime.fromtimestamp(decoded["orig_message_ts"])
orig_message_ts = decoded["orig_message_ts"]

if decoded.get("received_p99"):
received_ts = decoded["received_p99"]
else:
received_ts = None

topic_name, partition_index, group = key.decode("utf-8").split(":", 3)

Expand All @@ -77,4 +53,5 @@ def decode(self, value: KafkaPayload) -> Commit:
Partition(Topic(topic_name), int(partition_index)),
offset,
orig_message_ts,
received_ts,
)
6 changes: 3 additions & 3 deletions arroyo/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Mapping, MutableMapping, Optional

from arroyo.types import Partition
Expand Down Expand Up @@ -61,9 +60,10 @@ def did_commit(self, now: float, offsets: Mapping[Partition, int]) -> None:

@dataclass(frozen=True)
class Commit:
__slots__ = ["group", "partition", "offset", "orig_message_ts"]
__slots__ = ["group", "partition", "offset", "orig_message_ts", "received_p99"]

group: str
partition: Partition
offset: int
orig_message_ts: datetime
orig_message_ts: float
received_p99: Optional[float]
14 changes: 5 additions & 9 deletions tests/backends/test_commit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime
from datetime import datetime, timedelta

from arroyo.backends.kafka.commit import CommitCodec
from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
from arroyo.types import Partition, Topic

Expand All @@ -12,19 +11,16 @@ def test_encode_decode() -> None:

offset_to_commit = 5

now = datetime.now()

commit = Commit(
"leader-a",
Partition(topic, 0),
offset_to_commit,
datetime.now(),
now,
now - timedelta(seconds=5),
)

encoded = commit_codec.encode(commit)

assert commit_codec.decode(encoded) == commit

def test_decode_legacy() -> None:
legacy = KafkaPayload(b"topic:0:leader-a", b"5", [('orig_message_ts', b'2023-09-26T21:58:14.191325Z')])
decoded = CommitCodec().decode(legacy)
assert decoded.offset == 5
assert decoded.group == "leader-a"

0 comments on commit 0ddc8f2

Please sign in to comment.