Skip to content

Commit

Permalink
Blocker for all IO operations (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
wookievx authored Oct 14, 2020
1 parent eddc326 commit 6b30c3c
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

package dev.profunktor.fs2rabbit.algebra

import cats.effect.Sync
import cats.effect.{Blocker, ContextShift, Sync}
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.arguments._
import dev.profunktor.fs2rabbit.model._

object Binding {
def make[F[_]: Sync]: Binding[F] =
def make[F[_]: Sync: ContextShift](blocker: Blocker): Binding[F] =
new Binding[F] {
override def bindQueue(channel: AMQPChannel,
queueName: QueueName,
exchangeName: ExchangeName,
routingKey: RoutingKey): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueBind(
queueName.value,
exchangeName.value,
Expand All @@ -41,7 +41,7 @@ object Binding {
exchangeName: ExchangeName,
routingKey: RoutingKey,
args: QueueBindingArgs): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueBind(
queueName.value,
exchangeName.value,
Expand All @@ -55,7 +55,7 @@ object Binding {
exchangeName: ExchangeName,
routingKey: RoutingKey,
args: QueueBindingArgs): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueBindNoWait(
queueName.value,
exchangeName.value,
Expand Down Expand Up @@ -83,7 +83,7 @@ object Binding {
exchangeName: ExchangeName,
routingKey: RoutingKey,
args: QueueUnbindArgs): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueUnbind(
queueName.value,
exchangeName.value,
Expand All @@ -97,7 +97,7 @@ object Binding {
source: ExchangeName,
routingKey: RoutingKey,
args: ExchangeBindingArgs): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeBind(
destination.value,
source.value,
Expand All @@ -111,7 +111,7 @@ object Binding {
source: ExchangeName,
routingKey: RoutingKey,
args: ExchangeBindingArgs): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeBindNoWait(
destination.value,
source.value,
Expand All @@ -125,7 +125,7 @@ object Binding {
source: ExchangeName,
routingKey: RoutingKey,
args: ExchangeUnbindArgs): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeUnbind(
destination.value,
source.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package dev.profunktor.fs2rabbit.algebra

import cats.effect.syntax.effect._
import cats.effect.{Effect, Sync}
import cats.effect.{Blocker, ContextShift, Effect, Sync}
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.{Applicative, Functor}
Expand All @@ -28,7 +28,7 @@ import dev.profunktor.fs2rabbit.model._
import scala.util.{Failure, Success, Try}

object Consume {
def make[F[_]: Effect]: Consume[F] =
def make[F[_]: Effect: ContextShift](blocker: Blocker): Consume[F] =
new Consume[F] {
private[fs2rabbit] def defaultConsumer[A](
channel: AMQPChannel,
Expand Down Expand Up @@ -108,21 +108,21 @@ object Consume {
}
}

override def basicAck(channel: AMQPChannel, tag: DeliveryTag, multiple: Boolean): F[Unit] = Sync[F].delay {
override def basicAck(channel: AMQPChannel, tag: DeliveryTag, multiple: Boolean): F[Unit] = blocker.delay {
channel.value.basicAck(tag.value, multiple)
}

override def basicNack(channel: AMQPChannel, tag: DeliveryTag, multiple: Boolean, requeue: Boolean): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.basicNack(tag.value, multiple, requeue)
}

override def basicReject(channel: AMQPChannel, tag: DeliveryTag, requeue: Boolean): F[Unit] = Sync[F].delay {
override def basicReject(channel: AMQPChannel, tag: DeliveryTag, requeue: Boolean): F[Unit] = blocker.delay {
channel.value.basicReject(tag.value, requeue)
}

override def basicQos(channel: AMQPChannel, basicQos: BasicQos): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.basicQos(
basicQos.prefetchSize,
basicQos.prefetchCount,
Expand All @@ -141,7 +141,7 @@ object Consume {
)(internals: AMQPInternals[F]): F[ConsumerTag] =
for {
dc <- defaultConsumer(channel, internals)
rs <- Sync[F].delay(
rs <- blocker.delay(
channel.value.basicConsume(
queueName.value,
autoAck,
Expand All @@ -155,7 +155,7 @@ object Consume {
} yield ConsumerTag(rs)

override def basicCancel(channel: AMQPChannel, consumerTag: ConsumerTag): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.basicCancel(consumerTag.value)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package dev.profunktor.fs2rabbit.algebra

import cats.effect.Sync
import cats.effect.{Blocker, ContextShift, Sync}
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.arguments._
import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig}
import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._
import dev.profunktor.fs2rabbit.model.{AMQPChannel, ExchangeName, QueueName}

object Declaration {
def make[F[_]: Sync]: Declaration[F] = new Declaration[F] {
def make[F[_]: Sync: ContextShift](blocker: Blocker): Declaration[F] = new Declaration[F] {
override def declareExchange(channel: AMQPChannel, config: DeclarationExchangeConfig): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeDeclare(
config.exchangeName.value,
config.exchangeType.toString.toLowerCase,
Expand All @@ -41,7 +41,7 @@ object Declaration {
channel: AMQPChannel,
config: DeclarationExchangeConfig
): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeDeclareNoWait(
config.exchangeName.value,
config.exchangeType.toString.toLowerCase,
Expand All @@ -53,17 +53,17 @@ object Declaration {
}.void

override def declareExchangePassive(channel: AMQPChannel, exchangeName: ExchangeName): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeDeclarePassive(exchangeName.value)
}.void

override def declareQueue(channel: AMQPChannel): F[QueueName] =
Sync[F].delay {
blocker.delay {
QueueName(channel.value.queueDeclare().getQueue)
}

override def declareQueue(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueDeclare(
config.queueName.value,
config.durable.isTrue,
Expand All @@ -74,7 +74,7 @@ object Declaration {
}.void

override def declareQueueNoWait(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueDeclareNoWait(
config.queueName.value,
config.durable.isTrue,
Expand All @@ -85,7 +85,7 @@ object Declaration {
}.void

override def declareQueuePassive(channel: AMQPChannel, queueName: QueueName): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueDeclarePassive(queueName.value)
}.void
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package dev.profunktor.fs2rabbit.algebra

import cats.effect.Sync
import cats.effect.{Blocker, ContextShift, Sync}
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.config.deletion
import dev.profunktor.fs2rabbit.config.deletion.{DeletionExchangeConfig, DeletionQueueConfig}
import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._
import dev.profunktor.fs2rabbit.model.AMQPChannel

object Deletion {
def make[F[_]: Sync]: Deletion[F] = new Deletion[F] {
def make[F[_]: Sync: ContextShift](blocker: Blocker): Deletion[F] = new Deletion[F] {
override def deleteQueue(channel: AMQPChannel, config: DeletionQueueConfig): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueDelete(
config.queueName.value,
config.ifUnused.isTrue,
Expand All @@ -35,7 +35,7 @@ object Deletion {
}.void

override def deleteQueueNoWait(channel: AMQPChannel, config: DeletionQueueConfig): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.queueDeleteNoWait(
config.queueName.value,
config.ifUnused.isTrue,
Expand All @@ -47,15 +47,15 @@ object Deletion {
channel: AMQPChannel,
config: deletion.DeletionExchangeConfig
): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeDelete(config.exchangeName.value, config.ifUnused.isTrue)
}.void

override def deleteExchangeNoWait(
channel: AMQPChannel,
config: deletion.DeletionExchangeConfig
): F[Unit] =
Sync[F].delay {
blocker.delay {
channel.value.exchangeDeleteNoWait(
config.exchangeName.value,
config.ifUnused.isTrue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ object RabbitClient {

val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500))
val connection = ConnectionResource.make(config, sslContext, saslConfig, metricsCollector)
val consumingProgram = AckConsumingProgram.make[F](config, internalQ)
val consumingProgram = AckConsumingProgram.make[F](config, internalQ, blocker)
val publishingProgram = PublishingProgram.make[F](blocker)

(connection, consumingProgram, publishingProgram).mapN {
case (conn, consuming, publish) =>
val consumeClient = Consume.make[F]
val consumeClient = Consume.make[F](blocker)
val publishClient = Publish.make[F](blocker)
val bindingClient = Binding.make[F]
val declarationClient = Declaration.make[F]
val deletionClient = Deletion.make[F]
val bindingClient = Binding.make[F](blocker)
val declarationClient = Declaration.make[F](blocker)
val deletionClient = Deletion.make[F](blocker)

new RabbitClient[F](
conn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import dev.profunktor.fs2rabbit.model._
import fs2.Stream

object AckConsumingProgram {
def make[F[_]: Effect](configuration: Fs2RabbitConfig, internalQueue: InternalQueue[F]): F[AckConsumingProgram[F]] =
(AckingProgram.make(configuration), ConsumingProgram.make(internalQueue)).mapN {
def make[F[_]: Effect: ContextShift](configuration: Fs2RabbitConfig,
internalQueue: InternalQueue[F],
blocker: Blocker): F[AckConsumingProgram[F]] =
(AckingProgram.make(configuration, blocker), ConsumingProgram.make(internalQueue, blocker)).mapN {
case (ap, cp) =>
WrapperAckConsumingProgram(ap, cp)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
package dev.profunktor.fs2rabbit.program

import cats.Applicative
import cats.effect.{Effect, Sync}
import cats.effect.{Blocker, ContextShift, Effect, Sync}
import dev.profunktor.fs2rabbit.algebra.{AMQPInternals, Acking, Consume}
import dev.profunktor.fs2rabbit.arguments.Arguments
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.model.AckResult.{Ack, NAck, Reject}
import dev.profunktor.fs2rabbit.model._

object AckingProgram {
def make[F[_]: Effect](config: Fs2RabbitConfig): F[AckingProgram[F]] = Sync[F].delay {
WrapperAckingProgram(config, Consume.make)
def make[F[_]: Effect: ContextShift](config: Fs2RabbitConfig, blocker: Blocker): F[AckingProgram[F]] = Sync[F].delay {
WrapperAckingProgram(config, Consume.make(blocker))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package dev.profunktor.fs2rabbit.program

import cats.effect.{Effect, Sync}
import cats.effect.{Blocker, ContextShift, Effect, Sync}
import cats.implicits._
import dev.profunktor.fs2rabbit.algebra.ConsumingStream._
import dev.profunktor.fs2rabbit.algebra.{AMQPInternals, Consume, InternalQueue}
Expand All @@ -26,9 +26,10 @@ import dev.profunktor.fs2rabbit.model._
import fs2.Stream

object ConsumingProgram {
def make[F[_]: Effect](internalQueue: InternalQueue[F]): F[ConsumingProgram[F]] = Sync[F].delay {
WrapperConsumingProgram(internalQueue, Consume.make)
}
def make[F[_]: Effect: ContextShift](internalQueue: InternalQueue[F], blocker: Blocker): F[ConsumingProgram[F]] =
Sync[F].delay {
WrapperConsumingProgram(internalQueue, Consume.make(blocker))
}
}

trait ConsumingProgram[F[_]] extends ConsumingStream[F] with Consume[F]
Expand Down

0 comments on commit 6b30c3c

Please sign in to comment.