Skip to content

Commit

Permalink
fix: Fixing implementation of revoke listener to not wait for streams…
Browse files Browse the repository at this point in the history
… termination while signaling streams to finish
  • Loading branch information
wookievx committed Nov 13, 2024
1 parent 6de1682 commit 9e28b44
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
13 changes: 6 additions & 7 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.collection.immutable.SortedSet
import scala.concurrent.duration.FiniteDuration
import scala.util.matching.Regex
import cats.{Applicative, Foldable, Functor, Reducible}
import cats.data.{NonEmptySet, OptionT}
import cats.data.{Chain, NonEmptySet, OptionT}
import cats.effect.*
import cats.effect.implicits.*
import cats.effect.std.*
Expand Down Expand Up @@ -268,17 +268,14 @@ object KafkaConsumer {
): OnRebalance[F] =
OnRebalance(
onRevoked = revoked => {
val finishSignals = for {
for {
finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1)))
revokeFinishers <- finishers
.toVector
revokeFinishers <- Chain.fromIterableOnce(finishers)
.traverse {
case (_, assignmentSignals) =>
assignmentSignals.signalStreamToTerminate.as(assignmentSignals.awaitStreamFinishedSignal)
}
} yield revokeFinishers

finishSignals.flatMap(revokes => revokes.sequence_)
},
onAssigned = assignedPartitions => {
for {
Expand Down Expand Up @@ -447,7 +444,9 @@ object KafkaConsumer {
assignmentRef.updateAndGet(_ ++ assigned).flatMap(updateQueue.offer),
onRevoked = revoked =>
initialAssignmentDone >>
assignmentRef.updateAndGet(_ -- revoked).flatMap(updateQueue.offer)
assignmentRef.updateAndGet(_ -- revoked)
.flatMap(updateQueue.offer)
.as(Chain.empty)
)

Stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,17 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
}
.flatMap { res =>
val onRevoked =
res.onRebalances.foldLeft(F.unit)(_ >> _.onRevoked(revoked))
res.onRebalances.foldLeftM(Chain.empty[F[Unit]]) { (revocationsAcc, revocationsNext) =>
revocationsNext.onRevoked(revoked).map(revocationsAcc ++ _)
}

res.logRevoked >>
res.completeWithRecords >>
res.completeWithoutRecords >>
res.removeRevokedRecords >>
onRevoked.timeout(settings.sessionTimeout) //just to be extra-safe timeout this revoke
onRevoked //first we trigger all the streams to finalize
.flatMap(_.sequence_) //second we await streams termination (Eager mode returns immediately)
.timeout(settings.sessionTimeout) //just to be extra-safe timeout this revoke
}
}

Expand Down Expand Up @@ -630,7 +634,7 @@ private[kafka] object KafkaConsumerActor {

final case class OnRebalance[F[_]](
onAssigned: SortedSet[TopicPartition] => F[Unit],
onRevoked: SortedSet[TopicPartition] => F[Unit],
onRevoked: SortedSet[TopicPartition] => F[Chain[F[Unit]]],
) {

override def toString: String =
Expand Down

0 comments on commit 9e28b44

Please sign in to comment.