Skip to content

Commit

Permalink
ref: Timestamp is not optional on commit log (#293)
Browse files Browse the repository at this point in the history
It was only this way for rollout, it's now there on all messages for years
  • Loading branch information
lynnagara authored Oct 5, 2023
1 parent dd0a0d8 commit 62d163c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 9 deletions.
11 changes: 3 additions & 8 deletions arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
from datetime import datetime
from typing import Optional

from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
Expand Down Expand Up @@ -41,13 +40,9 @@ def decode_legacy(self, value: KafkaPayload) -> Commit:
raise TypeError("payload value must be a bytes object")

headers = {k: v for (k, v) in value.headers}
try:
orig_message_ts: Optional[datetime] = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)
except KeyError:
orig_message_ts = None

orig_message_ts = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)
topic_name, partition_index, group = key.decode("utf-8").split(":", 3)
offset = int(val.decode("utf-8"))
return Commit(
Expand Down
2 changes: 1 addition & 1 deletion arroyo/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ class Commit:
group: str
partition: Partition
offset: int
orig_message_ts: Optional[datetime]
orig_message_ts: datetime

0 comments on commit 62d163c

Please sign in to comment.