Skip to content

Commit

Permalink
add copy method to rabbitclient builder (#705)
Browse files Browse the repository at this point in the history
* add copy method to rabbitclient builder
  • Loading branch information
jbwheatley authored Mar 8, 2022
1 parent 2ae3814 commit d1b58cd
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 51 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ val commonSettings = List(
libraryDependencies ++= commonDependencies(scalaVersion.value),
resolvers += "Apache public" at "https://repository.apache.org/content/groups/public/",
scalafmtOnCompile := true,
mimaPreviousArtifacts := Set(organization.value %% moduleName.value % "4.1.0")
mimaPreviousArtifacts := Set(organization.value %% moduleName.value % "4.1.1")
)

def CoreDependencies(scalaVersionStr: String): List[ModuleID] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,14 @@ object RabbitClient {
threadFactory: Option[F[ThreadFactory]],
executionContext: Option[F[ExecutionContext]]
) {
def withSslContext(sslContext: SSLContext): Builder[F] = new Builder[F](
config = config,
sslContext = Some(sslContext),
saslConfig = saslConfig,
metricsCollector = metricsCollector,
threadFactory = threadFactory,
executionContext = executionContext
) {}

def withSaslConfig(saslConfig: SaslConfig): Builder[F] = new Builder[F](
private def copy(
config: Fs2RabbitConfig = config,
sslContext: Option[SSLContext] = sslContext,
saslConfig: SaslConfig = saslConfig,
metricsCollector: Option[MetricsCollector] = metricsCollector,
threadFactory: Option[F[ThreadFactory]] = threadFactory,
executionContext: Option[F[ExecutionContext]] = executionContext
): Builder[F] = new Builder[F](
config = config,
sslContext = sslContext,
saslConfig = saslConfig,
Expand All @@ -112,38 +110,22 @@ object RabbitClient {
executionContext = executionContext
) {}

def withMetricsCollector(metricsCollector: MetricsCollector): Builder[F] = new Builder[F](
config = config,
sslContext = sslContext,
saslConfig = saslConfig,
metricsCollector = Some(metricsCollector),
threadFactory = threadFactory,
executionContext = executionContext
) {}
def withSslContext(sslContext: SSLContext): Builder[F] = copy(sslContext = Some(sslContext))

def withThreadFactory(threadFactory: F[ThreadFactory]): Builder[F] = new Builder[F](
config = config,
sslContext = sslContext,
saslConfig = saslConfig,
metricsCollector = metricsCollector,
threadFactory = Some(threadFactory),
executionContext = executionContext
) {}
def withSaslConfig(saslConfig: SaslConfig): Builder[F] = copy(saslConfig = saslConfig)

def withMetricsCollector(metricsCollector: MetricsCollector): Builder[F] =
copy(metricsCollector = Some(metricsCollector))

def withThreadFactory(threadFactory: F[ThreadFactory]): Builder[F] = copy(threadFactory = Some(threadFactory))

def withExecutionContext(executionContext: F[ExecutionContext]): Builder[F] =
new Builder[F](
config = config,
sslContext = sslContext,
saslConfig = saslConfig,
metricsCollector = metricsCollector,
threadFactory = threadFactory,
executionContext = Some(executionContext)
) {}
copy(executionContext = Some(executionContext))

def build(dispatcher: Dispatcher[F]): F[RabbitClient[F]] =
create[F](config, dispatcher, sslContext, saslConfig, metricsCollector, threadFactory, executionContext)

def resource(): Resource[F, RabbitClient[F]] =
def resource: Resource[F, RabbitClient[F]] =
Dispatcher[F].evalMap(build)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object DropwizardMetricsDemo extends IOApp.Simple {

val resources = for {
_ <- JmxReporterResource.make[IO](registry)
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource()
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
channel <- client.createConnection.flatMap(client.createChannel)
} yield (channel, client)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object IOAckerConsumer extends IOApp.Simple {
)

override def run: IO[Unit] =
RabbitClient.default[IO](config).resource().use { client =>
RabbitClient.default[IO](config).resource.use { client =>
ResilientStream
.runF(new AckerConsumerDemo[IO](client).program)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object RPCDemo extends IOApp.Simple {
)

def run: IO[Unit] =
RabbitClient.default[IO](config).resource().use { implicit client =>
RabbitClient.default[IO](config).resource.use { implicit client =>
val queue = QueueName("rpc_queue")
runServer[IO](queue).concurrently(runClient[IO](queue)).compile.drain
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object ZIOAutoAckConsumer extends CatsApp {
override def run(args: List[String]): URIO[ZEnv, ExitCode] =
RabbitClient
.default[Task](config)
.resource()
.resource
.use { client =>
ResilientStream
.runF(new AutoAckConsumerDemo[Task](client).program)
Expand Down
3 changes: 2 additions & 1 deletion site/docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ val config = Fs2RabbitConfig(
requeueOnReject = false,
internalQueueSize = Some(500),
requestedHeartbeat = 30.seconds,
automaticRecovery = true
automaticRecovery = true,
clientProvidedConnectionName = Some("app:rabbit")
)
```

Expand Down
9 changes: 5 additions & 4 deletions site/docs/examples/client-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ val dropwizardCollector = new StandardMetricsCollector(registry)
Now it is ready to use.

```scala
RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource()
RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
```

## Expose via JMX
Expand Down Expand Up @@ -54,7 +54,7 @@ Let's initialise the FS2 RabbitMQ client and AMQP channel with metrics.
```scala
val resources = for {
_ <- JmxReporterResource.make[IO](registry)
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource()
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
channel <- client.createConnection.flatMap(client.createChannel)
} yield (channel, client)

Expand Down Expand Up @@ -109,7 +109,8 @@ object DropwizardMetricsDemo extends IOApp {
requeueOnReject = false,
internalQueueSize = Some(500),
requestedHeartbeat = 60.seconds,
automaticRecovery = true
automaticRecovery = true,
clientProvidedConnectionName = Some("app:rabbit")
)

private val queueName = QueueName("testQ")
Expand All @@ -129,7 +130,7 @@ object DropwizardMetricsDemo extends IOApp {

val resources = for {
_ <- JmxReporterResource.make[IO](registry)
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource()
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
channel <- client.createConnection.flatMap(client.createChannel)
} yield (channel, client)

Expand Down
5 changes: 3 additions & 2 deletions site/docs/examples/sample-acker.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ object IOAckerConsumer extends IOApp {
requeueOnReject = false,
internalQueueSize = Some(500),
requestedHeartbeat = 60.seconds,
automaticRecovery = true
automaticRecovery = true,
clientProvidedConnectionName = Some("app:rabbit")
)

override def run(args: List[String]): IO[ExitCode] =
RabbitClient.default[IO](config).resource().use { client =>
RabbitClient.default[IO](config).resource.use { client =>
ResilientStream
.runF(new AckerConsumerDemo[IO](client).program)
.as(ExitCode.Success)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,31 +757,31 @@ trait Fs2RabbitSpec { self: BaseSpec =>
private def withStreamRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] =
RabbitClient
.default[IO](config)
.resource()
.resource
.use(r => fa(r).compile.drain)
.as(emptyAssertion)
.unsafeToFuture()

private def withStreamNackRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] =
RabbitClient
.default[IO](config.copy(requeueOnNack = true))
.resource()
.resource
.use(r => fa(r).compile.drain)
.as(emptyAssertion)
.unsafeToFuture()

private def withStreamRejectRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] =
RabbitClient
.default[IO](config.copy(requeueOnReject = true))
.resource()
.resource
.use(r => fa(r).compile.drain)
.as(emptyAssertion)
.unsafeToFuture()

private def withRabbit[A](fa: RabbitClient[IO] => IO[A]): Future[A] =
RabbitClient
.default[IO](config)
.resource()
.resource
.use(r => fa(r))
.unsafeToFuture()

Expand Down

0 comments on commit d1b58cd

Please sign in to comment.