Skip to content

Commit

Permalink
Change Blocking to use Sync#interruptible, not blocking (#1126)
Browse files Browse the repository at this point in the history
This is to avoid being unable to cancel code that is inside blocking
section. At the moment it is possible for user-code to not cancel
immediately, in which case it will block until an internal Kafka
max-block period is reached (by default 1 minute).

Changing to use `interruptible` means the user-code returns immediately
on cancellation, and the underlying operation in Kafka should be
cancelled.
  • Loading branch information
bastewart authored Apr 2, 2024
1 parent 4e47aaf commit f6784a9
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ private[kafka] trait Blocking[F[_]] {
private[kafka] object Blocking {

def fromSync[F[_]: Sync]: Blocking[F] = new Blocking[F] {
override def apply[A](a: => A): F[A] = Sync[F].blocking(a)
override def apply[A](a: => A): F[A] = Sync[F].interruptible(a)
}

def fromExecutionContext[F[_]](ec: ExecutionContext)(implicit F: Async[F]): Blocking[F] =
Expand Down

0 comments on commit f6784a9

Please sign in to comment.