Currently the only native Kafka serializers and deserializers we use are the ones for Array[Byte] - we then use our own serializer and deserializer types to convert between Array[Byte] and user types. I think we should continue to use our own more strongly-typed ValueDeserializer etc types, but have them either extend the native Kafka types or have a method to produce instances of the Kafka types to use when instantiating the Kafka Consumer and Producer.
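To illustrate the "extend the native types" option: a minimal sketch, where NativeDeserializer is a local stand-in for org.apache.kafka.common.serialization.Deserializer (same single-abstract-method shape) so the snippet has no dependencies, and the ValueDeserializer API shown is hypothetical:

```scala
// Local stand-in for org.apache.kafka.common.serialization.Deserializer,
// so this sketch compiles without kafka-clients on the classpath.
trait NativeDeserializer[A] {
  def deserialize(topic: String, data: Array[Byte]): A
}

// Hypothetical strongly-typed deserializer that extends the native type,
// so it can be handed straight to the native KafkaConsumer constructor
// while still supporting typed combinators like map.
trait ValueDeserializer[A] extends NativeDeserializer[A] { self =>
  def map[B](f: A => B): ValueDeserializer[B] =
    (topic, data) => f(self.deserialize(topic, data))
}

object ValueDeserializer {
  def instance[A](f: (String, Array[Byte]) => A): ValueDeserializer[A] =
    (topic, data) => f(topic, data)

  val utf8: ValueDeserializer[String] =
    instance((_, bytes) => new String(bytes, "UTF-8"))
}
```

Because every ValueDeserializer[A] is already a native-shaped deserializer, no extra adapter (or intermediate Array[Byte] hand-off) is needed when wiring up the consumer.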
Performance
This should both increase speed and reduce memory usage / garbage creation. At present, when deserializing Avro (for example), each message is first read from the TCP socket into an Array[Byte] allocated by the native ByteArray deserializer; the fs2-kafka Avro deserializer then reads from that Array[Byte]. If we used a native Kafka Avro deserializer directly, it could read straight from the socket (Avro deserializers natively operate on InputStream), avoiding the allocation of an intermediate array. This would save a particularly large amount of copying in the (not uncommon) case where an Avro record has many fields but we're only interested in a couple (the fields we're not interested in can be skipped entirely when reading from the socket), and in the case where we're not interested in keys at all (by deserializing them as null we avoid copying anything).
I'd want to have some benchmarks to validate any performance improvements of course.
Design simplicity
It's conceptually confusing having our own Serializer and Deserializer classes that commonly wrap native Kafka ones but are completely separate from them type-wise, and then work in combination with other native serdes (the Array[Byte] ones). I guess the motivation is for side effects to be suspended in a polymorphic F - I have a couple of thoughts on this:
1. Purity here has relatively small benefits, and side-effecting serdes are probably fine. The side effects we expect relate to the mechanics of serializing/deserializing according to a schema, and should never have any effect on application logic (for them to do so would almost certainly be very, very incorrect). We can still provide typeclass instances for pure functional manipulation of the serdes - only the serialization/deserialization operation itself would be impure.
2. If people really need serdes to interop with pure code we can add fromSyncIO and fromAsync methods that use SyncIO#unsafeRunSync and Dispatcher.
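A fromSyncIO along those lines could look roughly like this - a sketch assuming cats-effect 3, where fromSyncIO and the (topic, bytes) decoding shape are hypothetical names (a fromAsync would do the same via Dispatcher#unsafeRunSync):

```scala
import cats.effect.SyncIO

// Hypothetical adapter: a decoder whose effects are suspended in SyncIO
// is run eagerly at the moment the native consumer calls deserialize,
// confining the impurity to the serde boundary.
def fromSyncIO[A](
    decode: (String, Array[Byte]) => SyncIO[A]
): (String, Array[Byte]) => A =
  (topic, bytes) => decode(topic, bytes).unsafeRunSync()
```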
Other implications
This would mean largely reverting #902 - KafkaConsumerActor would once again be typed according to the message type (but all deserialization would take place inside the native consumer). KafkaProducerConnection would no longer make sense, but we could add contramap to KafkaProducer.
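That contramap would just pre-apply an encoding function before handing the value to the underlying producer. A minimal sketch, using a simplified stand-in Producer trait (fs2-kafka's real KafkaProducer is effectful and returns record metadata, but the shape of contramap is the same):

```scala
// Simplified stand-in for a producer of values of type A.
trait Producer[A] { self =>
  def produce(value: A): Unit

  // contramap: given an encoding B => A, a Producer[A] can accept Bs.
  def contramap[B](f: B => A): Producer[B] =
    b => self.produce(f(b))
}
```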
Any thoughts on this?
@vlovgr @LMnet what do you think? #902 hasn't been in a non-milestone release yet, so if we're going to go this way I'd like to revert it before releasing 2.5.0, so we don't change partitionedStream behaviour twice.
I think we should continue to use our own more strongly-typed ValueDeserializer etc types, but have them either extend the native Kafka types or have a method to produce instances of the Kafka types to use when instantiating the Kafka Consumer and Producer.
Either way sounds good to me. I know some users rely on parallelised decoding, which you should still be able to do.