From 723f7c261087ea3d6594053dbd89d3fdc7d560e8 Mon Sep 17 00:00:00 2001 From: geirolz Date: Fri, 31 May 2024 17:47:50 +0200 Subject: [PATCH] Declare queue type --- .../fs2rabbit/algebra/Declaration.scala | 42 ++++++++++--------- .../fs2rabbit/config/declaration.scala | 40 ++++++++++++++++-- .../dev/profunktor/fs2rabbit/model.scala | 6 +-- 3 files changed, 62 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala index a7a7820d..2ee92673 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala @@ -17,7 +17,7 @@ package dev.profunktor.fs2rabbit.algebra import cats.effect.Sync -import cats.syntax.functor._ +import cats.syntax.all._ import dev.profunktor.fs2rabbit.arguments._ import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig} import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._ @@ -63,26 +63,30 @@ object Declaration { } override def declareQueue(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] = - Sync[F].blocking { - channel.value.queueDeclare( - config.queueName.value, - config.durable.isTrue, - config.exclusive.isTrue, - config.autoDelete.isTrue, - config.arguments - ) - }.void + Sync[F].fromEither(config.validatedArguments).flatMap { args => + Sync[F].blocking { + channel.value.queueDeclare( + config.queueName.value, + config.durable.isTrue, + config.exclusive.isTrue, + config.autoDelete.isTrue, + args + ) + }.void + } override def declareQueueNoWait(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] = - Sync[F].blocking { - channel.value.queueDeclareNoWait( - config.queueName.value, - config.durable.isTrue, - config.exclusive.isTrue, - config.autoDelete.isTrue, - config.arguments - ) - }.void + Sync[F].fromEither(config.validatedArguments).flatMap { args => + Sync[F].blocking { + channel.value.queueDeclareNoWait( + config.queueName.value, + config.durable.isTrue, + config.exclusive.isTrue, + config.autoDelete.isTrue, + args + ) + }.void + } override def declareQueuePassive(channel: AMQPChannel, queueName: QueueName): F[Unit] = Sync[F].blocking { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala index 925d6a5e..09e6b7c5 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala @@ -17,7 +17,7 @@ package dev.profunktor.fs2rabbit.config import dev.profunktor.fs2rabbit.arguments.Arguments -import dev.profunktor.fs2rabbit.model.{ExchangeName, ExchangeType, QueueName} +import dev.profunktor.fs2rabbit.model.{ExchangeName, ExchangeType, QueueName, QueueType} object declaration { @@ -26,12 +26,44 @@ object declaration { durable: DurableCfg, exclusive: ExclusiveCfg, autoDelete: AutoDeleteCfg, - arguments: Arguments - ) + arguments: Arguments, + queueType: Option[QueueType] + ) { + + lazy val validatedArguments: Either[IllegalArgumentException, Arguments] = + queueType match { + case Some(_) if arguments.contains("x-queue-type") => + Left( + new IllegalArgumentException( + "Queue type defined twice. It is set in the arguments and in the DeclarationQueueConfig." + ) + ) + case Some(queueType) => + Right(arguments + ("x-queue-type" -> queueType.asString)) + case None => + Right(arguments) + } + } object DeclarationQueueConfig { def default(queueName: QueueName): DeclarationQueueConfig = - DeclarationQueueConfig(queueName, NonDurable, NonExclusive, NonAutoDelete, Map.empty) + DeclarationQueueConfig( + queueName = queueName, + durable = NonDurable, + exclusive = NonExclusive, + autoDelete = NonAutoDelete, + arguments = Map.empty, + queueType = None + ) + + def classic(queueName: QueueName): DeclarationQueueConfig = + default(queueName).copy(queueType = Some(QueueType.Classic)) + + def quorum(queueName: QueueName): DeclarationQueueConfig = + default(queueName).copy(queueType = Some(QueueType.Quorum)) + + def stream(queueName: QueueName): DeclarationQueueConfig = + default(queueName).copy(queueType = Some(QueueType.Stream)) } sealed trait DurableCfg extends Product with Serializable diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala index 9caae4b9..7d0578d4 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala @@ -96,12 +96,12 @@ object model { sealed trait QueueType extends Product with Serializable { def asString: String = this match { case QueueType.Classic => "classic" - case QueueType.Quorum => "quorum" - case QueueType.Stream => "stream" + case QueueType.Quorum => "quorum" + case QueueType.Stream => "stream" } } object QueueType { - case object Classic extends QueueType + case object Classic extends QueueType case object Quorum extends QueueType case object Stream extends QueueType }