feat: Implemented RebalanceRevokeMode
This is a potential solution for fd4s#1200, allowing opt-in graceful revoke handling (waiting for all the streams to finish, which should imply all the commits as well).
wookievx committed Aug 4, 2024
1 parent a73ad59 commit ae11e26
Showing 5 changed files with 206 additions and 35 deletions.
43 changes: 43 additions & 0 deletions docs/src/main/mdoc/consumers.md
@@ -547,6 +547,49 @@ You may notice, that actual graceful shutdown implementation requires a decent a

Also note that even if you implement a graceful shutdown, your application may still fail with an error, in which case the graceful shutdown will not be invoked. This means that your application should be ready for an _at least once_ semantic even when a graceful shutdown is implemented. Or, if you need an _exactly once_ semantic, consider using [transactions](transactions.md).
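As an illustration, here is a minimal sketch of idempotent processing under an _at least once_ semantic; the `alreadyProcessed` and `markProcessed` helpers are hypothetical and would be backed by your own storage:

```scala
import cats.effect.IO
import cats.syntax.all.*
import fs2.kafka.CommittableConsumerRecord

// Hypothetical deduplication helpers, e.g. backed by a database or cache.
def alreadyProcessed(key: String): IO[Boolean] = ???
def markProcessed(key: String): IO[Unit] = ???

def processIdempotently(
  record: CommittableConsumerRecord[IO, String, String]
)(process: CommittableConsumerRecord[IO, String, String] => IO[Unit]): IO[Unit] = {
  // A topic-partition-offset triple uniquely identifies a record, so a
  // redelivered record is detected and skipped instead of processed twice.
  val key = s"${record.record.topic}-${record.record.partition}-${record.record.offset}"
  alreadyProcessed(key).ifM(IO.unit, process(record) *> markProcessed(key))
}
```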

### Graceful partition revoke

In addition to graceful shutdown of the whole consumer, there is an option to configure your consumer to wait for its streams
to finish processing a partition before "releasing" it. This behavior can be enabled via the following settings:
```scala mdoc:silent
object WithGracefulPartitionRevoke extends IOApp.Simple {

val run: IO[Unit] = {
def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))

def run(consumer: KafkaConsumer[IO, String, String]): IO[Unit] = {
consumer.subscribeTo("topic") >> consumer
.stream
.evalMap { msg =>
processRecord(msg).as(msg.offset)
}
.through(commitBatchWithin(100, 15.seconds))
.compile
.drain
}

val consumerSettings: ConsumerSettings[IO, String, String] =
ConsumerSettings[IO, String, String]
.withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
.withSessionTimeout(10.seconds) // lower than the 45s default, but within the broker's accepted range

KafkaConsumer
.resource(consumerSettings)
.use { consumer =>
run(consumer)
}
}

}
```

Please note that this setting does not guarantee that all the commits will be performed before a partition is revoked;
the wait is bounded by the `session.timeout.ms` setting, which is why the example above lowers it from its 45 second default.
Be aware that awaiting too long for a partition processor to finish will cause processing of the whole topic to be suspended.
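
As a minimal tuning sketch (the values are illustrative, not recommendations, and the imports are assumed from the example above), one would keep `session.timeout.ms` low enough that a stalled revoke does not block the group for long, while commits are given time to conclude well within that bound:

```scala
val tunedSettings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
    // upper bound on how long a revoked partition is awaited
    .withSessionTimeout(10.seconds)
    // commits should conclude well within the revoke wait
    .withCommitTimeout(5.seconds)
```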

Awaiting commit completion might be implemented in the future.

[commitrecovery-default]: @API_BASE_URL@/CommitRecovery$.html#Default:fs2.kafka.CommitRecovery
[committableconsumerrecord]: @API_BASE_URL@/CommittableConsumerRecord.html
[committableoffset]: @API_BASE_URL@/CommittableOffset.html
45 changes: 41 additions & 4 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -8,14 +8,14 @@ package fs2.kafka

import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext

import cats.effect.Resource
import cats.Show
import fs2.kafka.security.KafkaCredentialStore

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse

import scala.util.Try

/**
* [[ConsumerSettings]] contain settings necessary to create a [[KafkaConsumer]]. At the very
* least, this includes key and value deserializers.<br><br>
@@ -151,6 +151,18 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V]


/**
 * Returns the value for property:
 *
 * {{{
 * ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
 * }}}
 *
 * The value is returned as a [[FiniteDuration]] for convenience.
 */
def sessionTimeout: FiniteDuration

/**
* Returns a new [[ConsumerSettings]] instance with the specified session timeout. This is
* equivalent to setting the following property using the [[withProperty]] function, except you
@@ -373,6 +385,17 @@
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]

/**
 * One of two possible modes of operation for [[KafkaConsumer.partitionsMapStream]]. See
 * [[RebalanceRevokeMode]] for a detailed explanation of the differences between them.
 */
def rebalanceRevokeMode: RebalanceRevokeMode

/**
* Creates a new [[ConsumerSettings]] with the specified [[rebalanceRevokeMode]].
*/
def withRebalanceRevokeMode(rebalanceRevokeMode: RebalanceRevokeMode): ConsumerSettings[F, K, V]

}

object ConsumerSettings {
@@ -388,7 +411,8 @@ object ConsumerSettings {
override val pollTimeout: FiniteDuration,
override val commitRecovery: CommitRecovery,
override val recordMetadata: ConsumerRecord[K, V] => String,
override val maxPrefetchBatches: Int
override val maxPrefetchBatches: Int,
override val rebalanceRevokeMode: RebalanceRevokeMode
) extends ConsumerSettings[F, K, V] {

override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
@@ -422,6 +446,13 @@
override def withMaxPollInterval(maxPollInterval: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)

// Need to use Try to avoid a separate implementation for Scala 2.12;
// the 45000 millis fallback matches the Kafka client default for session.timeout.ms.
override def sessionTimeout: FiniteDuration =
properties.get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)
.flatMap(str => Try(str.toLong).toOption)
.map(_.millis)
.getOrElse(45000.millis)

override def withSessionTimeout(sessionTimeout: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toMillis.toString)

@@ -509,6 +540,11 @@
): ConsumerSettings[F, K, V] =
withProperties(credentialsStore.properties)

override def withRebalanceRevokeMode(
rebalanceRevokeMode: RebalanceRevokeMode
): ConsumerSettings[F, K, V] =
copy(rebalanceRevokeMode = rebalanceRevokeMode)

override def toString: String =
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"

@@ -542,7 +578,8 @@ object ConsumerSettings {
pollTimeout = 50.millis,
commitRecovery = CommitRecovery.Default,
recordMetadata = _ => OffsetFetchResponse.NO_METADATA,
maxPrefetchBatches = 2
maxPrefetchBatches = 2,
rebalanceRevokeMode = RebalanceRevokeMode.Eager
)

def apply[F[_], K, V](
107 changes: 78 additions & 29 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -7,13 +7,11 @@
package fs2.kafka

import java.util

import scala.annotation.nowarn
import scala.collection.immutable.SortedSet
import scala.concurrent.duration.FiniteDuration
import scala.util.matching.Regex

import cats.{Foldable, Functor, Reducible}
import cats.{Applicative, Foldable, Functor, Reducible}
import cats.data.{NonEmptySet, OptionT}
import cats.effect.*
import cats.effect.implicits.*
@@ -28,7 +26,6 @@ import fs2.kafka.internal.converters.collection.*
import fs2.kafka.internal.syntax.*
import fs2.kafka.internal.KafkaConsumerActor.*
import fs2.kafka.internal.LogEntry.{RevokedPreviousFetch, StoredFetch}

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp}
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartition}

@@ -151,7 +148,8 @@
def partitionStream(
streamId: StreamId,
partition: TopicPartition,
assignmentRevoked: F[Unit]
assignmentRevoked: F[Unit],
signalCompletion: F[Unit]
): Stream[F, CommittableConsumerRecord[F, K, V]] = Stream.force {
for {
chunks <- chunkQueue
@@ -238,23 +236,25 @@
.fromQueueNoneTerminated(chunks)
.flatMap(Stream.chunk)
.covary[F]
.onFinalize(dequeueDone.complete(()).void)
// Previously all the revoke logic was done on the polling stream, not the handling stream;
// with the new `signalCompletion` there is the possibility of more graceful consumer revoke logic.
.onFinalize(dequeueDone.complete(()) *> signalCompletion)
}
}
.flatten
}

def enqueueAssignment(
streamId: StreamId,
assigned: Map[TopicPartition, Deferred[F, Unit]],
assigned: Map[TopicPartition, AssignmentSignals[F]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
stopConsumingDeferred
.tryGet
.flatMap {
case None =>
val assignment: PartitionsMap = assigned.map { case (partition, finisher) =>
partition -> partitionStream(streamId, partition, finisher.get)
val assignment = assigned.map { case (partition, assignmentSignals) =>
partition -> partitionStream(
  streamId,
  partition,
  assignmentSignals.awaitStreamTerminationSignal,
  assignmentSignals.signalStreamFinished.void
)
}
partitionsMapQueue.offer(Some(assignment))
case Some(()) =>
@@ -263,24 +263,26 @@

def onRebalance(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): OnRebalance[F] =
OnRebalance(
onRevoked = revoked => {
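// First signal each revoked partition's stream to terminate and collect the
// completion signals, then await them all: awaiting is a no-op in Eager mode,
// while in Graceful mode it blocks until each partition stream has finished.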
for {
val finishSignals = for {
finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1)))
_ <- finishers.toVector.traverse { case (_, finisher) => finisher.complete(()) }
} yield ()
revokeFinishers <- finishers
.toVector
.traverse {
case (_, assignmentSignals) =>
assignmentSignals.signalStreamToTerminate.as(assignmentSignals.awaitStreamFinishedSignal)
}
} yield revokeFinishers

finishSignals.flatMap(revokes => revokes.sequence_)
},
onAssigned = assignedPartitions => {
for {
assignment <- assignedPartitions
.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
assignment <- buildAssignment(assignedPartitions)
_ <- assignmentRef.update(_ ++ assignment)
_ <- enqueueAssignment(
streamId = streamId,
Expand All @@ -291,11 +293,29 @@ object KafkaConsumer {
}
)

def buildAssignment(
assignedPartitions: SortedSet[TopicPartition]
): F[Map[TopicPartition, AssignmentSignals[F]]] = {
assignedPartitions
.toVector
.traverse { partition =>
settings.rebalanceRevokeMode match {
case RebalanceRevokeMode.EagerMode =>
Deferred[F, Unit].map(streamFinisher => partition -> AssignmentSignals.eager(streamFinisher))
case RebalanceRevokeMode.GracefulMode =>
(Deferred[F, Unit], Deferred[F, Unit]).mapN { (streamFinisher, revokeFinisher) =>
partition -> AssignmentSignals.graceful(streamFinisher, revokeFinisher)
}
}
}
.map(_.toMap)
}

def requestAssignment(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): F[Map[TopicPartition, Deferred[F, Unit]]] = {
): F[Map[TopicPartition, AssignmentSignals[F]]] = {
val assignment = this.assignment(
Some(
onRebalance(
@@ -312,18 +332,13 @@
F.pure(Map.empty)

case Right(assigned) =>
assigned
.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
buildAssignment(assigned)
}
}

def initialEnqueue(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
assignmentRef: Ref[F, Map[TopicPartition, AssignmentSignals[F]]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
for {
@@ -343,7 +358,7 @@
partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]])
streamId <- Stream.eval(streamIdRef.modify(n => (n + 1, n)))
assignmentRef <- Stream
.eval(Ref[F].of(Map.empty[TopicPartition, Deferred[F, Unit]]))
.eval(Ref[F].of(Map.empty[TopicPartition, AssignmentSignals[F]]))
_ <- Stream.eval(
initialEnqueue(
streamId,
@@ -825,6 +840,40 @@

}

/**
 * Utility class to provide clarity for the internals. The goal is to make
 * [[RebalanceRevokeMode]] transparent to the rest of the implementation.
 *
 * @tparam F
 *   the effect type used
 */
private sealed abstract class AssignmentSignals[F[_]] {
def signalStreamToTerminate: F[Boolean]
def awaitStreamTerminationSignal: F[Unit]
def signalStreamFinished: F[Boolean]
def awaitStreamFinishedSignal: F[Unit]
}

private object AssignmentSignals {

def eager[F[_]: Applicative](streamFinisher: Deferred[F, Unit]): AssignmentSignals[F] = EagerSignals(streamFinisher)
def graceful[F[_]](streamFinisher: Deferred[F, Unit], revokeFinisher: Deferred[F, Unit]): AssignmentSignals[F] =
GracefulSignals[F](streamFinisher, revokeFinisher)

final case class EagerSignals[F[_]: Applicative](streamFinisher: Deferred[F, Unit]) extends AssignmentSignals[F] {
override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(())
override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get
override def signalStreamFinished: F[Boolean] = true.pure[F]
override def awaitStreamFinishedSignal: F[Unit] = ().pure[F]
}

final case class GracefulSignals[F[_]](streamFinisher: Deferred[F, Unit], revokeFinisher: Deferred[F, Unit]) extends AssignmentSignals[F] {
override def signalStreamToTerminate: F[Boolean] = streamFinisher.complete(())
override def awaitStreamTerminationSignal: F[Unit] = streamFinisher.get
override def signalStreamFinished: F[Boolean] = revokeFinisher.complete(())
override def awaitStreamFinishedSignal: F[Unit] = revokeFinisher.get
}

}

/*
* Prevents the default `MkConsumer` instance from being implicitly available
* to code defined in this object, ensuring factory methods require an instance
42 changes: 42 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/RebalanceRevokeMode.scala
@@ -0,0 +1,42 @@
package fs2.kafka

/**
 * The available options for [[ConsumerSettings#rebalanceRevokeMode]].<br><br>
 *
 * Available options include:<br>
 * - [[RebalanceRevokeMode.Eager]] the old behavior, releasing an assigned partition as soon as possible,<br>
 * - [[RebalanceRevokeMode.Graceful]] the modified behavior, waiting for up to the configured
 * {{{org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG}}}. The Kafka protocol
 * guarantees that after that timeout the old consumer will be marked as dead.
 *
 * The default mode is [[RebalanceRevokeMode.Eager]], which is exactly the same as the old behavior and
 * can be preferred if a rebalance needs to happen as quickly as possible and having multiple consumers
 * working on a partition for a moment is not a problem.
 *
 * On the other hand, if you want stricter guarantees about processing and want to wait for existing
 * streams to finish processing messages before releasing a partition, choose
 * [[RebalanceRevokeMode.Graceful]]. Because the stream is signalled to shut down, in-flight commits
 * might be lost and some messages might be processed again after the new assignment.
 */
sealed abstract class RebalanceRevokeMode

object RebalanceRevokeMode {

private[kafka] case object EagerMode extends RebalanceRevokeMode

private[kafka] case object GracefulMode extends RebalanceRevokeMode

/**
 * The old behavior: a partition is released as soon as all streams have had their messages
 * dispatched and have signalled termination.
 */
val Eager: RebalanceRevokeMode = EagerMode

/**
 * Waits for up to the configured session timeout ([[ConsumerSettings#withSessionTimeout]]) or until
 * all the partition streams finish processing (but does not wait for commits to conclude for that
 * partition).
 */
val Graceful: RebalanceRevokeMode = GracefulMode

}
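
As a usage illustration, a minimal sketch of selecting a mode from application configuration (the `strictProcessing` flag is an assumed application-level setting, not part of the library):

```scala
import fs2.kafka.RebalanceRevokeMode

def revokeModeFor(strictProcessing: Boolean): RebalanceRevokeMode =
  // Graceful trades rebalance speed for waiting on in-flight stream processing;
  // Eager releases partitions immediately, matching the previous behavior.
  if (strictProcessing) RebalanceRevokeMode.Graceful
  else RebalanceRevokeMode.Eager
```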