Automatically merge commits
The commit throughput supported by Kafka brokers is much lower than the consume throughput. Therefore, to retain a high consume throughput it is important that not every record's offset results in its own commit request. In this PR we automatically merge all commits that were generated in the course of a single run of the runloop. This frees users from having to merge streams and do the commit merging themselves.
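
A minimal standalone sketch of the merging idea (illustrative only: PendingCommit, mergeCommits and the topic name are made up; the PR's actual logic lives in Runloop.asyncCommitParameters in the diff below). Merging simply keeps the highest offset per partition across all commits collected in one runloop cycle:

import org.apache.kafka.common.TopicPartition

object CommitMergeSketch {
  final case class PendingCommit(offsets: Map[TopicPartition, Long])

  // Collapse many commits into one offset map: the highest offset per partition wins.
  // (groupMapReduce requires Scala 2.13+.)
  def mergeCommits(commits: Seq[PendingCommit]): Map[TopicPartition, Long] =
    commits
      .flatMap(_.offsets) // Seq[(TopicPartition, Long)]
      .groupMapReduce { case (tp, _) => tp } { case (_, offset) => offset }(_ max _)

  def main(args: Array[String]): Unit = {
    val tp0 = new TopicPartition("orders", 0)
    val tp1 = new TopicPartition("orders", 1)
    val merged = mergeCommits(
      Seq(
        PendingCommit(Map(tp0 -> 10L, tp1 -> 4L)),
        PendingCommit(Map(tp0 -> 12L)),
        PendingCommit(Map(tp1 -> 3L))
      )
    )
    println(merged) // one entry per partition: orders-0 -> 12, orders-1 -> 4
  }
}

With three queued commits touching two partitions, a single commitAsync call with two offsets suffices, which keeps the commit rate bounded by the runloop frequency rather than by the record rate.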

Commits are placed in a separate queue. Although this is not strictly needed for this change, it is needed in the next PR, where we start handling commits from the rebalance listener (see #830).
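
A trimmed-down sketch of that handoff (a hypothetical simplification: String keys instead of TopicPartition, a plain RuntimeException instead of zio-kafka's CommitTimeout; the real types are Runloop.Commit and RunloopCommand.CommitAvailable in the diff below). The caller parks the commit on the commit queue, wakes the runloop with a lightweight signal on the command queue, and awaits a promise that is completed once the broker acknowledges the commit:

import zio._

object CommitHandoffSketch {
  final case class Commit(offsets: Map[String, Long], cont: Promise[Throwable, Unit])

  sealed trait Command
  case object CommitAvailable extends Command

  def commit(
    commitQueue: Queue[Commit],
    commandQueue: Queue[Command],
    commitTimeout: Duration
  )(offsets: Map[String, Long]): Task[Unit] =
    for {
      p <- Promise.make[Throwable, Unit]       // completed later by the runloop's commit callback
      _ <- commitQueue.offer(Commit(offsets, p))
      _ <- commandQueue.offer(CommitAvailable) // wake the runloop; the payload stays in the commit queue
      _ <- p.await.timeoutFail(new RuntimeException("commit timed out"))(commitTimeout)
    } yield ()
}

Because the runloop drains the commit queue with takeAll at the start of each cycle, every commit offered since the previous cycle ends up in the same merged commitAsync call.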
erikvanoosten committed Oct 11, 2023
1 parent f068456 commit 042a4c4
Showing 2 changed files with 94 additions and 51 deletions.
137 changes: 91 additions & 46 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -15,6 +15,8 @@ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.stream._

import java.util
import java.util.{ Map => JavaMap }
import scala.collection.mutable
import scala.jdk.CollectionConverters._

//noinspection SimplifyWhenInspection,SimplifyUnlessInspection
@@ -25,6 +27,7 @@ private[consumer] final class Runloop private (
pollTimeout: Duration,
commitTimeout: Duration,
runloopTimeout: Duration,
commitQueue: Queue[Runloop.Commit],
commandQueue: Queue[RunloopCommand],
lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]],
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
@@ -110,43 +113,17 @@ private[consumer] final class Runloop private (
}
}

/** This is the implementation behind the user facing api `Offset.commit`. */
private val commit: Map[TopicPartition, Long] => Task[Unit] =
offsets =>
for {
p <- Promise.make[Throwable, Unit]
_ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
_ <- commitQueue.offer(Runloop.Commit(offsets, p))
_ <- commandQueue.offer(RunloopCommand.CommitAvailable)
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
} yield ()

private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => cmd.cont.done(e).asInstanceOf[UIO[Unit]]
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets))
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *>
commandQueue.offer(cmd).unit
case err =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err))
}
val callback =
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure()
}
}

// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
consumer.runloopAccess { c =>
ZIO
.attempt(c.commitAsync(offsets.asJava, callback))
.catchAll(onFailure)
}
}

/**
* Does all needed to end revoked partitions:
* 1. Complete the revoked assigned streams 2. Remove from the list of pending requests
@@ -235,7 +212,9 @@ private[consumer] final class Runloop private (
.as(tps)
}

// Pause partitions for which there is no demand and resume those for which there is now demand
/**
* Pause partitions for which there is no demand and resume those for which there is now demand.
*/
private def resumeAndPausePartitions(
c: ByteArrayKafkaConsumer,
assignment: Set[TopicPartition],
@@ -337,8 +316,9 @@ private[consumer] final class Runloop private (
}
}
startingStreams <-
if (pollResult.startingTps.isEmpty) ZIO.succeed(Chunk.empty[PartitionStreamControl])
else {
if (pollResult.startingTps.isEmpty) {
ZIO.succeed(Chunk.empty[PartitionStreamControl])
} else {
ZIO
.foreach(Chunk.fromIterable(pollResult.startingTps))(newPartitionStream)
.tap { newStreams =>
@@ -361,6 +341,59 @@ private[consumer] final class Runloop private (
assignedStreams = updatedStreams
)

private def handleCommits(state: State, commits: Chunk[Runloop.Commit]): UIO[State] =
if (commits.isEmpty) {
ZIO.succeed(state)
} else {
val (offsets, callback, onFailure) = asyncCommitParameters(commits)
val newState = state.addCommits(commits)
consumer.runloopAccess { c =>
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
ZIO.attempt(c.commitAsync(offsets, callback))
}
.catchAll(onFailure)
.as(newState)
}

/** Merge commits and prepare parameters for calling `consumer.commitAsync`. */
private def asyncCommitParameters(
commits: Chunk[Runloop.Commit]
): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = {
val offsets = commits
.foldLeft(mutable.Map.empty[TopicPartition, Long]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc.get(tp).map(_ max offset).getOrElse(offset))
}
acc
}
.toMap
val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
for {
_ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried")
_ <- commitQueue.offerAll(commits)
_ <- commandQueue.offer(RunloopCommand.CommitAvailable)
} yield ()
case err: Throwable =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err))
}
val callback =
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
runtime.unsafe.run {
if (exception eq null) onSuccess else onFailure(exception)
}
.getOrThrowFiberFailure()
}
}
(offsetsWithMetaData.asJava, callback, onFailure)
}

private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = {
def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] =
applyNewSubscriptionState(newSubscriptionState).flatMap { newAssignedStreams =>
@@ -385,7 +418,6 @@ private[consumer] final class Runloop private (

cmd match {
case req: RunloopCommand.Request => ZIO.succeed(state.addRequest(req))
case cmd: RunloopCommand.Commit => doCommit(cmd).as(state.addCommit(cmd))
case cmd @ RunloopCommand.AddSubscription(newSubscription, _) =>
state.subscriptionState match {
case SubscriptionState.NotSubscribed =>
@@ -468,15 +500,15 @@ private[consumer] final class Runloop private (

/**
* Poll behavior:
* - Run until stop is set to true
* - Process commands as soon as they are queued, unless in the middle of polling
* - Process all currently queued commands before polling instead of one by one
* - Immediately after polling, if there are available commands, process them instead of waiting until some periodic
* trigger
* - Poll only when subscribed (leads to exceptions from the Apache Kafka Consumer if not)
* - Poll continuously when there are (still) unfulfilled requests or pending commits
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
* - Run until the StopRunloop command is received
* - Process all currently queued Commits
* - Process all currently queued commands
* - Poll the Kafka broker
* - After command handling and after polling, determine whether the runloop should continue:
* - Poll only when subscribed (leads to exceptions from the Apache Kafka Consumer if not)
* - Poll continuously when there are (still) unfulfilled requests or pending commits
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
*/
private def run(initialState: State): ZIO[Scope, Throwable, Any] = {
import Runloop.StreamOps
@@ -487,9 +519,11 @@ private[consumer] final class Runloop private (
.takeWhile(_ != RunloopCommand.StopRunloop)
.runFoldChunksDiscardZIO(initialState) { (state, commands) =>
for {
_ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}")
commits <- commitQueue.takeAll
_ <- ZIO.logDebug(s"Processing ${commits.size} commits, ${commands.size} commands: ${commands.mkString(",")}")
stateAfterCommits <- handleCommits(state, commits)
streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd }
stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand)
stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand)

updatedStateAfterPoll <- if (stateAfterCommands.shouldPoll) handlePoll(stateAfterCommands)
else ZIO.succeed(stateAfterCommands)
@@ -550,6 +584,15 @@ private[consumer] object Runloop {
) extends RebalanceEvent
}

// TODO: make `private` as soon as `State` has been moved into `Runloop`, see https://github.com/zio/zio-kafka/pull/1065
final case class Commit(
offsets: Map[TopicPartition, Long],
cont: Promise[Throwable, Unit]
) {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}

def make(
hasGroupId: Boolean,
consumer: ConsumerAccess,
@@ -565,6 +608,7 @@ private[consumer] object Runloop {
): URIO[Scope, Runloop] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown)
commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown)
lastRebalanceEvent <- Ref.Synchronized.make[Option[Runloop.RebalanceEvent]](None)
initialState = State.initial
@@ -577,6 +621,7 @@ private[consumer] object Runloop {
pollTimeout = pollTimeout,
commitTimeout = commitTimeout,
runloopTimeout = runloopTimeout,
commitQueue = commitQueue,
commandQueue = commandQueue,
lastRebalanceEvent = lastRebalanceEvent,
partitionsHub = partitionsHub,
@@ -608,8 +653,8 @@ private[consumer] object Runloop {
assignedStreams: Chunk[PartitionStreamControl],
subscriptionState: SubscriptionState
) {
def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)
def addCommits(c: Chunk[Runloop.Commit]): State = copy(pendingCommits = pendingCommits ++ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)

def shouldPoll: Boolean =
subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
8 changes: 3 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala
@@ -16,14 +16,12 @@ object RunloopCommand {
/** Used as a signal that another poll is needed. */
case object Poll extends Control

/** Used as a signal to the poll-loop that commits are available in the commit-queue. */
case object CommitAvailable extends Control

case object StopRunloop extends Control
case object StopAllStreams extends StreamCommand

final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends StreamCommand {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}

/** Used by a stream to request more records. */
final case class Request(tp: TopicPartition) extends StreamCommand
