🐛 Fix #149 - Ensure Compatibility with aiokafka ConsumerRecord Headers #150
base: main
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##             main     #150      +/-   ##
==========================================
- Coverage   98.12%   98.10%   -0.03%
==========================================
  Files          35       35
  Lines        1497     1528      +31
==========================================
+ Hits         1469     1499      +30
- Misses         28       29       +1
```
```python
if not isinstance(key_str, bytes):
    key = key_str.encode() if key_str is not None else None
else:
    key = key_str

if not isinstance(value_str, bytes):
    value = value_str.encode() if value_str is not None else None
else:
    value = value_str
```
This implies that the typing of `key_str` and `value_str` is incorrect -- perhaps they need to be `str | bytes | None`. I'd also suggest checking the typing of `Message`, since I likely copied these values from there.
I believe the `Message` type is not particularly relevant here; what matters is converting it to a `ConsumerRecord`, as you have done previously. Kafka does not return strings -- it retains data as bytes.

If you examine `ConsumerRecord`, you'll see that the types of the key and value are specifically defined by aiokafka, not as strings but as bytes or other types, depending on the configuration.

For reference:
- Key and Value Types: `aiokafka.structs.VT`
- ConsumerRecord Structure: `aiokafka.structs.ConsumerRecord`

Given this, I think the function should consistently return bytes to align with Kafka's handling of data.
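For illustration, here is a simplified sketch of how aiokafka models the record (abbreviated from `aiokafka.structs`; the real class carries additional fields such as timestamps and serialized sizes, and `KT`/`VT` resolve to `bytes` unless deserializers are configured):

```python
from dataclasses import dataclass
from typing import Generic, Optional, Sequence, Tuple, TypeVar

KT = TypeVar("KT")  # key type: bytes unless a key_deserializer is set
VT = TypeVar("VT")  # value type: bytes unless a value_deserializer is set

@dataclass
class ConsumerRecord(Generic[KT, VT]):
    topic: str
    partition: int
    offset: int
    key: Optional[KT]
    value: Optional[VT]
    headers: Sequence[Tuple[str, bytes]]  # header values are always raw bytes
```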
I agree we want to always return bytes. I don't really understand why `ConsumerRecord` is generic rather than always offering `bytes`, though if, as you suggest, that's a setting, then perhaps we should follow it? Either way, I don't think we want to change that as part of this fix -- sticking with only bytes feels fine to me until we have a good reason to change.

My concern was the annotations of `key_str` and `value_str` on lines 36 & 37, which are effectively determined by the internal `Message` type's field types. That type currently implies that it can only contain strings (or `None`), which is why the previous code doesn't handle `bytes`. If it can legitimately be given `bytes`, then the signature there should change and the annotations here should be updated similarly.
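For illustration, the branching above could collapse into a single helper once the annotations are widened as suggested (a sketch only; the name `_ensure_bytes` is hypothetical and not part of the codebase):

```python
from typing import Optional, Union

def _ensure_bytes(val: Union[str, bytes, None]) -> Optional[bytes]:
    """Normalise a key or value to bytes, matching Kafka's wire format."""
    if val is None or isinstance(val, bytes):
        return val
    return val.encode()

key = _ensure_bytes(key_str)
value = _ensure_bytes(value_str)
```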
```python
def _translate_header_to_internal_format(self, headers: Sequence[Tuple[str, bytes]]) -> dict:
    header_dict: dict[str, bytes] = defaultdict(bytes)
    if not headers:
        return header_dict

    for item in headers:
        header_dict[item[0]] = item[1]
    return header_dict
```
Looking at this again holistically, I'm actually now wondering what I'm missing here. I think this could be replaced with a simple call to `dict(headers)`:

```python
>>> dict([('a', b'B'), ('c', b'D')])
{'a': b'B', 'c': b'D'}
```

yet this code appears to be somewhat more involved. Is the use of a `defaultdict` important? (If so, that can also be initialised over a sequence of values.) Is there something else I'm missing about the desired semantics?

Possibly related: it seems that at least one of the callsites can end up passing `None` as a value here. However, that's not represented in the type annotation. Are we intending to allow `None`? Do we need to?

Given that we accept a `Sequence[...]` here, using an empty tuple as the default value (rather than `None`) would simplify things. (Using a tuple as a function parameter's default is safe since tuples are immutable.) It would avoid the `if not headers` check and allow the trivial `dict(...)` conversion to be used unconditionally. This is probably not more expensive computationally, but it is much simpler in terms of code complexity.
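A sketch of the simplification being suggested, assuming the `defaultdict` behaviour isn't load-bearing:

```python
from typing import Sequence, Tuple

def _translate_header_to_internal_format(
    self,
    headers: Sequence[Tuple[str, bytes]] = (),
) -> dict[str, bytes]:
    # dict() over a sequence of (key, value) pairs handles the
    # empty-tuple default naturally: dict(()) == {}.
    return dict(headers)
```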
```python
async def _produce(self, topic, value=None, *args, **kwargs) -> None:
    # create a message and call produce kafka
    kwargs['headers'] = self._translate_header_to_internal_format(kwargs['headers'])
```
As `headers` is now a required key, I'd suggest making it an explicitly named argument (still keyword-only), even if the result is that it's assigned into `kwargs` for passing onwards. This allows us to both clarify its nature to callers and specify a default if needed.
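Something along these lines, as a sketch (the empty-tuple default ties in with the earlier comment about defaults; the rest of the signature is as in the diff above):

```python
from typing import Sequence, Tuple

async def _produce(
    self,
    topic,
    value=None,
    *args,
    headers: Sequence[Tuple[str, bytes]] = (),
    **kwargs,
) -> None:
    # Naming the parameter documents its shape to callers and provides
    # a default, while still forwarding via kwargs as before.
    kwargs["headers"] = self._translate_header_to_internal_format(headers)
```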
```python
@pytest.mark.asyncio
@asetup_kafka(topics=[{"topic": "test_topic1", "partition": 2}], clean=True)
async def test_produce_and_consume_with_headers():
```
This test might be clearer with some added vertical whitespace to separate out the sections. Perhaps like this:

```python
producer = FakeAIOKafkaProducer()
consumer = FakeAIOKafkaConsumer()

await producer.start()
await consumer.start()
consumer.subscribe({"test_topic1"})

await producer.send(
    topic="test_topic1",
    headers=[('header_name', b"test"), ('header_name2', b"test")],
    key=b"test",
)
await producer.stop()

record = await consumer.getone()
assert record.headers == (('header_name', b"test"), ('header_name2', b"test"))

await consumer.stop()
```
This PR addresses issue #149 by enhancing the header handling logic to ensure compatibility with aiokafka's `ConsumerRecord`.

Specifically, headers are now processed as a `Sequence[Tuple[str, bytes]]`, which aligns with aiokafka's expected format. This change prevents potential errors when dealing with Kafka message headers in the application.
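For context, this is the same header shape used in ordinary aiokafka producer calls (a minimal sketch; the broker address, topic name, and header contents here are placeholders):

```python
import asyncio
from aiokafka import AIOKafkaProducer

async def send_with_headers() -> None:
    # Headers travel as a sequence of (str, bytes) pairs -- the same
    # Sequence[Tuple[str, bytes]] shape this PR normalises to.
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        await producer.send_and_wait(
            "test_topic1",
            value=b"payload",
            headers=[("trace-id", b"abc123")],
        )
    finally:
        await producer.stop()

asyncio.run(send_with_headers())
```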