diff --git a/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala b/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala index e50adcc36..9609f4f51 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala @@ -15,6 +15,7 @@ import cats.syntax.functor.* import cats.Functor import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException} +import org.apache.kafka.common.errors.RebalanceInProgressException import org.apache.kafka.common.TopicPartition /** @@ -51,9 +52,9 @@ object CommitRecovery { /** * The default [[CommitRecovery]] used in [[ConsumerSettings]] unless a different one has been - * specified. The default recovery strategy only retries `RetriableCommitFailedException`s. These - * exceptions are retried with a jittered exponential backoff, where the time in milliseconds - * before retrying is calculated using: + * specified. The default recovery strategy only retries `RetriableCommitFailedException`s and + * `RebalanceInProgressException`s. These exceptions are retried with a jittered exponential + * backoff, where the time in milliseconds before retrying is calculated using: * * {{{ * Random.nextDouble() * Math.min(10000, 10 * Math.pow(2, n)) @@ -61,8 +62,9 @@ object CommitRecovery { * * where `n` is the retry attempt (first attempt is `n = 1`). This is done for up to 10 attempts, * after which we change to retry using a fixed time of 10 seconds, for up to another 5 attempts. - * If at that point we are still faced with `RetriableCommitFailedException`, we give up and - * raise a [[CommitRecoveryException]] with the last such error experienced.

+ * If at that point we are still faced with `RetriableCommitFailedException` or + * `RebalanceInProgressException`, we give up and raise a [[CommitRecoveryException]] with the + * last such error experienced.

* * The sum of time spent waiting between retries will always be less than 70 220 milliseconds, or * ~70 seconds. Note that this does not include the time for attempting to commit offsets. Offset @@ -87,7 +89,7 @@ object CommitRecovery { jitter: Jitter[F] ): Throwable => F[Unit] = { def retry(attempt: Int): Throwable => F[Unit] = { - case retriable: RetriableCommitFailedException => + case retriable @ (_: RetriableCommitFailedException | _: RebalanceInProgressException) => val commitWithRecovery = commit.handleErrorWith(retry(attempt + 1)) if (attempt <= 10) backoff(attempt).flatMap(F.sleep) >> commitWithRecovery else if (attempt <= 15) F.sleep(10.seconds) >> commitWithRecovery diff --git a/modules/core/src/test/scala/fs2/kafka/CommitRecoverySpec.scala b/modules/core/src/test/scala/fs2/kafka/CommitRecoverySpec.scala index eb43d4857..200779ef3 100644 --- a/modules/core/src/test/scala/fs2/kafka/CommitRecoverySpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/CommitRecoverySpec.scala @@ -15,50 +15,55 @@ import cats.effect.unsafe.implicits.global import cats.syntax.functor.* import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException} +import org.apache.kafka.common.errors.RebalanceInProgressException import org.apache.kafka.common.TopicPartition final class CommitRecoverySpec extends BaseAsyncSpec { describe("CommitRecovery#Default") { - it("should retry with jittered exponential backoff and fixed rate") { - val (result: Either[Throwable, Unit], sleeps: Chain[FiniteDuration]) = - Ref - .of[IO, Chain[FiniteDuration]](Chain.empty) - .flatMap { ref => - implicit val temporal: Temporal[IO] = storeSleepsTemporal(ref) - val commit: IO[Unit] = IO.raiseError(new RetriableCommitFailedException("retriable")) - val offsets = Map(new TopicPartition("topic", 0) -> new OffsetAndMetadata(1)) - val recovery = CommitRecovery.Default.recoverCommitWith(offsets, commit) - val attempted = commit.handleErrorWith(recovery).attempt - attempted.flatMap(ref.get.tupleLeft) - } - .unsafeRunSync() - - assert { - result - .left - .toOption - .map(_.toString) - .contains { - "fs2.kafka.CommitRecoveryException: offset commit is still failing after 15 attempts for offsets: topic-0 -> 1; last exception was: org.apache.kafka.clients.consumer.RetriableCommitFailedException: retriable" - } - } - assert(sleeps.size == 15L) + List(new RetriableCommitFailedException("retriable"), new RebalanceInProgressException()) + .foreach { ex => + it(s"should retry $ex with jittered exponential backoff and fixed rate") { + val (result: Either[Throwable, Unit], sleeps: Chain[FiniteDuration]) = + Ref + .of[IO, Chain[FiniteDuration]](Chain.empty) + .flatMap { ref => + implicit val temporal: Temporal[IO] = storeSleepsTemporal(ref) + val commit: IO[Unit] = IO.raiseError(ex) + val offsets = Map(new TopicPartition("topic", 0) -> new OffsetAndMetadata(1)) + val recovery = CommitRecovery.Default.recoverCommitWith(offsets, commit) + val attempted = commit.handleErrorWith(recovery).attempt + attempted.flatMap(ref.get.tupleLeft) + } + .unsafeRunSync() + + assert { + result + .left + .toOption + .map(_.toString) + .contains { + s"fs2.kafka.CommitRecoveryException: offset commit is still failing after 15 attempts for offsets: topic-0 -> 1; last exception was: $ex" + } + } - assert { - sleeps - .toList - .take(10) - .zipWithIndex - .forall { case (sleep, attempt) => - val max = 10 * Math.pow(2, attempt.toDouble + 1) - 0 <= sleep.toMillis && sleep.toMillis < max + assert(sleeps.size == 15L) + + assert { + sleeps + .toList + .take(10) + .zipWithIndex + .forall { case (sleep, attempt) => + val max = 10 * Math.pow(2, attempt.toDouble + 1) + 0 <= sleep.toMillis && sleep.toMillis < max + } } - } - assert(sleeps.toList.drop(10).forall(_.toMillis == 10000L)) - } + assert(sleeps.toList.drop(10).forall(_.toMillis == 10000L)) + } + } it("should not recover non-retriable exceptions") { val commit: IO[Unit] = IO.raiseError(new RuntimeException("commit"))