From f6784a94049b3de337575ddf0651218c6e9a1299 Mon Sep 17 00:00:00 2001 From: Ben Stewart Date: Tue, 2 Apr 2024 07:51:39 +0100 Subject: [PATCH] Change `Blocking` to use `Sync#interruptible`, not `blocking` (#1126) This is to avoid being unable to cancel code that is inside blocking section. At the moment it is possible for user-code to not cancel immediately, in which case it will block until an internal Kafka max-block period is reached (by default 1 minute). Changing to use `interruptible` means the user-code returns immediately on cancellation, and the underlying operation in Kafka should be cancelled. --- modules/core/src/main/scala/fs2/kafka/internal/Blocking.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/Blocking.scala b/modules/core/src/main/scala/fs2/kafka/internal/Blocking.scala index 292e3aac8..fb87f6ffa 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/Blocking.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/Blocking.scala @@ -20,7 +20,7 @@ private[kafka] trait Blocking[F[_]] { private[kafka] object Blocking { def fromSync[F[_]: Sync]: Blocking[F] = new Blocking[F] { - override def apply[A](a: => A): F[A] = Sync[F].blocking(a) + override def apply[A](a: => A): F[A] = Sync[F].interruptible(a) } def fromExecutionContext[F[_]](ec: ExecutionContext)(implicit F: Async[F]): Blocking[F] =