Skip to content

Commit

Permalink
Retry RebalanceInProgressException in CommitRecovery.Default (#1312)
Browse files Browse the repository at this point in the history
  • Loading branch information
aartigao authored Apr 2, 2024
1 parent 0f37aa7 commit 4e47aaf
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 41 deletions.
14 changes: 8 additions & 6 deletions modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import cats.syntax.functor.*
import cats.Functor

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException}
import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.TopicPartition

/**
Expand Down Expand Up @@ -51,18 +52,19 @@ object CommitRecovery {

/**
* The default [[CommitRecovery]] used in [[ConsumerSettings]] unless a different one has been
* specified. The default recovery strategy only retries `RetriableCommitFailedException`s. These
* exceptions are retried with a jittered exponential backoff, where the time in milliseconds
* before retrying is calculated using:
* specified. The default recovery strategy only retries `RetriableCommitFailedException`s and
* `RebalanceInProgressException`s. These exceptions are retried with a jittered exponential
* backoff, where the time in milliseconds before retrying is calculated using:
*
* {{{
* Random.nextDouble() * Math.min(10000, 10 * Math.pow(2, n))
* }}}
*
* where `n` is the retry attempt (first attempt is `n = 1`). This is done for up to 10 attempts,
* after which we change to retry using a fixed time of 10 seconds, for up to another 5 attempts.
* If at that point we are still faced with `RetriableCommitFailedException`, we give up and
* raise a [[CommitRecoveryException]] with the last such error experienced.<br><br>
* If at that point we are still faced with `RetriableCommitFailedException` or
* `RebalanceInProgressException`, we give up and raise a [[CommitRecoveryException]] with the
* last such error experienced.<br><br>
*
* The sum of time spent waiting between retries will always be less than 70 220 milliseconds, or
* ~70 seconds. Note that this does not include the time for attempting to commit offsets. Offset
Expand All @@ -87,7 +89,7 @@ object CommitRecovery {
jitter: Jitter[F]
): Throwable => F[Unit] = {
def retry(attempt: Int): Throwable => F[Unit] = {
case retriable: RetriableCommitFailedException =>
case retriable @ (_: RetriableCommitFailedException | _: RebalanceInProgressException) =>
val commitWithRecovery = commit.handleErrorWith(retry(attempt + 1))
if (attempt <= 10) backoff(attempt).flatMap(F.sleep) >> commitWithRecovery
else if (attempt <= 15) F.sleep(10.seconds) >> commitWithRecovery
Expand Down
75 changes: 40 additions & 35 deletions modules/core/src/test/scala/fs2/kafka/CommitRecoverySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,55 @@ import cats.effect.unsafe.implicits.global
import cats.syntax.functor.*

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException}
import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.TopicPartition

final class CommitRecoverySpec extends BaseAsyncSpec {

describe("CommitRecovery#Default") {
it("should retry with jittered exponential backoff and fixed rate") {
val (result: Either[Throwable, Unit], sleeps: Chain[FiniteDuration]) =
Ref
.of[IO, Chain[FiniteDuration]](Chain.empty)
.flatMap { ref =>
implicit val temporal: Temporal[IO] = storeSleepsTemporal(ref)
val commit: IO[Unit] = IO.raiseError(new RetriableCommitFailedException("retriable"))
val offsets = Map(new TopicPartition("topic", 0) -> new OffsetAndMetadata(1))
val recovery = CommitRecovery.Default.recoverCommitWith(offsets, commit)
val attempted = commit.handleErrorWith(recovery).attempt
attempted.flatMap(ref.get.tupleLeft)
}
.unsafeRunSync()

assert {
result
.left
.toOption
.map(_.toString)
.contains {
"fs2.kafka.CommitRecoveryException: offset commit is still failing after 15 attempts for offsets: topic-0 -> 1; last exception was: org.apache.kafka.clients.consumer.RetriableCommitFailedException: retriable"
}
}

assert(sleeps.size == 15L)
List(new RetriableCommitFailedException("retriable"), new RebalanceInProgressException())
.foreach { ex =>
it(s"should retry $ex with jittered exponential backoff and fixed rate") {
val (result: Either[Throwable, Unit], sleeps: Chain[FiniteDuration]) =
Ref
.of[IO, Chain[FiniteDuration]](Chain.empty)
.flatMap { ref =>
implicit val temporal: Temporal[IO] = storeSleepsTemporal(ref)
val commit: IO[Unit] = IO.raiseError(ex)
val offsets = Map(new TopicPartition("topic", 0) -> new OffsetAndMetadata(1))
val recovery = CommitRecovery.Default.recoverCommitWith(offsets, commit)
val attempted = commit.handleErrorWith(recovery).attempt
attempted.flatMap(ref.get.tupleLeft)
}
.unsafeRunSync()

assert {
result
.left
.toOption
.map(_.toString)
.contains {
s"fs2.kafka.CommitRecoveryException: offset commit is still failing after 15 attempts for offsets: topic-0 -> 1; last exception was: $ex"
}
}

assert {
sleeps
.toList
.take(10)
.zipWithIndex
.forall { case (sleep, attempt) =>
val max = 10 * Math.pow(2, attempt.toDouble + 1)
0 <= sleep.toMillis && sleep.toMillis < max
assert(sleeps.size == 15L)

assert {
sleeps
.toList
.take(10)
.zipWithIndex
.forall { case (sleep, attempt) =>
val max = 10 * Math.pow(2, attempt.toDouble + 1)
0 <= sleep.toMillis && sleep.toMillis < max
}
}
}

assert(sleeps.toList.drop(10).forall(_.toMillis == 10000L))
}
assert(sleeps.toList.drop(10).forall(_.toMillis == 10000L))
}
}

it("should not recover non-retriable exceptions") {
val commit: IO[Unit] = IO.raiseError(new RuntimeException("commit"))
Expand Down

0 comments on commit 4e47aaf

Please sign in to comment.