diff --git a/docs/src/main/mdoc/consumers.md b/docs/src/main/mdoc/consumers.md
index 2e04dcb9b..ff3859282 100644
--- a/docs/src/main/mdoc/consumers.md
+++ b/docs/src/main/mdoc/consumers.md
@@ -547,6 +547,49 @@ You may notice, that actual graceful shutdown implementation requires a decent a
Also note, that even if you implement a graceful shutdown your application may fall with an error. And in this case, a graceful shutdown will not be invoked. It means that your application should be ready to an _at least once_ semantic even when a graceful shutdown is implemented. Or, if you need an _exactly once_ semantic, consider using [transactions](transactions.md).
+### Graceful partition revoke
+
+In addition to the graceful shutdown of the whole consumer, there is an option to configure your consumer to wait for
+the streams to finish processing a partition before "releasing" it during a rebalance. This behavior can be enabled via
+the following settings:
+```scala mdoc:silent
+object WithGracefulPartitionRevoke extends IOApp.Simple {
+
+ val run: IO[Unit] = {
+ def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
+ IO(println(s"Processing record: $record"))
+
+ def run(consumer: KafkaConsumer[IO, String, String]): IO[Unit] = {
+ consumer.subscribeTo("topic") >> consumer
+ .stream
+ .evalMap { msg =>
+ processRecord(msg).as(msg.offset)
+ }
+ .through(commitBatchWithin(100, 15.seconds))
+ .compile
+ .drain
+ }
+
+    val consumerSettings: ConsumerSettings[IO, String, String] =
+ ConsumerSettings[IO, String, String]
+ .withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
+      .withSessionTimeout(10.seconds)
+
+ KafkaConsumer
+ .resource(consumerSettings)
+ .use { consumer =>
+ run(consumer)
+ }
+ }
+
+}
+```
+
+Please note that this setting does not guarantee that all commits will be performed before the partition is revoked:
+the wait is bounded by the `session.timeout.ms` setting, which is why the example above sets it to a lower value. Be
+aware that waiting too long for a partition processor to finish will suspend processing of the whole topic.
+
+Awaiting the completion of commits might be implemented in the future.
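+
+Since in-flight commits can still be lost during a revoke, records may be redelivered after the new assignment, so the
+processing itself should stay idempotent. Below is a minimal sketch of one way to achieve this; `alreadyProcessed` and
+`markProcessed` are hypothetical placeholders for an application-specific deduplication store:
+
+```scala
+def processOnce(
+  record: CommittableConsumerRecord[IO, String, String],
+  alreadyProcessed: String => IO[Boolean], // hypothetical lookup in the deduplication store
+  markProcessed: String => IO[Unit]        // hypothetical update of the deduplication store
+): IO[Unit] =
+  alreadyProcessed(record.record.key).flatMap {
+    case true  => IO.unit // duplicate delivery after a rebalance; skip
+    case false => IO(println(s"Processing record: $record")) >> markProcessed(record.record.key)
+  }
+```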
+
[commitrecovery-default]: @API_BASE_URL@/CommitRecovery$.html#Default:fs2.kafka.CommitRecovery
[committableconsumerrecord]: @API_BASE_URL@/CommittableConsumerRecord.html
[committableoffset]: @API_BASE_URL@/CommittableOffset.html
diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
index 5d3290797..92f847d18 100644
--- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -8,14 +8,14 @@ package fs2.kafka
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext
-
import cats.effect.Resource
import cats.Show
import fs2.kafka.security.KafkaCredentialStore
-
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse
+import scala.util.Try
+
/**
* [[ConsumerSettings]] contain settings necessary to create a [[KafkaConsumer]]. At the very
* least, this includes key and value deserializers.
@@ -151,6 +151,18 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V]
+
+ /**
+   * Returns the value for the property:
+ *
+ * {{{
+ * ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
+ * }}}
+ *
+   * The value is returned as a [[FiniteDuration]] for convenience.
+ */
+ def sessionTimeout: FiniteDuration
+
/**
* Returns a new [[ConsumerSettings]] instance with the specified session timeout. This is
* equivalent to setting the following property using the [[withProperty]] function, except you
@@ -373,6 +385,17 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]
+ /**
+   * One of two possible modes of operation for [[KafkaConsumer.partitionsMapStream]]. See
+   * [[RebalanceRevokeMode]] for a detailed explanation of the differences between them.
+ */
+ def rebalanceRevokeMode: RebalanceRevokeMode
+
+ /**
+ * Creates a new [[ConsumerSettings]] with the specified [[rebalanceRevokeMode]].
+ */
+ def withRebalanceRevokeMode(rebalanceRevokeMode: RebalanceRevokeMode): ConsumerSettings[F, K, V]
+
}
object ConsumerSettings {
@@ -388,7 +411,8 @@ object ConsumerSettings {
override val pollTimeout: FiniteDuration,
override val commitRecovery: CommitRecovery,
override val recordMetadata: ConsumerRecord[K, V] => String,
- override val maxPrefetchBatches: Int
+ override val maxPrefetchBatches: Int,
+ override val rebalanceRevokeMode: RebalanceRevokeMode
) extends ConsumerSettings[F, K, V] {
override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
@@ -422,6 +446,13 @@ object ConsumerSettings {
override def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
+  // Use Try here because String#toLongOption is unavailable in Scala 2.12; this avoids a version-specific implementation.
+ override def sessionTimeout: FiniteDuration =
+ properties.get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)
+ .flatMap(str => Try(str.toLong).toOption)
+ .map(_.millis)
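+      // fall back to the Kafka client default session.timeout.ms of 45 seconds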
+ .getOrElse(45000.millis)
+
override def withSessionTimeout(sessionTimeout: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString)
@@ -509,6 +540,11 @@ object ConsumerSettings {
): ConsumerSettings[F, K, V] =
withProperties(credentialsStore.properties)
+ override def withRebalanceRevokeMode(
+ rebalanceRevokeMode: RebalanceRevokeMode
+ ): ConsumerSettings[F, K, V] =
+ copy(rebalanceRevokeMode = rebalanceRevokeMode)
+
override def toString: String =
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"
@@ -542,7 +578,8 @@ object ConsumerSettings {
pollTimeout = 50.millis,
commitRecovery = CommitRecovery.Default,
recordMetadata = _ => OffsetFetchResponse.NO_METADATA,
- maxPrefetchBatches = 2
+ maxPrefetchBatches = 2,
+ rebalanceRevokeMode = RebalanceRevokeMode.Eager
)
def apply[F[_], K, V](
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
index 57eb13ca9..f7887a5cc 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -7,14 +7,12 @@
package fs2.kafka
import java.util
-
import scala.annotation.nowarn
import scala.collection.immutable.SortedSet
import scala.concurrent.duration.FiniteDuration
import scala.util.matching.Regex
-
-import cats.{Foldable, Functor, Reducible}
-import cats.data.{NonEmptySet, OptionT}
+import cats.{Applicative, Foldable, Functor, Reducible}
+import cats.data.{Chain, NonEmptySet, OptionT}
import cats.effect.*
import cats.effect.implicits.*
import cats.effect.std.*
@@ -28,7 +26,6 @@ import fs2.kafka.internal.converters.collection.*
import fs2.kafka.internal.syntax.*
import fs2.kafka.internal.KafkaConsumerActor.*
import fs2.kafka.internal.LogEntry.{RevokedPreviousFetch, StoredFetch}
-
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp}
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartition}
@@ -151,7 +148,8 @@ object KafkaConsumer {
def partitionStream(
streamId: StreamId,
partition: TopicPartition,
- assignmentRevoked: F[Unit]
+ assignmentRevoked: F[Unit],
+ signalCompletion: F[Unit]
): Stream[F, CommittableConsumerRecord[F, K, V]] = Stream.force {
for {
chunks <- chunkQueue
@@ -238,7 +236,9 @@ object KafkaConsumer {
.fromQueueNoneTerminated(chunks)
.flatMap(Stream.chunk)
.covary[F]
- .onFinalize(dequeueDone.complete(()).void)
+          // Previously, all revoke logic ran on the polling stream rather than on the stream handling
+          // the records. The new `signalCompletion` makes a more graceful consumer revoke possible.
+ .onFinalize(dequeueDone.complete(()) *> signalCompletion)
}
}
.flatten
@@ -246,15 +246,15 @@ object KafkaConsumer {
def enqueueAssignment(
streamId: StreamId,
- assigned: Map[TopicPartition, Deferred[F, Unit]],
+ assigned: Map[TopicPartition, AssignmentSignals[F]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
stopConsumingDeferred
.tryGet
.flatMap {
case None =>
- val assignment: PartitionsMap = assigned.map { case (partition, finisher) =>
- partition -> partitionStream(streamId, partition, finisher.get)
+ val assignment = assigned.map { case (partition, assignmentSignals) =>
+            partition -> partitionStream(
+              streamId,
+              partition,
+              assignmentSignals.awaitStreamTerminationSignal,
+              assignmentSignals.signalStreamFinished.void
+            )
}
partitionsMapQueue.offer(Some(assignment))
case Some(()) =>
@@ -263,24 +263,23 @@ object KafkaConsumer {
def onRebalance(
streamId: StreamId,
- assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
+ assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): OnRebalance[F] =
OnRebalance(
onRevoked = revoked => {
for {
finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1)))
- _ <- finishers.toVector.traverse { case (_, finisher) => finisher.complete(()) }
- } yield ()
+ revokeFinishers <- Chain.fromIterableOnce(finishers)
+ .traverse {
+ case (_, assignmentSignals) =>
+ assignmentSignals.signalStreamToTerminate.as(assignmentSignals.awaitStreamFinishedSignal)
+ }
+ } yield revokeFinishers
},
onAssigned = assignedPartitions => {
for {
- assignment <- assignedPartitions
- .toVector
- .traverse { partition =>
- Deferred[F, Unit].map(partition -> _)
- }
- .map(_.toMap)
+ assignment <- buildAssignment(assignedPartitions)
_ <- assignmentRef.update(_ ++ assignment)
_ <- enqueueAssignment(
streamId = streamId,
@@ -291,11 +290,29 @@ object KafkaConsumer {
}
)
+ def buildAssignment(
+ assignedPartitions: SortedSet[TopicPartition]
+ ): F[Map[TopicPartition, AssignmentSignals[F]]] = {
+ assignedPartitions
+ .toVector
+ .traverse { partition =>
+ settings.rebalanceRevokeMode match {
+ case RebalanceRevokeMode.EagerMode =>
+ Deferred[F, Unit].map(streamFinisher => partition -> AssignmentSignals.eager(streamFinisher))
+ case RebalanceRevokeMode.GracefulMode =>
+ (Deferred[F, Unit], Deferred[F, Unit]).mapN { (streamFinisher, revokeFinisher) =>
+ partition -> AssignmentSignals.graceful(streamFinisher, revokeFinisher)
+ }
+ }
+ }
+ .map(_.toMap)
+ }
+
def requestAssignment(
streamId: StreamId,
- assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
+ assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
- ): F[Map[TopicPartition, Deferred[F, Unit]]] = {
+ ): F[Map[TopicPartition, AssignmentSignals[F]]] = {
val assignment = this.assignment(
Some(
onRebalance(
@@ -312,18 +329,13 @@ object KafkaConsumer {
F.pure(Map.empty)
case Right(assigned) =>
- assigned
- .toVector
- .traverse { partition =>
- Deferred[F, Unit].map(partition -> _)
- }
- .map(_.toMap)
+ buildAssignment(assigned)
}
}
def initialEnqueue(
streamId: StreamId,
- assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
+ assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
for {
@@ -343,7 +355,7 @@ object KafkaConsumer {
partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]])
streamId <- Stream.eval(streamIdRef.modify(n => (n + 1, n)))
assignmentRef <- Stream
- .eval(Ref[F].of(Map.empty[TopicPartition, Deferred[F, Unit]]))
+ .eval(Ref[F].of(Map.empty[TopicPartition, AssignmentSignals[F]]))
_ <- Stream.eval(
initialEnqueue(
streamId,
@@ -432,7 +444,9 @@ object KafkaConsumer {
assignmentRef.updateAndGet(_ ++ assigned).flatMap(updateQueue.offer),
onRevoked = revoked =>
initialAssignmentDone >>
- assignmentRef.updateAndGet(_ -- revoked).flatMap(updateQueue.offer)
+ assignmentRef.updateAndGet(_ -- revoked)
+ .flatMap(updateQueue.offer)
+ .as(Chain.empty)
)
Stream
@@ -825,6 +839,40 @@ object KafkaConsumer {
}
+ /**
+   * Internal utility that keeps [[RebalanceRevokeMode]] transparent to the rest of the
+   * implementation internals.
+   *
+   * @tparam F
+   *   the effect type
+ */
+ private sealed abstract class AssignmentSignals[F[_]] {
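+    // signalStreamToTerminate / awaitStreamTerminationSignal tell the partition stream to stop and
+    // let it observe that signal; signalStreamFinished / awaitStreamFinishedSignal let the rebalance
+    // listener await the stream's completion (an immediate no-op in the eager mode).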
+ def signalStreamToTerminate: F[Boolean]
+ def awaitStreamTerminationSignal: F[Unit]
+ def signalStreamFinished: F[Boolean]
+ def awaitStreamFinishedSignal: F[Unit]
+ }
+
+ private object AssignmentSignals {
+
+ def eager[F[_]: Applicative](streamFinisher: Deferred[F, Unit]): AssignmentSignals[F] = EagerSignals(streamFinisher)
+ def graceful[F[_]](streamFinisher: Deferred[F, Unit], revokeFinisher: Deferred[F, Unit]): AssignmentSignals[F] =
+ GracefulSignals[F](streamFinisher, revokeFinisher)
+
+ final case class EagerSignals[F[_]: Applicative](streamFinisher: Deferred[F, Unit]) extends AssignmentSignals[F] {
+ override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(())
+ override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get
+ override def signalStreamFinished: F[Boolean] = true.pure[F]
+ override def awaitStreamFinishedSignal: F[Unit] = ().pure[F]
+ }
+
+    final case class GracefulSignals[F[_]](
+      streamFinisher: Deferred[F, Unit],
+      revokeFinisher: Deferred[F, Unit]
+    ) extends AssignmentSignals[F] {
+ override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(())
+ override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get
+ override def signalStreamFinished: F[Boolean] = revokeFinisher.complete(())
+ override def awaitStreamFinishedSignal: F[Unit] = revokeFinisher.get
+ }
+
+ }
+
/*
* Prevents the default `MkConsumer` instance from being implicitly available
* to code defined in this object, ensuring factory methods require an instance
diff --git a/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala b/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala
new file mode 100644
index 000000000..3edca1a5f
--- /dev/null
+++ b/modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala
@@ -0,0 +1,42 @@
+package fs2.kafka
+
+/**
+ * The available options for [[ConsumerSettings#rebalanceRevokeMode]].
+ *
+ * Available options include:
+ *   - [[RebalanceRevokeMode.Eager]]: the old behaviour, releasing an assigned partition as soon as possible;
+ *   - [[RebalanceRevokeMode.Graceful]]: the modified behaviour, waiting at most for the duration configured via
+ *     {{{org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG}}} The Kafka protocol
+ *     guarantees that after this timeout the old consumer is marked as dead.
+ *
+ * The default mode is [[RebalanceRevokeMode.Eager]], which matches the old behaviour exactly. Prefer it if
+ * rebalances need to happen as quickly as possible and having multiple consumers briefly working on the same
+ * partition is not a problem.
+ *
+ * On the other hand, if you want stricter processing guarantees and wish to wait for existing streams to finish
+ * processing messages before releasing a partition, choose [[RebalanceRevokeMode.Graceful]]. Because the stream
+ * is signalled to shut down, in-flight commits might still be lost, and some messages might be processed again
+ * after the new assignment.
+ *
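+ * For example, a minimal sketch of enabling the graceful mode (`settings` being any existing
+ * [[ConsumerSettings]] instance):
+ * {{{
+ * settings.withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
+ * }}}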
+ */
+sealed abstract class RebalanceRevokeMode
+
+object RebalanceRevokeMode {
+
+ private[kafka] case object EagerMode extends RebalanceRevokeMode
+
+ private[kafka] case object GracefulMode extends RebalanceRevokeMode
+
+ /**
+   * The old behaviour: the partition is released as soon as all streams have had their messages
+   * dispatched and have been signalled to terminate.
+ */
+ val Eager: RebalanceRevokeMode = EagerMode
+
+ /**
+   * Waits until all the partition streams finish processing, but at most for the duration configured via
+   * {{{org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG}}} Commits are not
+   * awaited for the revoked partition.
+ */
+ val Graceful: RebalanceRevokeMode = GracefulMode
+
+}
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
index 6a1f4fe02..80b1b5ba7 100644
--- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
+++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
@@ -224,13 +224,17 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
}
.flatMap { res =>
val onRevoked =
- res.onRebalances.foldLeft(F.unit)(_ >> _.onRevoked(revoked))
+ res.onRebalances.foldLeftM(Chain.empty[F[Unit]]) { (revocationsAcc, revocationsNext) =>
+ revocationsNext.onRevoked(revoked).map(revocationsAcc ++ _)
+ }
res.logRevoked >>
res.completeWithRecords >>
res.completeWithoutRecords >>
res.removeRevokedRecords >>
- onRevoked
+          onRevoked // first, signal all the streams to finalize
+            .flatMap(_.sequence_) // second, await stream termination (Eager mode returns immediately)
+            .timeout(settings.sessionTimeout) // to be extra safe, bound the whole revoke with a timeout
}
}
@@ -630,7 +634,7 @@ private[kafka] object KafkaConsumerActor {
final case class OnRebalance[F[_]](
onAssigned: SortedSet[TopicPartition] => F[Unit],
- onRevoked: SortedSet[TopicPartition] => F[Unit]
+    onRevoked: SortedSet[TopicPartition] => F[Chain[F[Unit]]]
) {
override def toString: String =
diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
index 74b9355c7..1fc2a800e 100644
--- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -10,10 +10,9 @@ import scala.collection.immutable.SortedSet
import scala.concurrent.duration.*
import cats.data.NonEmptySet
-import cats.effect.{Fiber, IO}
+import cats.effect.{Fiber, IO, Ref}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
-import cats.effect.Ref
import cats.syntax.all.*
import fs2.concurrent.SignallingRef
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow
@@ -1212,6 +1211,65 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}
+ describe("KafkaConsumer#stream") {
+ it("should wait for previous generation of streams to start consuming messages with RebalanceRevokeMode#Graceful") {
+ withTopic { topic =>
+        createCustomTopic(topic, partitions = 2) // minimal number of partitions for two consumers
+        def recordRange(from: Int, to: Int) = (from until to).map(n => s"key-$n" -> s"value-$n")
+
+ def produceRange(from: Int, until: Int): IO[Unit] = IO {
+ val produced = recordRange(from, until)
+ publishToKafka(topic, produced)
+ }
+
+        // track consumption uniqueness by committing explicitly after each message
+ val consumed = for {
+ ref <- Ref.of[IO, Vector[(String, String)]](Vector.empty)
+ _ <- produceRange(0, 10)
+ _ <-
+ KafkaConsumer
+ .stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful))
+ .evalTap(_.subscribeTo(topic))
+ .flatMap(
+ _.stream
+ .evalMap { record =>
+ ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset)
+ }
+ .evalTap(_.commit)
+ )
+ .interruptAfter(3.seconds)
+ .compile
+ .drain
+          .both {
+            IO.sleep(1.second) *>
+ produceRange(10, 20) *>
+ KafkaConsumer
+ .stream(consumerSettings[IO].withRebalanceRevokeMode(RebalanceRevokeMode.Graceful))
+ .evalTap(_.subscribeTo(topic))
+                .flatMap(c =>
+ fs2.Stream.exec(produceRange(20, 30)) ++
+ c.stream
+ .evalMap { record =>
+ ref.update(_ :+ (record.record.key -> record.record.value)).as(record.offset)
+ }
+ .evalTap(_.commit)
+ )
+ .interruptAfter(3.seconds)
+ .compile
+ .drain
+ }
+ res <- ref.get
+ } yield res
+
+ val res = consumed.unsafeRunSync()
+
+      // expected behavior: no duplicate consumption
+ res.toSet should have size res.length.toLong
+ (res should contain).theSameElementsAs(recordRange(0, 10) ++ recordRange(10, 20) ++ recordRange(20, 30))
+ }
+ }
+ }
+
private def commitTest(
commit: (KafkaConsumer[IO, String, String], CommittableOffsetBatch[IO]) => IO[Unit]
): Assertion =