From 5bffff3ad245252d6117dab28b90245812749cfe Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Thu, 26 Oct 2023 09:27:20 +0200 Subject: [PATCH 01/14] Always end streams in rebalance listener, support lost partitions Previously, streams would only be ended in the rebalance listener for revoked partitions. Now, they are ended there even when `restartStreamsOnRebalancing` is used. Lost partitions are no longer treated as being revoked. With this change, streams of lost partitions are interrupted. Interrupting them prevents these streams from processing and committing more data. A nice side effect is that Zio-kafka is now faster when the rebalance listener was _not_ called; the 'fast track'. The main reason for this change is to prepare awaiting commits from within the rebalance listener which will prevent duplicate consuming of records (see #830). Also: fix test `restartStreamsOnRebalancing mode closes all partition streams` so that it detects rebalances properly on fast computers. --- .../zio/kafka/consumer/ConsumerSpec.scala | 28 +- .../zio/kafka/consumer/internal/Runloop.scala | 284 +++++++++--------- 2 files changed, 165 insertions(+), 147 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index cada18e17..999ae432b 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -740,6 +740,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test("restartStreamsOnRebalancing mode closes all partition streams") { val nrPartitions = 5 val nrMessages = 100 + val partitionIds = (0 until nrPartitions).toList for { // Produce messages on several partitions @@ -755,9 +756,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } // Consume messages - messagesReceived <- - ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) - drainCount <- Ref.make(0) + // A map with partition as key, and a messages-received-counter Ref as value: + messagesReceived <- ZIO.foreach(partitionIds)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) + drainCount <- Ref.make(0) subscription = Subscription.topics(topic) fib <- ZIO .logAnnotate("consumer", "1") { @@ -771,9 +772,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *> partitionStream.mapChunksZIO { records => - OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition) - .update(_ + records.size) - .as(records) + for { + _ <- OffsetBatch(records.map(_.offset)).commit + _ <- messagesReceived(tp.partition).update(_ + records.size) + } yield records } } .runDrain @@ -803,6 +805,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- ZIO .foreach(messagesReceived.values)(_.get) .map(_.sum) + .debug("Messages received by Fib1: ") .repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis)) // Starting a new consumer that will stop after receiving 20 messages, @@ -825,17 +828,20 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .fork // Waiting until fib1's partition streams got restarted because of the rebalancing - _ <- drainCount.get.repeat(Schedule.recurUntil((n: Int) => n == 1) && Schedule.fixed(100.millis)) + // Note: on fast computers we may not never see 
`drainCount == 1` but `2` immediately, therefore + // we need to check for `drainCount >= 1`. + _ <- drainCount.get.repeat(Schedule.recurUntil((_: Int) >= 1) && Schedule.fixed(100.millis)) _ <- ZIO.logDebug("Consumer 1 finished rebalancing") // All messages processed, the partition streams of fib are still running. // Saving the values and resetting the counters messagesReceived0 <- ZIO - .foreach((0 until nrPartitions).toList) { i => - messagesReceived(i).get.flatMap { v => - Ref.make(v).map(r => i -> r) - } <* messagesReceived(i).set(0) + .foreach(partitionIds) { i => + for { + v <- messagesReceived(i).getAndSet(0) + p <- Ref.make(v).map(i -> _) + } yield p } .map(_.toMap) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 586bacff7..2f458b5fd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -28,7 +28,7 @@ private[consumer] final class Runloop private ( maxPollInterval: Duration, commitTimeout: Duration, commandQueue: Queue[RunloopCommand], - lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]], + lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, offsetRetrieval: OffsetRetrieval, @@ -71,45 +71,48 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private val rebalanceListener: RebalanceListener = { - val emitDiagnostics = RebalanceListener( - (assigned, _) => diagnostics.emit(DiagnosticEvent.Rebalance.Assigned(assigned)), - (revoked, _) => diagnostics.emit(DiagnosticEvent.Rebalance.Revoked(revoked)), - (lost, _) => diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lost)) - ) - - def restartStreamsRebalancingListener = RebalanceListener( - onAssigned = (assigned, _) => - ZIO.logDebug("Rebalancing completed") *> - lastRebalanceEvent.updateZIO { - case None => - ZIO.some(Runloop.RebalanceEvent.Assigned(assigned)) - case Some(Runloop.RebalanceEvent.Revoked(revokeResult)) => - ZIO.some(Runloop.RebalanceEvent.RevokedAndAssigned(revokeResult, assigned)) - case Some(_) => - ZIO.fail(new IllegalStateException(s"Multiple onAssigned calls on rebalance listener")) - }, - onRevoked = (_, _) => + val recordRebalanceRebalancingListener = RebalanceListener( + onAssigned = (assignedTps, _) => for { - _ <- ZIO.logDebug("Rebalancing started") - state <- currentStateRef.get - // End all streams - result <- endRevokedPartitions(state.pendingRequests, state.assignedStreams, isRevoked = _ => true) - _ <- lastRebalanceEvent.updateZIO { - case None => - ZIO.some(Runloop.RebalanceEvent.Revoked(result)) - case _ => - ZIO.fail( - new IllegalStateException(s"onRevoked called on rebalance listener with pending assigned event") - ) - } + _ <- ZIO.logDebug(s"${assignedTps.size} partitions are assigned") + _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Assigned(assignedTps)) + rebalanceEvent <- lastRebalanceEvent.get + state <- currentStateRef.get + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams + else Chunk.empty + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) + _ <- ZIO.logTrace("onAssigned done") + } yield (), + onRevoked = (revokedTps, _) => + for { + _ <- 
ZIO.logDebug(s"${revokedTps.size} partitions are revoked") + _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Revoked(revokedTps)) + rebalanceEvent <- lastRebalanceEvent.get + state <- currentStateRef.get + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams + else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) + _ <- ZIO.logTrace("onRevoked done") + } yield (), + onLost = (lostTps, _) => + for { + _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") + _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lostTps)) + rebalanceEvent <- lastRebalanceEvent.get + state <- currentStateRef.get + (lostStreams, remainingStreams) = state.assignedStreams.partition(control => lostTps.contains(control.tp)) + _ <- ZIO.foreachDiscard(lostStreams)(_.lost) + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) remainingStreams + else Chunk.empty + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, endedStreams = streamsToEnd)) + _ <- ZIO.logTrace(s"onLost done") } yield () ) - if (restartStreamsOnRebalancing) { - emitDiagnostics ++ restartStreamsRebalancingListener ++ userRebalanceListener - } else { - emitDiagnostics ++ userRebalanceListener - } + recordRebalanceRebalancingListener ++ userRebalanceListener } /** This is the implementation behind the user facing api `Offset.commit`. */ @@ -284,12 +287,14 @@ private[consumer] final class Runloop private ( private def handlePoll(state: State): Task[State] = for { - _ <- - ZIO.logDebug( - s"Starting poll with ${state.pendingRequests.size} pending requests and ${state.pendingCommits.size} pending commits" - ) - _ <- currentStateRef.set(state) partitionsToFetch <- fetchStrategy.selectPartitionsToFetch(state.assignedStreams) + _ <- ZIO.logDebug( + s"Starting poll with ${state.pendingRequests.size} pending requests and" + + s" ${state.pendingCommits.size} pending commits," + + s" resuming ${partitionsToFetch} partitions" + ) + _ <- currentStateRef.set(state) + _ <- lastRebalanceEvent.set(RebalanceEvent.None) pollResult <- consumer.runloopAccess { c => ZIO.suspend { @@ -302,101 +307,86 @@ private[consumer] final class Runloop private ( if (records eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]() else records } - val currentAssigned = c.assignment().asScala.toSet - val newlyAssigned = currentAssigned -- prevAssigned - - for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, newlyAssigned) - - rebalanceEvent <- lastRebalanceEvent.getAndSet(None) - - revokeResult <- rebalanceEvent match { - case Some(Runloop.RebalanceEvent.Revoked(result)) => - // If we get here, `restartStreamsOnRebalancing == true` - // Use revoke result from endRevokedPartitions that was called previously in the rebalance listener - ZIO.succeed(result) - case Some(Runloop.RebalanceEvent.RevokedAndAssigned(result, _)) => - // If we get here, `restartStreamsOnRebalancing == true` - // Use revoke result from endRevokedPartitions that was called previously in the rebalance listener - ZIO.succeed(result) - case Some(Runloop.RebalanceEvent.Assigned(_)) => - // If we get here, `restartStreamsOnRebalancing == true` - // endRevokedPartitions was not called yet in the rebalance listener, - // and all partitions should be revoked - endRevokedPartitions( - state.pendingRequests, - 
state.assignedStreams, - isRevoked = _ => true - ) - case None => - // End streams for partitions that are no longer assigned - endRevokedPartitions( - state.pendingRequests, - state.assignedStreams, - isRevoked = (tp: TopicPartition) => !currentAssigned.contains(tp) - ) - } - - startingTps = rebalanceEvent match { - case Some(_) => - // If we get here, `restartStreamsOnRebalancing == true`, - // some partitions were revoked and/or assigned and - // all already assigned streams were ended. - // Therefore, all currently assigned tps are starting, - // either because they are restarting, or because they - // are new. - currentAssigned - case None => - newlyAssigned - } - - _ <- diagnostics.emit { - val providedTps = polledRecords.partitions().asScala.toSet - val requestedPartitions = state.pendingRequests.map(_.tp).toSet - - DiagnosticEvent.Poll( - tpRequested = requestedPartitions, - tpWithData = providedTps, - tpWithoutData = requestedPartitions -- providedTps - ) - } + diagnostics.emit { + val providedTps = polledRecords.partitions().asScala.toSet + val requestedPartitions = state.pendingRequests.map(_.tp).toSet + + DiagnosticEvent.Poll( + tpRequested = requestedPartitions, + tpWithData = providedTps, + tpWithoutData = requestedPartitions -- providedTps + ) + } *> + lastRebalanceEvent.get.flatMap { + case RebalanceEvent(false, _, _, _, _) => + // The fast track, rebalance listener was not invoked: + // no assignment changes, only new records. + ZIO.succeed( + PollResult( + records = polledRecords, + ignoreRecordsForTps = Set.empty, + pendingRequests = state.pendingRequests, + assignedStreams = state.assignedStreams + ) + ) - } yield Runloop.PollResult( - startingTps = startingTps, - pendingRequests = revokeResult.pendingRequests, - assignedStreams = revokeResult.assignedStreams, - records = polledRecords, - ignoreRecordsForTps = ignoreRecordsForTps - ) + case RebalanceEvent(true, assignedTps, revokedTps, lostTps, endedStreams) => + // The slow track, the rebalance listener was invoked: + // some partitions were assigned, revoked or lost, + // some streams have ended. + + val currentAssigned = c.assignment().asScala.toSet + val endedTps = endedStreams.map(_.tp).toSet + for { + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + + // The topic partitions that need a new stream are: + // 1. Those that are freshly assigned + // 2. Those that are still assigned but were ended in the rebalance listener because + // of `restartStreamsOnRebalancing` being true + startingTps = assignedTps ++ (currentAssigned intersect endedTps) + + startingStreams <- + ZIO.foreach(Chunk.fromIterable(startingTps))(newPartitionStream).tap { newStreams => + ZIO.logDebug(s"Offering partition assignment $startingTps") *> + partitionsHub.publish( + Take.chunk(newStreams.map(_.tpStream)) + ) + } + + updatedAssignedStreams = + state.assignedStreams.filter(s => !endedTps.contains(s.tp)) ++ startingStreams + + // Remove pending requests for all streams that ended: + // 1. streams that were ended because the partition was lost + // 2. streams that were ended because the partition was revoked + // 3. 
streams that were ended because of `restartStreamsOnRebalancing` being true + updatedPendingRequests = + state.pendingRequests.filter { pendingRequest => + val tp = pendingRequest.tp + !(lostTps.contains(tp) || revokedTps.contains(tp) || endedStreams.exists(_.tp == tp)) + } + } yield Runloop.PollResult( + records = polledRecords, + ignoreRecordsForTps = ignoreRecordsForTps, + pendingRequests = updatedPendingRequests, + assignedStreams = updatedAssignedStreams + ) + } } } - startingStreams <- - if (pollResult.startingTps.isEmpty) { - ZIO.succeed(Chunk.empty[PartitionStreamControl]) - } else { - ZIO - .foreach(Chunk.fromIterable(pollResult.startingTps))(newPartitionStream) - .tap { newStreams => - ZIO.logDebug(s"Offering partition assignment ${pollResult.startingTps}") *> - partitionsHub.publish(Take.chunk(Chunk.fromIterable(newStreams.map(_.tpStream)))) - } - } - runningStreams <- ZIO.filter(pollResult.assignedStreams)(_.isRunning) - updatedStreams = runningStreams ++ startingStreams fulfillResult <- offerRecordsToStreams( - updatedStreams, + pollResult.assignedStreams, pollResult.pendingRequests, pollResult.ignoreRecordsForTps, pollResult.records ) updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending) - // Using `runningStreams` instead of `updatedStreams` because starting streams cannot exceed - // their poll interval yet: - _ <- checkStreamPollInterval(runningStreams) + _ <- checkStreamPollInterval(pollResult.assignedStreams) } yield state.copy( pendingRequests = fulfillResult.pendingRequests, pendingCommits = updatedPendingCommits, - assignedStreams = updatedStreams + assignedStreams = pollResult.assignedStreams ) /** @@ -580,11 +570,10 @@ private[consumer] object Runloop { type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]] private final case class PollResult( - startingTps: Set[TopicPartition], - pendingRequests: Chunk[RunloopCommand.Request], - assignedStreams: Chunk[PartitionStreamControl], records: ConsumerRecords[Array[Byte], Array[Byte]], - ignoreRecordsForTps: Set[TopicPartition] + ignoreRecordsForTps: Set[TopicPartition], + pendingRequests: Chunk[RunloopCommand.Request], + assignedStreams: Chunk[PartitionStreamControl] ) private final case class RevokeResult( pendingRequests: Chunk[RunloopCommand.Request], @@ -594,14 +583,37 @@ private[consumer] object Runloop { pendingRequests: Chunk[RunloopCommand.Request] ) - private sealed trait RebalanceEvent + private final case class RebalanceEvent( + wasInvoked: Boolean, + assignedTps: Set[TopicPartition], + revokedTps: Set[TopicPartition], + lostTps: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] + ) { + def onAssigned(assigned: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + copy( + wasInvoked = true, + assignedTps = assignedTps ++ assigned, + endedStreams = this.endedStreams ++ endedStreams + ) + + def onRevoked(revoked: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + copy( + wasInvoked = true, + revokedTps = revokedTps ++ revoked, + endedStreams = this.endedStreams ++ endedStreams + ) + + def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + copy( + wasInvoked = true, + lostTps = lostTps ++ lost, + endedStreams = this.endedStreams ++ endedStreams + ) + } + private object RebalanceEvent { - final case class Revoked(revokeResult: Runloop.RevokeResult) extends RebalanceEvent - final case class Assigned(newlyAssigned: Set[TopicPartition]) extends 
RebalanceEvent - final case class RevokedAndAssigned( - revokeResult: Runloop.RevokeResult, - newlyAssigned: Set[TopicPartition] - ) extends RebalanceEvent + val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) } def make( @@ -620,7 +632,7 @@ private[consumer] object Runloop { for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[Option[Runloop.RebalanceEvent]](None) + lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) runtime <- ZIO.runtime[Any] From 549ae383b43e62a1adcb0500b758e8bae8038b31 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Fri, 27 Oct 2023 17:09:32 +0200 Subject: [PATCH 02/14] Fix test In test `restartStreamsOnRebalancing mode closes all partition streams` consumer 1 is expected to receive at least 1 message. However, consumer 2 might grab all them through pre-fetching. Fix this by disabling pre-fetching for consumer 2. --- .../scala/zio/kafka/consumer/ConsumerSpec.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 999ae432b..3ec103c22 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -817,12 +817,16 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(20) .runDrain .provideSomeLayer[Kafka]( - consumer( - client2, - Some(group), - clientInstanceId = Some("consumer2"), - properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10") - ) + // Reduce `max.poll.records` and disable pre-fetch so that we are sure that consumer 2 does + // not pre-fetch more than it will process. + ZLayer { + consumerSettings( + client2, + Some(group), + clientInstanceId = Some("consumer2"), + `max.poll.records` = 10 + ).map(_.withFetchStrategy(_ => ZIO.succeed(Set.empty))) + } >>> minimalConsumer() ) } .fork From b42dbb63825b61a8b80eeb5653aa7e5175d0a724 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 28 Oct 2023 10:05:00 +0200 Subject: [PATCH 03/14] Make clearing rebalance event more obvious --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 2f458b5fd..f80dc41b7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -294,7 +294,6 @@ private[consumer] final class Runloop private ( s" resuming ${partitionsToFetch} partitions" ) _ <- currentStateRef.set(state) - _ <- lastRebalanceEvent.set(RebalanceEvent.None) pollResult <- consumer.runloopAccess { c => ZIO.suspend { @@ -317,7 +316,7 @@ private[consumer] final class Runloop private ( tpWithoutData = requestedPartitions -- providedTps ) } *> - lastRebalanceEvent.get.flatMap { + lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { case RebalanceEvent(false, _, _, _, _) => // The fast track, rebalance listener was not invoked: // no assignment changes, only new records. 
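
Aside (not part of the patch series): the change above (PATCH 03/14) relies on the accumulating RebalanceEvent introduced in PATCH 01/14. Every callback the Kafka client issues during a single poll folds into the same event, and handlePoll consumes it with getAndSet(RebalanceEvent.None) so that the next poll starts from a clean slate. The sketch below is a simplified, standalone approximation of that data type; it drops the endedStreams field and all ZIO machinery, and RebalanceEventDemo with its TopicPartition values is invented purely for illustration.

import org.apache.kafka.common.TopicPartition

// Simplified stand-in for the accumulating rebalance event of PATCH 01/14.
// The real type also records which PartitionStreamControls were ended.
final case class RebalanceEvent(
  wasInvoked: Boolean,
  assignedTps: Set[TopicPartition],
  revokedTps: Set[TopicPartition],
  lostTps: Set[TopicPartition]
) {
  def onAssigned(assigned: Set[TopicPartition]): RebalanceEvent =
    copy(wasInvoked = true, assignedTps = assignedTps ++ assigned)
  def onRevoked(revoked: Set[TopicPartition]): RebalanceEvent =
    copy(wasInvoked = true, revokedTps = revokedTps ++ revoked)
  def onLost(lost: Set[TopicPartition]): RebalanceEvent =
    copy(wasInvoked = true, lostTps = lostTps ++ lost)
}

object RebalanceEvent {
  val None: RebalanceEvent =
    RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty)
}

object RebalanceEventDemo extends App {
  val tp0 = new TopicPartition("topic", 0)
  val tp1 = new TopicPartition("topic", 1)

  // Several callbacks during one poll fold into a single event.
  val event = RebalanceEvent.None.onRevoked(Set(tp0)).onAssigned(Set(tp1))

  // handlePoll branches on wasInvoked: false means the fast track (no
  // assignment changes), true means streams and pending requests are rebuilt.
  println(event.wasInvoked)               // true
  println(RebalanceEvent.None.wasInvoked) // false
}
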
From 09b3cdc8f9b11ede8b0e41e6028b917d58faf613 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 28 Oct 2023 11:48:13 +0200 Subject: [PATCH 04/14] Fix test --- .../zio/kafka/consumer/ConsumerSpec.scala | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 3ec103c22..c62db9109 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -759,6 +759,14 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { // A map with partition as key, and a messages-received-counter Ref as value: messagesReceived <- ZIO.foreach(partitionIds)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) drainCount <- Ref.make(0) + // Reduce `max.poll.records` and disable pre-fetch. + consumer1Settings <- consumerSettings( + client1, + Some(group), + clientInstanceId = Some("consumer1"), + restartStreamOnRebalancing = true, + `max.poll.records` = 10 + ).map(_.withoutPartitionPreFetching) subscription = Subscription.topics(topic) fib <- ZIO .logAnnotate("consumer", "1") { @@ -790,13 +798,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ) .runDrain .provideSomeLayer[Kafka]( - consumer( - client1, - Some(group), - clientInstanceId = Some("consumer1"), - restartStreamOnRebalancing = true, - properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10") - ) + ZLayer.succeed(consumer1Settings) >>> minimalConsumer() ) } .fork @@ -808,8 +810,16 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .debug("Messages received by Fib1: ") .repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis)) - // Starting a new consumer that will stop after receiving 20 messages, - // causing two rebalancing events for fib1 consumers on start and stop + // Starting a new consumer that will stop after receiving 20 messages, causing two rebalancing events for fib1 + // consumers on start and stop. + // Reduce `max.poll.records` and disable pre-fetch so that we are sure that consumer 2 does not pre-fetch more + // than it will process. + consumer2Settings <- consumerSettings( + client2, + Some(group), + clientInstanceId = Some("consumer2"), + `max.poll.records` = 10 + ).map(_.withoutPartitionPreFetching) fib2 <- ZIO .logAnnotate("consumer", "2") { Consumer @@ -817,23 +827,14 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(20) .runDrain .provideSomeLayer[Kafka]( - // Reduce `max.poll.records` and disable pre-fetch so that we are sure that consumer 2 does - // not pre-fetch more than it will process. - ZLayer { - consumerSettings( - client2, - Some(group), - clientInstanceId = Some("consumer2"), - `max.poll.records` = 10 - ).map(_.withFetchStrategy(_ => ZIO.succeed(Set.empty))) - } >>> minimalConsumer() + ZLayer.succeed(consumer2Settings) >>> minimalConsumer() ) } .fork // Waiting until fib1's partition streams got restarted because of the rebalancing - // Note: on fast computers we may not never see `drainCount == 1` but `2` immediately, therefore - // we need to check for `drainCount >= 1`. + // Note: on fast computers we may not never see `drainCount == 1` but `2` immediately, therefore we need to + // check for `drainCount >= 1`. 
_ <- drainCount.get.repeat(Schedule.recurUntil((_: Int) >= 1) && Schedule.fixed(100.millis)) _ <- ZIO.logDebug("Consumer 1 finished rebalancing") @@ -849,8 +850,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } .map(_.toMap) - // Publishing another N messages - now they will be distributed among the two consumers until - // fib2 stops after 20 messages + // Publishing another N messages - now they will be distributed among the two consumers until fib2 stops after + // 20 messages _ <- ZIO.foreachDiscard((nrMessages + 1) to (2 * nrMessages)) { i => produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) } From 5149036a3bd691876d01bc83a9d6b42c77797c39 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 30 Oct 2023 08:49:27 +0100 Subject: [PATCH 05/14] Do not consider onLost as a rebalance for `restartStreamsOnRebalancing` --- .../zio/kafka/consumer/internal/Runloop.scala | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index f80dc41b7..eeba6f566 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -102,12 +102,9 @@ private[consumer] final class Runloop private ( _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lostTps)) rebalanceEvent <- lastRebalanceEvent.get state <- currentStateRef.get - (lostStreams, remainingStreams) = state.assignedStreams.partition(control => lostTps.contains(control.tp)) + lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp)) _ <- ZIO.foreachDiscard(lostStreams)(_.lost) - streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) remainingStreams - else Chunk.empty - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) - _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, endedStreams = streamsToEnd)) + _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps)) _ <- ZIO.logTrace(s"onLost done") } yield () ) @@ -603,12 +600,8 @@ private[consumer] object Runloop { endedStreams = this.endedStreams ++ endedStreams ) - def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = - copy( - wasInvoked = true, - lostTps = lostTps ++ lost, - endedStreams = this.endedStreams ++ endedStreams - ) + def onLost(lost: Set[TopicPartition]): RebalanceEvent = + copy(wasInvoked = true, lostTps = lostTps ++ lost) } private object RebalanceEvent { From 4bc9b23ab717bd49c50c67fd3ed63b563b077d5d Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 30 Oct 2023 08:53:34 +0100 Subject: [PATCH 06/14] Use no stack trace for lost exception --- .../kafka/consumer/internal/PartitionStreamControl.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 85f58b044..0e0b3882e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -72,8 +72,10 @@ final class PartitionStreamControl private ( queueInfoRef.get.map(_.deadlineExceeded(now)) /** To be invoked when the partition was lost. 
*/ - private[internal] def lost: UIO[Boolean] = - interruptionPromise.fail(new RuntimeException(s"Partition ${tp.toString} was lost")) + private[internal] def lost: UIO[Boolean] = { + val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace + interruptionPromise.fail(lostException) + } /** To be invoked when the stream is no longer processing. */ private[internal] def halt: UIO[Boolean] = { From 5e0cce69b1bea52e3be844f0b8fd72b24dfe4a4e Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 30 Oct 2023 18:01:12 +0100 Subject: [PATCH 07/14] Small fixes --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index c62db9109..47055debd 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -807,7 +807,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- ZIO .foreach(messagesReceived.values)(_.get) .map(_.sum) - .debug("Messages received by Fib1: ") .repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis)) // Starting a new consumer that will stop after receiving 20 messages, causing two rebalancing events for fib1 @@ -833,8 +832,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .fork // Waiting until fib1's partition streams got restarted because of the rebalancing - // Note: on fast computers we may not never see `drainCount == 1` but `2` immediately, therefore we need to - // check for `drainCount >= 1`. + // Note: on fast computers we may never see `drainCount == 1` but `2` immediately, therefore we need to check + // for `drainCount >= 1`. 
_ <- drainCount.get.repeat(Schedule.recurUntil((_: Int) >= 1) && Schedule.fixed(100.millis)) _ <- ZIO.logDebug("Consumer 1 finished rebalancing") From 9c4701d3dbfb2826f54bc7e2bb1135b987cc372b Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 30 Oct 2023 18:06:56 +0100 Subject: [PATCH 08/14] Trying to fix the test on slow machines --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 47055debd..c3144e732 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -854,6 +854,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- ZIO.foreachDiscard((nrMessages + 1) to (2 * nrMessages)) { i => produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) } + // Give the consumers some time + _ <- ZIO.sleep(500.millis) _ <- fib2.join _ <- ZIO.logDebug("Consumer 2 done") _ <- fib.join From 054052f8a3f7678901e6ba21b4a90f285485b0b0 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 30 Oct 2023 20:23:25 +0100 Subject: [PATCH 09/14] Trying to fix the test on slow machines --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index c3144e732..b985b6a1b 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -854,10 +854,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- ZIO.foreachDiscard((nrMessages + 1) to (2 * nrMessages)) { i => produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) } - // Give the consumers some time - _ <- ZIO.sleep(500.millis) _ <- fib2.join _ <- ZIO.logDebug("Consumer 2 done") + // Give consumer 1 some time to consume the rest + _ <- ZIO.sleep(500.millis) _ <- fib.join _ <- ZIO.logDebug("Consumer 1 done") // fib2 terminates after 20 messages, fib terminates after fib2 because of the rebalancing (drainCount==2) From 6c56770a8d66990c2ef8d456aeea75bd5dfb6c77 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 30 Oct 2023 22:00:41 +0100 Subject: [PATCH 10/14] Trying to fix the test on slow machines --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index b985b6a1b..3c5a1d984 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -857,7 +857,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- fib2.join _ <- ZIO.logDebug("Consumer 2 done") // Give consumer 1 some time to consume the rest - _ <- ZIO.sleep(500.millis) + _ <- ZIO.sleep(1.second) _ <- fib.join _ <- ZIO.logDebug("Consumer 1 done") // fib2 terminates after 20 messages, fib terminates after fib2 because of the rebalancing (drainCount==2) From 618f4b2cdd75fb695e81d6f0fd2148a228f2e732 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Wed, 1 Nov 2023 16:07:56 
+0100 Subject: [PATCH 11/14] Rewrote test `restartStreamsOnRebalancing mode closes all partition streams` Also: fmt --- build.sbt | 6 +- .../zio/kafka/consumer/ConsumerSpec.scala | 210 ++++++++---------- 2 files changed, 101 insertions(+), 115 deletions(-) diff --git a/build.sbt b/build.sbt index 97713a3ea..cc9d89d3d 100644 --- a/build.sbt +++ b/build.sbt @@ -3,9 +3,9 @@ import sbt.Def lazy val kafkaVersion = "3.6.0" lazy val embeddedKafkaVersion = "3.6.0" // Should be the same as kafkaVersion, except for the patch part -lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion -lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" -lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11" +lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion +lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" +lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11" enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 3c5a1d984..7284aff22 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -738,9 +738,28 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assertCompletes }, test("restartStreamsOnRebalancing mode closes all partition streams") { + // Test plan: + // - Throughout the test, continuously produce to all partitions of a topics. + // - Start consumer 1, + // - track which partitions are assigned after each rebalance. + // - track which streams stopped + // - Start consumer 2 but finish after just a few records. This results in 2 rebalances for consumer 1. + // - Verify that in the first rebalance, consumer 1 ends the streams for _all_ partitions, + // and starts them again. + // + // NOTE: we need to use the cooperative sticky assignor. The default assignor `ConsumerPartitionAssignor`, + // revokes all partitions and re-assigns them on every rebalance, so the behavior for + // `restartStreamOnRebalancing` is already the default. + val nrPartitions = 5 - val nrMessages = 100 - val partitionIds = (0 until nrPartitions).toList + val partitionIds = Chunk.fromIterable(0 until nrPartitions) + + def awaitRebalance[A](partitionAssignments: Ref[Chunk[A]], nr: Int): ZIO[Any, Nothing, Unit] = + partitionAssignments.get + .repeat( + Schedule.recurUntil((_: Chunk[A]).size >= nr) && Schedule.fixed(100.millis) + ) + .unit for { // Produce messages on several partitions @@ -751,127 +770,94 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { client2 <- randomClient _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions)) - _ <- ZIO.foreachDiscard(1 to nrMessages) { i => - produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) - } - // Consume messages - // A map with partition as key, and a messages-received-counter Ref as value: - messagesReceived <- ZIO.foreach(partitionIds)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) - drainCount <- Ref.make(0) - // Reduce `max.poll.records` and disable pre-fetch. 
- consumer1Settings <- consumerSettings( - client1, - Some(group), - clientInstanceId = Some("consumer1"), - restartStreamOnRebalancing = true, - `max.poll.records` = 10 - ).map(_.withoutPartitionPreFetching) - subscription = Subscription.topics(topic) - fib <- ZIO - .logAnnotate("consumer", "1") { - Consumer - .partitionedAssignmentStream(subscription, Serde.string, Serde.string) - .rechunk(1) - .mapZIO { partitions => - ZIO.logDebug(s"Got partition assignment ${partitions.map(_._1).mkString(",")}") *> - ZStream - .fromIterable(partitions) - .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => - ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *> - partitionStream.mapChunksZIO { records => - for { - _ <- OffsetBatch(records.map(_.offset)).commit - _ <- messagesReceived(tp.partition).update(_ + records.size) - } yield records - } - } - .runDrain - } - .mapZIO(_ => - drainCount.updateAndGet(_ + 1).flatMap { - case 2 => ZIO.logDebug("Stopping consumption") *> Consumer.stopConsumption - // 1: when consumer on fib2 starts - // 2: when consumer on fib2 stops, end of test - case _ => ZIO.unit - } - ) - .runDrain - .provideSomeLayer[Kafka]( - ZLayer.succeed(consumer1Settings) >>> minimalConsumer() - ) + // Continuously produce messages throughout the test + _ <- ZStream + .fromSchedule(Schedule.fixed(100.millis)) + .mapZIO { i => + ZIO.foreach(partitionIds) { p => + produceMany(topic, p, Seq((s"key.$p.$i", s"msg.$p.$i"))) } - .fork - // fib is running, consuming all the published messages from all partitions. - // Waiting until it recorded all messages - _ <- ZIO - .foreach(messagesReceived.values)(_.get) - .map(_.sum) - .repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis)) - - // Starting a new consumer that will stop after receiving 20 messages, causing two rebalancing events for fib1 - // consumers on start and stop. - // Reduce `max.poll.records` and disable pre-fetch so that we are sure that consumer 2 does not pre-fetch more - // than it will process. 
- consumer2Settings <- consumerSettings( - client2, - Some(group), - clientInstanceId = Some("consumer2"), - `max.poll.records` = 10 - ).map(_.withoutPartitionPreFetching) - fib2 <- ZIO - .logAnnotate("consumer", "2") { + } + .runDrain + .forkScoped + + // Consumer 1 + streamsStarted <- Ref.make[Chunk[Set[Int]]](Chunk.empty) + streamsStopped <- Ref.make[Chunk[Int]](Chunk.empty) + consumer1Settings <- + consumerSettings( + client1, + Some(group), + restartStreamOnRebalancing = true + ).map { + _.withProperties( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName + ) + } + fib1 <- ZIO + .logAnnotate("consumer", "1") { Consumer - .plainStream(subscription, Serde.string, Serde.string) - .take(20) + .partitionedAssignmentStream(Subscription.topics(topic), Serde.string, Serde.string) + .rechunk(1) + .mapZIO { assignments => + ZIO.logDebug(s"Got partition assignment ${assignments.map(_._1).mkString(",")}") *> + streamsStarted.update(_ :+ assignments.map(_._1.partition()).toSet) *> + ZStream + .fromIterable(assignments) + .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => + ZStream.finalizer { + ZIO.logDebug(s"Stream for ${tp.toString} is done") *> + streamsStopped.update(_ :+ tp.partition()) + } *> + partitionStream.mapChunksZIO { records => + OffsetBatch(records.map(_.offset)).commit.as(records) + } + } + .runDrain + } .runDrain .provideSomeLayer[Kafka]( - ZLayer.succeed(consumer2Settings) >>> minimalConsumer() + ZLayer.succeed(consumer1Settings) >>> minimalConsumer() ) } .fork - // Waiting until fib1's partition streams got restarted because of the rebalancing - // Note: on fast computers we may never see `drainCount == 1` but `2` immediately, therefore we need to check - // for `drainCount >= 1`. - _ <- drainCount.get.repeat(Schedule.recurUntil((_: Int) >= 1) && Schedule.fixed(100.millis)) - _ <- ZIO.logDebug("Consumer 1 finished rebalancing") + // Wait until consumer 1 was assigned some partitions + _ <- awaitRebalance(streamsStarted, 1) - // All messages processed, the partition streams of fib are still running. - // Saving the values and resetting the counters - messagesReceived0 <- - ZIO - .foreach(partitionIds) { i => - for { - v <- messagesReceived(i).getAndSet(0) - p <- Ref.make(v).map(i -> _) - } yield p - } - .map(_.toMap) + // Consumer 2 + // Stop after receiving 20 messages, causing two rebalancing events for consumer 1. 
+ consumer2Settings <- consumerSettings(client2, Some(group)) + _ <- ZIO + .logAnnotate("consumer", "2") { + Consumer + .plainStream(Subscription.topics(topic), Serde.string, Serde.string) + .take(20) + .runDrain + .provideSomeLayer[Kafka]( + ZLayer.succeed(consumer2Settings) >>> minimalConsumer() + ) + } + .forkScoped - // Publishing another N messages - now they will be distributed among the two consumers until fib2 stops after - // 20 messages - _ <- ZIO.foreachDiscard((nrMessages + 1) to (2 * nrMessages)) { i => - produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) - } - _ <- fib2.join - _ <- ZIO.logDebug("Consumer 2 done") - // Give consumer 1 some time to consume the rest - _ <- ZIO.sleep(1.second) - _ <- fib.join - _ <- ZIO.logDebug("Consumer 1 done") - // fib2 terminates after 20 messages, fib terminates after fib2 because of the rebalancing (drainCount==2) - messagesPerPartition0 <- - ZIO.foreach(messagesReceived0.values)(_.get) // counts from the first N messages (single consumer) - messagesPerPartition <- - ZIO.foreach(messagesReceived.values)(_.get) // counts from fib after the second consumer joined - - // The first set must contain all the produced messages - // The second set must have at least one and maximum N-20 (because fib2 stops after consuming 20) - - // the exact count cannot be known because fib2's termination triggers fib1's rebalancing asynchronously. - } yield assert(messagesPerPartition0)(forall(equalTo(nrMessages / nrPartitions))) && - assert(messagesPerPartition.view.sum)(isGreaterThan(0) && isLessThanEqualTo(nrMessages - 20)) - } @@ TestAspect.nonFlaky(3), + // Wait until consumer 1's partitions were revoked, and assigned again + _ <- awaitRebalance(streamsStarted, 3) + _ <- fib1.interrupt + + // The started streams after each rebalance + streamsStarted <- streamsStarted.get + _ <- ZIO.logDebug(s"partitions for started streams: $streamsStarted") + + streamsStopped <- streamsStopped.get + _ <- ZIO.logDebug(s"partitions for stopped streams: $streamsStopped") + } yield assertTrue( + // During the first rebalance, all partitions are stopped: + streamsStopped.take(nrPartitions).toSet == partitionIds.toSet, + // Some streams that were assigned at the beginning, are started after the first rebalance: + (streamsStarted(0) intersect streamsStarted(1)).nonEmpty + ) + }, test("handles RebalanceInProgressExceptions transparently") { val nrPartitions = 5 val nrMessages = 10000 From 5b91f5d023af1c01581f8c471a8b003365be045c Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Wed, 1 Nov 2023 16:17:16 +0100 Subject: [PATCH 12/14] Better comments --- .../scala/zio/kafka/consumer/ConsumerSpec.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 7284aff22..ee456d1ab 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -739,17 +739,19 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }, test("restartStreamsOnRebalancing mode closes all partition streams") { // Test plan: - // - Throughout the test, continuously produce to all partitions of a topics. - // - Start consumer 1, - // - track which partitions are assigned after each rebalance. - // - track which streams stopped + // - Throughout the test, continuously produce to all partitions of a topic. 
+ // - Start consumer 1: + // - track which partitions are assigned after each rebalance, + // - track which streams stopped. // - Start consumer 2 but finish after just a few records. This results in 2 rebalances for consumer 1. // - Verify that in the first rebalance, consumer 1 ends the streams for _all_ partitions, - // and starts them again. + // and then starts them again. // // NOTE: we need to use the cooperative sticky assignor. The default assignor `ConsumerPartitionAssignor`, - // revokes all partitions and re-assigns them on every rebalance, so the behavior for - // `restartStreamOnRebalancing` is already the default. + // revokes all partitions and re-assigns them on every rebalance. This means that all streams are restarted + // on every rebalance, exactly what `restartStreamOnRebalancing` would have caused. In other words, with the + // default assignor the externally visible behavior is the same, regardless of whether + // `restartStreamOnRebalancing` is `true` or `false`. val nrPartitions = 5 val partitionIds = Chunk.fromIterable(0 until nrPartitions) From b5a1625dfbc67a695a8bac1774e0952b02fd73a1 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 4 Nov 2023 14:03:04 +0100 Subject: [PATCH 13/14] Add documentation --- .../zio/kafka/consumer/internal/Runloop.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index eeba6f566..5b4fe8e58 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -71,6 +71,20 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private val rebalanceListener: RebalanceListener = { + // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. + // We do not know the order in which the call-back methods are invoked. + // + // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the + // `RebalanceEvent.None` is stored. Then during the poll, inside each method of the rebalance listener, + // the ref is updated. 
+ // + // Each method: + // - emits a diagnostic event + // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to + // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll + // - ends streams that need to be ended + // - updates `lastRebalanceEvent` + // val recordRebalanceRebalancingListener = RebalanceListener( onAssigned = (assignedTps, _) => for { From e9aa756a87406e5e1f9ec55afb8d21461db5e9a7 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 4 Nov 2023 14:12:55 +0100 Subject: [PATCH 14/14] Ignore request from streams that were ended or lost --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 5b4fe8e58..09680ec05 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -438,7 +438,12 @@ private[consumer] final class Runloop private ( } cmd match { - case req: RunloopCommand.Request => ZIO.succeed(state.addRequest(req)) + case req: RunloopCommand.Request => + // Ignore request from streams that were ended or lost. + ZIO.succeed( + if (state.assignedStreams.exists(_.tp == req.tp)) state.addRequest(req) + else state + ) case cmd @ RunloopCommand.AddSubscription(newSubscription, _) => state.subscriptionState match { case SubscriptionState.NotSubscribed =>