Skip to content

Commit

Permalink
Await commit during a rebalance
Browse files Browse the repository at this point in the history
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places.
1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time.

### Collateral

This change also:
- fixes order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- f2s-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that does polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.

- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
  • Loading branch information
erikvanoosten committed Nov 5, 2023
1 parent 5fb8b5e commit 77ed9f9
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 84 deletions.
151 changes: 111 additions & 40 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -445,46 +445,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSomeLayer[Kafka](consumer(client, Some(group)))
} yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata)))
},
test("handle rebalancing by completing topic-partition streams") {
val nrMessages = 50
val nrPartitions = 6 // Must be even and strictly positive

for {
// Produce messages on several partitions
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient

_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
_ <- ZIO.foreachDiscard(1 to nrMessages) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}

// Consume messages
subscription = Subscription.topics(topic)
consumer1 <- Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(nrPartitions) { case (tp, partition) =>
ZStream
.fromZIO(partition.runDrain)
.as(tp)
}
.take(nrPartitions.toLong / 2)
.runDrain
.provideSomeLayer[Kafka](consumer(client1, Some(group)))
.fork
_ <- Live.live(ZIO.sleep(5.seconds))
consumer2 <- Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.take(nrPartitions.toLong / 2)
.runDrain
.provideSomeLayer[Kafka](consumer(client2, Some(group)))
.fork
_ <- consumer1.join
_ <- consumer2.join
} yield assertCompletes
},
test("produce diagnostic events when rebalancing") {
val nrMessages = 50
val nrPartitions = 6
Expand Down Expand Up @@ -626,6 +586,117 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
consumedMessages <- messagesReceived.get
} yield assert(consumedMessages)(contains(newMessage).negate)
},
suite("rebalanceSafeCommits prevents processing messages twice when rebalancing")({

/**
* Outline of this test:
* - A producer generates some messages on every partition of a topic (2 partitions),
* - A consumer starts reading from the topic. It is the only consumer so it handles all partitions.
* - After a few messages a second consumer is started. One partition will be re-assigned.
*
* Since the first consumer is slow, we expect it to not have committed the offsets yet when the rebalance
* happens. As a consequence, the second consumer would see some messages the first consumer already consumed.
*
* '''However,''' since we enable `rebalanceSafeCommits` on the first consumer, no messages should be consumed
* by both consumers.
*/
def testForPartitionAssignmentStrategy[T <: ConsumerPartitionAssignor: ClassTag] =
test(implicitly[ClassTag[T]].runtimeClass.getName) {
val partitionCount = 2

def makeConsumer(
clientId: String,
groupId: String,
rebalanceSafeCommits: Boolean
): ZLayer[Kafka, Throwable, Consumer] =
ZLayer(
consumerSettings(
clientId = clientId,
groupId = Some(groupId),
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "1")
).map(_.withRebalanceSafeCommits(rebalanceSafeCommits))
) >>> minimalConsumer()

for {
topic <- randomTopic
subscription = Subscription.topics(topic)
clientId1 <- randomClient
clientId2 <- randomClient
groupId <- randomGroup
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount))
// Produce one message to each partition every 500 ms
pFib <- ZStream
.fromSchedule(Schedule.fixed(500.millis))
.mapZIO { i =>
ZIO.foreachDiscard(0 until partitionCount) { p =>
produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i")))
}
}
.runDrain
.fork
_ <- ZIO.logDebug("Starting consumer 1")
c1Started <- Promise.make[Nothing, Unit]
c1Keys <- Ref.make(Chunk.empty[String])
fib1 <- ZIO
.logAnnotate("consumer", "1") {
// When the stream ends, the topic subscription ends as well. Because of that the consumer
// shuts down and commits are no longer possible. Therefore, we signal the second consumer in
// such a way that it doesn't close the stream.
Consumer
.plainStream(subscription, Serde.string, Serde.string)
.tap(record => ZIO.logDebug(s"Received ${record.key}"))
.tap { record =>
// Signal consumer 2 can start when a record is seen for every partition.
for {
keys <- c1Keys.updateAndGet(_.appended(record.key))
_ <- c1Started.succeed(()).when(keys.map(_.split('-')(1)).toSet.size == partitionCount)
} yield ()
}
// Buffer so that the above can run ahead of the below, this is important;
// we want consumer 2 to start before consumer 1 commits.
.buffer(partitionCount)
.mapZIO { msg =>
for {
_ <- ZIO.sleep(5.seconds)
_ <- ZIO.logDebug(s"Committing offset for key ${msg.key}")
_ <- msg.offset.commit
} yield msg.key
}
.take(partitionCount.toLong)
.runCollect
.map(_.toSet)
.provideSome[Kafka](makeConsumer(clientId1, groupId, true))
}
.fork
_ <- c1Started.await
_ <- ZIO.logDebug("Starting consumer 2")
fib2 <- ZIO
.logAnnotate("consumer", "2") {
Consumer
.plainStream(subscription, Serde.string, Serde.string)
.tap(msg => ZIO.logDebug(s"Received ${msg.key}"))
.mapZIO(msg => msg.offset.commit.as(msg.key))
.take(5)
.runCollect
.map(_.toSet)
.provideSome[Kafka](makeConsumer(clientId2, groupId, false))
}
.fork
_ <- ZIO.logDebug("Waiting for consumers to end")
c2Keys: Set[String] <- fib2.join
_ <- ZIO.logDebug("Consumer 2 ready")
c1Keys: Set[String] <- fib1.join
_ <- ZIO.logDebug("Consumer 1 ready")
_ <- pFib.interrupt
} yield assertTrue((c1Keys intersect c2Keys).isEmpty)
}

// Test for both default partition assignment strategies
Seq(
testForPartitionAssignmentStrategy[RangeAssignor],
testForPartitionAssignmentStrategy[CooperativeStickyAssignor]
)
}: _*),
test("partitions for topic doesn't fail if doesn't exist") {
for {
topic <- randomTopic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
}
)

private def makeCommit(offsets: Map[TopicPartition, Long]): RunloopCommand.Commit = {
private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = {
val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }
val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None))
RunloopCommand.Commit(o, p)
Runloop.Commit(o, p)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
maxPollInterval: Duration = 5.minutes,
`max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
Expand All @@ -138,6 +139,7 @@ object KafkaTestUtils {
)
.withOffsetRetrieval(offsetRetrieval)
.withRestartStreamOnRebalancing(restartStreamOnRebalancing)
.withRebalanceSafeCommits(rebalanceSafeCommits)
.withProperties(properties)

val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId)
Expand All @@ -154,6 +156,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty
): URIO[Kafka, ConsumerSettings] =
consumerSettings(
Expand All @@ -163,6 +166,7 @@ object KafkaTestUtils {
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties
)
.map(
Expand Down Expand Up @@ -202,6 +206,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): ZLayer[Kafka, Throwable, Consumer] =
Expand All @@ -213,6 +218,7 @@ object KafkaTestUtils {
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties,
commitTimeout = commitTimeout
)
Expand All @@ -229,6 +235,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty,
rebalanceListener: RebalanceListener = RebalanceListener.noop
): ZLayer[Kafka, Throwable, Consumer] =
Expand All @@ -240,8 +247,9 @@ object KafkaTestUtils {
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties
).map(_.withRebalanceListener(rebalanceListener))
).map(_.withRebalanceListener(rebalanceListener).withRebalanceSafeCommits(rebalanceSafeCommits))
) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ final case class ConsumerSettings(
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
rebalanceListener: RebalanceListener = RebalanceListener.noop,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
) {
private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match {
Expand Down Expand Up @@ -154,6 +155,32 @@ final case class ConsumerSettings(
def withRestartStreamOnRebalancing(value: Boolean): ConsumerSettings =
copy(restartStreamOnRebalancing = value)

/**
* @param value
* Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is
* `false`, but the recommended value is `true` as it prevents duplicate messages.
*
* Use `false` _only_ when your streams don't commit, or when it is okay to have messages processed twice (possibly
* concurrently).
*
* When `true`, messages consumed from revoked partitions must be committed before we allow the rebalance to continue.
*
* When a partition is revoked, consuming the messages will be taken over by another consumer. The other consumer will
* continue from the committed offset. It is therefore important that this consumer commits offsets of all consumed
* messages. Therefore, by holding up the rebalance until these commits are done, we ensure that the new consumer will
* start from the correct offset.
*
* During a rebalance no new messages can be received _for any stream_. Therefore, _all_ streams are deprived of new
* messages until the revoked streams are ready committing.
*
* When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
* offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
* of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will
* start from an earlier offset. The result is that some messages are processed twice and concurrently.
*/
def withRebalanceSafeCommits(value: Boolean): ConsumerSettings =
copy(rebalanceSafeCommits = value)

def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings =
withProperties(credentialsStore.properties)

Expand Down Expand Up @@ -200,6 +227,6 @@ final case class ConsumerSettings(
object ConsumerSettings {
val defaultCommitTimeout: Duration = 15.seconds

def apply(bootstrapServers: List[String]) =
def apply(bootstrapServers: List[String]): ConsumerSettings =
new ConsumerSettings().withBootstrapServers(bootstrapServers)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import scala.jdk.CollectionConverters._

/**
* ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects.
*
* Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor
* when this is not desired.
*/
final case class RebalanceListener(
onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ private[consumer] final class ConsumerAccess(
def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
access.withPermit(withConsumerNoPermit(f))

private[consumer] def withConsumerNoPermit[R, A](
private def withConsumerNoPermit[R, A](
f: ByteArrayKafkaConsumer => RIO[R, A]
): RIO[R, A] =
ZIO
Expand All @@ -31,10 +31,17 @@ private[consumer] final class ConsumerAccess(
.flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt))

/**
* Do not use this method outside of the Runloop
* Use this method only from Runloop.
*/
private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] =
access.withPermit(f(consumer))

/**
* Use this method ONLY from the rebalance listener.
*/
private[internal] def rebalanceListenerAccess[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
withConsumerNoPermit(f)

}

private[consumer] object ConsumerAccess {
Expand Down
Loading

0 comments on commit 77ed9f9

Please sign in to comment.