Skip to content

Commit

Permalink
adjust CommitCodec to handle potentially text header values
Browse files Browse the repository at this point in the history
  • Loading branch information
asottile-sentry authored Aug 1, 2024
1 parent f6851b9 commit 37de058
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@ def decode_legacy(self, value: KafkaPayload) -> Commit:
if not isinstance(val, bytes):
raise TypeError("payload value must be a bytes object")

headers = {k: v for (k, v) in value.headers}
orig_message_ts = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)
for k, v in value.headers:
if k == "orig_message_ts":
v_s = v.decode() if isinstance(v, bytes) else v
orig_message_ts = datetime.strptime(v_s, DATETIME_FORMAT)
break
else:
raise ValueError('missing `orig_message_ts` header')

topic_name, partition_index, group = key.decode("utf-8").split(":", 3)
offset = int(val.decode("utf-8"))
Expand Down

0 comments on commit 37de058

Please sign in to comment.