From b4dd5a443d643770b33bc258744e94e7833bcfe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leonhard=20Riedi=C3=9Fer?= Date: Fri, 16 Feb 2024 10:46:56 +0100 Subject: [PATCH] Add chunk-based consumer API (#1281) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add first draft of a chunk-based consumer API * Add missing methods to mirror the API surface from `KafkaConsume` This also introduces a `CommitNow` object instead of `Unit` * Provide a single `consumeChunk` method instead of mirroring the full consumer API * Add test for `consumeChunk` * Add Scaladoc for `consumeChunk` * Describe `consumeChunk` in the docs * Add Stream extension method for `consumeChunk` * Use `consumeChunk` in the quick example in the docs * Tweaks to the docs * Tweak the docs * Rename private method * Use `mapAccumulate` instead of `foldLeft` * Run prePR to fix headers * Fix compile error * Fix compilation issues with different Scala versions * Update docs/src/main/mdoc/quick-example.md Co-authored-by: Alan Artigao Carreño * Update docs/src/main/mdoc/consumers.md Co-authored-by: Alan Artigao Carreño * Update docs/src/main/mdoc/quick-example.md Co-authored-by: Alan Artigao Carreño * Use `onlyOrError` instead of `>> F.never` --------- Co-authored-by: Alan Artigao Carreño --- docs/src/main/mdoc/consumers.md | 42 ++++++++ docs/src/main/mdoc/quick-example.md | 45 +++------ .../main/scala/fs2/kafka/KafkaConsumer.scala | 10 ++ .../kafka/consumer/KafkaConsumeChunk.scala | 96 +++++++++++++++++++ .../scala/fs2/kafka/KafkaConsumerSpec.scala | 38 ++++++++ 5 files changed, 201 insertions(+), 30 deletions(-) create mode 100644 modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumeChunk.scala diff --git a/docs/src/main/mdoc/consumers.md b/docs/src/main/mdoc/consumers.md index de4879fc9..56d6187ca 100644 --- a/docs/src/main/mdoc/consumers.md +++ b/docs/src/main/mdoc/consumers.md @@ -12,7 +12,9 @@ import scala.concurrent.duration._ import cats.effect._ import cats.syntax.all._ +import fs2._ import fs2.kafka._ +import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow ``` ## Deserializers @@ -272,6 +274,46 @@ object ConsumerMapAsyncExample extends IOApp.Simple { Offsets commits are managed manually, which is important for ensuring at-least-once delivery. This means that, by [default](#default-settings), automatic offset commits are disabled. If you're sure you don't need at-least-once delivery, you can re-enable automatic offset commits using `withEnableAutoCommit` on [`ConsumerSettings`][consumersettings], and then ignore the [`CommittableOffset`][committableoffset] part of [`CommittableConsumerRecord`][committableconsumerrecord], keeping only the [`ConsumerRecord`][consumerrecord]. +### Working on `Chunk` + +Use cases that require at-least-once delivery make it necessary to commit the offset of messages only after the message has been successfully processed. Implementing this correctly can be challenging, especially when the business logic requires advanced data manipulation with concurrency, batching, filtering and the like: + +- When consuming multiple messages from the same partition concurrently, a consumer might lose messages if the commits happen out of order and a message that is not the last one on its partition can't be processed and has to be retried. +- When filtering messages, it's important to still commit the offset of the filtered message because if this message is the latest one on its partition, it will get re-sent infinitely. 
+- For performance reasons, it makes sense to batch the offsets when committing them.
+
+The recommended pattern for these use cases is to work on the `Chunk`s of records that make up the `Stream`. The library supports this with the `consumeChunk` method:
+
+```scala mdoc:silent
+object ConsumerChunkExample extends IOApp.Simple {
+  val run: IO[Unit] = {
+    def processRecords(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] =
+      records.traverse(record => IO.println(s"Processing record: $record")).as(CommitNow)
+
+    KafkaConsumer.stream(consumerSettings)
+      .subscribeTo("topic")
+      .consumeChunk(processRecords)
+  }
+}
+```
+
+Note that this method uses `partitionedStream`, which means that all the partitions assigned to the consumer will be processed concurrently.
+
+As a user, you don't have to care about offset commits: all you have to do is implement a function that processes all records in the `Chunk` and returns an `IO[CommitNow]`. Once this action has finished, the offsets for all messages in the `Chunk` are committed. `CommitNow` is basically the same as `Unit`, but makes it clear that the processing of messages has finished and it's time to commit.
+
+This brings several benefits:
+
+- **Correctness:** You can focus on implementing your business logic, without having to worry about offset commits or propagating the correct offsets through your code. Offsets are committed correctly afterwards.
+- **Performance:** Typical performance improvements are bulk writes to a database, or using concurrency to speed things up. These patterns can be used liberally when working on the records in a `Chunk`, without having to sacrifice correctness.
+- **Flexibility:** Besides using batching and concurrency, you might want to filter out messages, or process them in a different order than they appear on the partitions. As long as you work on a single `Chunk` and make sure that the processing is finished when you return `CommitNow`, you can do all that.
+- A concrete example that combines these ideas is to group the messages in the `Chunk` by key and then only process the last message for each key (essentially what Kafka's log compaction does). Often the messages for different keys can also be processed concurrently, which drastically increases the available concurrency (see the sketch below).
+
+If the chunk size doesn't fit your needs, the first way to start tuning is the `max.poll.records` config property of your consumer.
+
+### Committing manually
+
+If `consumeChunk` doesn't work for you, you can always commit your offsets manually.
+
 Offset commits are usually done in batches for performance reasons. We normally don't need to commit every offset, but only the last processed offset. There is a trade-off in how much reprocessing we have to do when we restart versus the performance implication of committing more frequently. Depending on our situation, we'll then choose an appropriate frequency for offset commits. We should keep the [`CommittableOffset`][committableoffset] in our `Stream` once we've finished processing the record. For at-least-once delivery, it's essential that offset commits preserve topic-partition ordering, so we have to make sure we keep offsets in the same order as we receive them. There is one convenience function for the most common batch committing scenario, `commitBatchWithin`.
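The batched manual-commit pattern described above could be sketched roughly as follows. This is only an illustration, not part of the patch: it relies on the imports and `consumerSettings` defined earlier on this page, and `processRecord` is a hypothetical effect standing in for your business logic.

```scala
def manualCommitExample(
  consumerSettings: ConsumerSettings[IO, String, String],
  processRecord: ConsumerRecord[String, String] => IO[Unit]
): IO[Unit] =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo("topic")
    .records
    .evalMap { committable =>
      // Keep the offset around until the record has been processed.
      processRecord(committable.record).as(committable.offset)
    }
    // Commit at most every 500 offsets or every 15 seconds, preserving per-partition order.
    .through(commitBatchWithin(500, 15.seconds))
    .compile
    .drain
```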
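For the chunk-based approach, the log-compaction-style processing mentioned in the benefits list above could look roughly like the sketch below. The `storeValue` effect and the `String` key/value types are assumptions for illustration only:

```scala
def processLastPerKey(
  storeValue: (String, String) => IO[Unit]
)(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] = {
  // Keep only the last record for each key, mimicking Kafka's log compaction.
  val lastPerKey = records.toList.groupBy(_.key).map { case (_, rs) => rs.last }

  // Records for different keys are independent, so they can be processed concurrently.
  lastPerKey.toList
    .parTraverse_(record => storeValue(record.key, record.value))
    .as(CommitNow)
}

// Plugged into the consumer from the example above:
// KafkaConsumer.stream(consumerSettings).subscribeTo("topic").consumeChunk(processLastPerKey(storeValue))
```

If the resulting chunks are too small for this kind of batching, `max.poll.records` can be raised on the consumer settings, for example with `consumerSettings.withProperty("max.poll.records", "500")`.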
diff --git a/docs/src/main/mdoc/quick-example.md b/docs/src/main/mdoc/quick-example.md index 326696038..fc381aaba 100644 --- a/docs/src/main/mdoc/quick-example.md +++ b/docs/src/main/mdoc/quick-example.md @@ -10,17 +10,13 @@ Following is an example showing how to: - use `commitBatchWithin` to commit consumed offsets in batches. ```scala mdoc -import scala.concurrent.duration._ - import cats.effect.{IO, IOApp} +import fs2._ import fs2.kafka._ +import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow object Main extends IOApp.Simple { - val run: IO[Unit] = { - def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] = - IO.pure(record.key -> record.value) - val consumerSettings = ConsumerSettings[IO, String, String] .withAutoOffsetReset(AutoOffsetReset.Earliest) @@ -28,34 +24,23 @@ object Main extends IOApp.Simple { .withGroupId("group") val producerSettings = - ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092") + ProducerSettings[IO, String, String] + .withBootstrapServers("localhost:9092") + + def processRecords(producer: KafkaProducer[IO, String, String])(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] = { + val producerRecords = records.map(consumerRecord => ProducerRecord("topic", consumerRecord.key, consumerRecord.value)) + producer.produce(producerRecords).flatten.as(CommitNow) + } val stream = - KafkaConsumer - .stream(consumerSettings) - .subscribeTo("topic") - .records - .mapAsync(25) { committable => - processRecord(committable.record).map { case (key, value) => - val record = ProducerRecord("topic", key, value) - committable.offset -> ProducerRecords.one(record) - } - } - .through { offsetsAndProducerRecords => - KafkaProducer - .stream(producerSettings) - .flatMap { producer => - offsetsAndProducerRecords - .evalMap { case (offset, producerRecord) => - producer.produce(producerRecord).map(_.as(offset)) - } - .parEvalMap(Int.MaxValue)(identity) - } - } - .through(commitBatchWithin(500, 15.seconds)) + KafkaProducer.stream(producerSettings).evalMap { producer => + KafkaConsumer + .stream(consumerSettings) + .subscribeTo("topic") + .consumeChunk(chunk => processRecords(producer)(chunk)) + } stream.compile.drain } - } ``` diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index c2d5928c4..313d618c3 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -21,6 +21,7 @@ import cats.effect.std.* import cats.syntax.all.* import fs2.{Chunk, Stream} import fs2.kafka.consumer.* +import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow import fs2.kafka.instances.* import fs2.kafka.internal.* import fs2.kafka.internal.converters.collection.* @@ -63,6 +64,7 @@ import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartitio */ sealed abstract class KafkaConsumer[F[_], K, V] extends KafkaConsume[F, K, V] + with KafkaConsumeChunk[F, K, V] with KafkaAssignment[F] with KafkaOffsetsV2[F] with KafkaSubscription[F] @@ -817,6 +819,14 @@ object KafkaConsumer { def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] = self.flatMap(_.partitionedRecords) + /** + * Consume from all assigned partitions concurrently, processing the messages in `Chunk`s. 
+   * See [[KafkaConsumeChunk#consumeChunk]].
+   */
+  def consumeChunk(processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow])(implicit
+    F: Concurrent[F]
+  ): F[Nothing] = self.evalMap(_.consumeChunk(processor)).compile.onlyOrError
+
   }
 
   /*
diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumeChunk.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumeChunk.scala
new file mode 100644
index 000000000..9990efc0e
--- /dev/null
+++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumeChunk.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018-2024 OVO Energy Limited
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package fs2.kafka.consumer
+
+import cats.effect.Concurrent
+import cats.syntax.flatMap.*
+import cats.Monad
+import fs2.*
+import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow
+import fs2.kafka.CommittableConsumerRecord
+import fs2.kafka.CommittableOffsetBatch
+import fs2.kafka.ConsumerRecord
+
+trait KafkaConsumeChunk[F[_], K, V] extends KafkaConsume[F, K, V] {
+
+  /**
+   * Consume from all assigned partitions concurrently, processing the records in `Chunk`s. For
+   * each `Chunk`, the provided `processor` is called; once it has finished, the offsets for all
+   * messages in the chunk are committed.
+   *
+   * This method is intended for use cases that require at-least-once delivery, where messages
+   * have to be processed before their offsets are committed. When relying on methods like
+   * [[partitionedStream]], [[records]], and similar, you have to correctly implement not only
+   * your processing logic but also the offset commits, which can be tricky to do in a correct
+   * and efficient way.
+ * + * Working with `Chunk`s of records has several benefits:
+ * - As a user, you don't have to care about committing offsets correctly. You can focus on + * implementing your business logic
+ * - It's very straightforward to batch several messages from a `Chunk` together, e.g. for + * efficient writes to a persistent storage
+ * - You can liberally use logic that involves concurrency, filtering, and re-ordering of + * messages without having to worry about incorrect offset commits
+   *
+   * The `processor` is a function that takes a `Chunk[ConsumerRecord[K, V]]` and returns an
+   * `F[CommitNow]`. [[CommitNow]] is isomorphic to `Unit`, but conveys the intention that
+   * processing of the `Chunk` is done, its offsets should be committed, and no important work
+   * should happen afterwards.
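+   *
+   * An illustrative sketch (assuming `String` values and a hypothetical `storeBatch` effect):
+   * {{{
+   * consumeChunk { records =>
+   *   storeBatch(records.map(_.value)).as(CommitNow)
+   * }
+   * }}}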
+   *
+   * The returned value has the type `F[Nothing]`, because it's a never-ending process that
+   * doesn't terminate, and therefore doesn't return a result.
+   *
+   * @note
+   *   This method does not make any use of Kafka's auto-commit feature; it implements "manual"
+   *   commits in a way that suits most common use cases.
+   * @note
+   *   You have to `subscribe` or `assign` the consumer before calling this method. If you forget
+   *   to do so, a [[NotSubscribedException]] will be raised.
+   * @see
+   *   [[partitionedStream]]
+   * @see
+   *   [[CommitNow]]
+   */
+  final def consumeChunk(
+    processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow]
+  )(implicit F: Concurrent[F]): F[Nothing] = partitionedStream
+    .map(
+      _.chunks.evalMap(consume(processor))
+    )
+    .parJoinUnbounded
+    .drain
+    .compile
+    .onlyOrError
+
+  private def consume(processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow])(
+    chunk: Chunk[CommittableConsumerRecord[F, K, V]]
+  )(implicit F: Monad[F]): F[Unit] = {
+    // Collect the offsets of all records in the chunk while extracting the records themselves.
+    val (offsets, records) = chunk
+      .mapAccumulate(CommittableOffsetBatch.empty)((offsetBatch, committableRecord) =>
+        (offsetBatch.updated(committableRecord.offset), committableRecord.record)
+      )
+
+    // Commit only after the processor has signalled completion by returning `CommitNow`.
+    processor(records) >> offsets.commit
+  }
+
+}
+
+object KafkaConsumeChunk {
+
+  type CommitNow = CommitNow.type
+
+  /**
+   * Token to indicate that a `Chunk` has been processed and the corresponding offsets are ready
+   * to be committed.
+ * + * Isomorphic to `Unit`, but more intention revealing. + */ + object CommitNow + +} diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala index f40286b9a..74b9355c7 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala @@ -16,6 +16,7 @@ import cats.effect.unsafe.implicits.global import cats.effect.Ref import cats.syntax.all.* import fs2.concurrent.SignallingRef +import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow import fs2.kafka.internal.converters.collection.* import fs2.Stream @@ -1174,6 +1175,43 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { } } + describe("KafkaConsumer#consumeChunk") { + it("should process the messages and commit the offsets") { + withTopic { topic => + val produced = (0 until 5).map(n => s"key-$n" -> s"value-$n") + publishToKafka(topic, produced) + + val consumed = for { + ref <- Ref.of[IO, Vector[(String, String)]](Vector.empty) + _ <- KafkaConsumer + .stream(consumerSettings[IO]) + .evalTap(_.assign(topic)) + .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream + .evalMap( + _.consumeChunk(chunk => + chunk + .traverse(record => ref.getAndUpdate(_ :+ (record.key -> record.value))) + .as(CommitNow) + ) + ).interruptAfter(10.seconds).compile.drain + res <- ref.get + } yield res + + val res = consumed.unsafeRunSync() + + (res should contain).theSameElementsInOrderAs(produced) + + val topicPartition = new TopicPartition(topic, 0) + + val actuallyCommitted = withKafkaConsumer(defaultConsumerProperties) { consumer => + consumer.committed(Set(topicPartition).asJava).asScala.toMap + }.map { case (k, v) => k -> v.offset() }.toMap + + actuallyCommitted shouldBe Map(topicPartition -> 5L) + } + } + } + private def commitTest( commit: (KafkaConsumer[IO, String, String], CommittableOffsetBatch[IO]) => IO[Unit] ): Assertion =