Merge pull request #417 from fd4s/ce-3
Port to cats-effect 3.0
bplommer authored Feb 19, 2021
2 parents cec73ce + 31a4ab6 commit d423708
Showing 54 changed files with 385 additions and 506 deletions.
8 changes: 5 additions & 3 deletions build.sbt
@@ -1,10 +1,10 @@
-val catsEffectVersion = "2.3.1"
+val catsEffectVersion = "3.0.0-RC2"

 val catsVersion = "2.4.1"

 val confluentVersion = "6.1.0"

-val fs2Version = "2.5.0"
+val fs2Version = "3.0.0-M8"

 val kafkaVersion = "2.7.0"

@@ -36,6 +36,7 @@ lazy val core = project
     name := moduleName.value,
     dependencySettings ++ Seq(
       libraryDependencies ++= Seq(
+        "org.typelevel" %% "cats-effect" % catsEffectVersion,
         "co.fs2" %% "fs2-core" % fs2Version,
         "org.apache.kafka" % "kafka-clients" % kafkaVersion
       )
@@ -87,6 +88,7 @@ lazy val dependencySettings = Seq(
       .withDottyCompat(scalaVersion.value),
     "org.typelevel" %% "discipline-scalatest" % "2.1.1",
     "org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
+    "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
     "org.typelevel" %% "cats-testkit-scalatest" % "2.1.1",
     "ch.qos.logback" % "logback-classic" % "1.2.3"
   ).map(_ % Test),
@@ -226,7 +228,7 @@ lazy val publishSettings =
   )

 lazy val mimaSettings = Seq(
-  // Restore this after releasing v2.0.0
+  // Restore this after releasing v3.0.0
   // mimaPreviousArtifacts := {
   //   if (publishArtifact.value) {
   //     Set(organization.value %% moduleName.value % (previousStableVersion in ThisBuild).value.get)
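Taken together, a build tracking this port pins the pre-release CE3-era artifacts. A sketch of the resulting coordinates (versions are the milestone values shown above and will move as 3.0 stabilizes):

```scala
// build.sbt sketch — cats-effect 3 and fs2 3 milestone coordinates from this diff.
val catsEffectVersion = "3.0.0-RC2"
val fs2Version = "3.0.0-M8"

libraryDependencies ++= Seq(
  "org.typelevel" %% "cats-effect" % catsEffectVersion,
  "co.fs2"        %% "fs2-core"    % fs2Version
)
```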
18 changes: 7 additions & 11 deletions docs/src/main/mdoc/admin.md
@@ -28,8 +28,6 @@ def adminClientSettings[F[_]: Sync](bootstrapServers: String): AdminClientSettin

 There are several settings specific to the library.
 
-- `withBlocker` sets the `Blocker` on which blocking Java Kafka `AdminClient` functions are executed. Unless specified, a default fixed single-thread pool is created as part of admin client initialization, with the thread name using the `fs2-kafka-admin-client` prefix.
-
 - `withCloseTimeout` controls the timeout when waiting for admin client shutdown. Default is 20 seconds.
 
 - `withCreateAdminClient` changes how the underlying Java Kafka admin client is created. The default creates a Java `AdminClient` instance using set properties, but this function allows overriding the behaviour for e.g. testing purposes.
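With `withBlocker` gone, admin configuration reduces to properties and timeouts. A minimal sketch of the remaining setters (broker address and timeout value are placeholders):

```scala
import cats.effect.IO
import fs2.kafka.AdminClientSettings
import scala.concurrent.duration._

// Sketch: CE3 settings need no Blocker; blocking is handled by the runtime.
val adminSettings: AdminClientSettings[IO] =
  AdminClientSettings[IO]
    .withBootstrapServers("localhost:9092")
    .withCloseTimeout(30.seconds)
```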
@@ -39,7 +37,7 @@ There are several settings specific to the library.
 Once settings are defined, we can create an admin client in a `Stream`.
 
 ```scala mdoc:silent
-def kafkaAdminClientStream[F[_]: Concurrent: ContextShift](
+def kafkaAdminClientStream[F[_]: Async](
   bootstrapServers: String
 ): Stream[F, KafkaAdminClient[F]] =
   KafkaAdminClient.stream(adminClientSettings[F](bootstrapServers))
@@ -48,7 +46,7 @@ def kafkaAdminClientStream[F[_]: Concurrent: ContextShift](
 Alternatively, we can create an admin client in a `Resource` context.
 
 ```scala mdoc:silent
-def kafkaAdminClientResource[F[_]: Concurrent: ContextShift](
+def kafkaAdminClientResource[F[_]: Async](
   bootstrapServers: String
 ): Resource[F, KafkaAdminClient[F]] =
   KafkaAdminClient.resource(adminClientSettings[F](bootstrapServers))
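Since `IO` has an `Async` instance in cats-effect 3, no `ContextShift` is required at the call site. A sketch assuming a broker on `localhost:9092`:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka.{AdminClientSettings, KafkaAdminClient}

// Sketch: acquire the admin client as a Resource, list topic names, print them.
object ListTopicsApp extends IOApp.Simple {
  val run: IO[Unit] =
    KafkaAdminClient
      .resource(AdminClientSettings[IO].withBootstrapServers("localhost:9092"))
      .use(_.listTopics.names)
      .flatMap(names => IO.println(s"topics: $names"))
}
```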
@@ -61,7 +59,7 @@ There are functions available for describing, creating, and deleting topics.
 ```scala mdoc:silent
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 
-def topicOperations[F[_]: Concurrent: ContextShift]: F[Unit] =
+def topicOperations[F[_]: Async]: F[Unit] =
   kafkaAdminClientResource[F]("localhost:9092").use { client =>
     for {
       topicNames <- client.listTopics.names
@@ -83,7 +81,7 @@ We can edit the configuration of different resources, like topics and nodes.
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 
-def configOperations[F[_]: Concurrent: ContextShift]: F[Unit] =
+def configOperations[F[_]: Async]: F[Unit] =
   kafkaAdminClientResource[F]("localhost:9092").use { client =>
     val topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic")

@@ -108,7 +106,7 @@ It's possible to retrieve metadata about the cluster nodes.
 ```scala mdoc:silent
 import org.apache.kafka.common.Node
 
-def clusterNodes[F[_]: Concurrent: ContextShift]: F[Set[Node]] =
+def clusterNodes[F[_]: Async]: F[Set[Node]] =
   kafkaAdminClientResource[F]("localhost:9092").use(_.describeCluster.nodes)
 ```

@@ -117,9 +115,7 @@ def clusterNodes[F[_]: Concurrent: ContextShift]: F[Set[Node]] =
 There are functions available for working with consumer groups.
 
 ```scala mdoc:silent
-import cats.Parallel
-
-def consumerGroupOperations[F[_]: Concurrent: ContextShift: Parallel]: F[Unit] =
+def consumerGroupOperations[F[_]: Async: cats.Parallel]: F[Unit] =
   kafkaAdminClientResource[F]("localhost:9092").use { client =>
     for {
       consumerGroupIds <- client.listConsumerGroups.groupIds
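The `cats.Parallel` constraint is what lets per-group operations run concurrently. A sketch of the idea (`listConsumerGroupOffsets` and `partitionsToOffsetAndMetadata` are existing fs2-kafka admin methods assumed here, not shown in this diff):

```scala
import cats.Parallel
import cats.effect.Async
import cats.syntax.all._
import fs2.kafka.KafkaAdminClient

// Sketch: fetch committed offsets for many consumer groups in parallel.
def allGroupOffsets[F[_]: Async: Parallel](
  client: KafkaAdminClient[F],
  groupIds: List[String]
) =
  groupIds.parTraverse { groupId =>
    client.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata
  }
```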
@@ -145,7 +141,7 @@ import org.apache.kafka.common.resource.{
   ResourceType
 }
 
-def aclOperations[F[_]: Concurrent: ContextShift]: F[Unit] =
+def aclOperations[F[_]: Async]: F[Unit] =
   kafkaAdminClientResource[F]("localhost:9092").use { client =>
     for {
       describedAcls <- client.describeAcls(AclBindingFilter.ANY)
6 changes: 2 additions & 4 deletions docs/src/main/mdoc/consumers.md
@@ -144,8 +142,6 @@ In addition, there are several settings specific to the library.

 - `withCreateConsumer` changes how the underlying Java Kafka consumer is created. The default merely creates a Java `KafkaConsumer` instance using set properties, but this function allows overriding the behaviour for e.g. testing purposes.
 
-- `withBlocker` sets the `Blocker` on which blocking Java Kafka consumer functions are executed. Unless specified, a default fixed single-thread pool is created as part of consumer initialization, with the thread name using the `fs2-kafka-consumer` prefix.
-
 - `withMaxPrefetchBatches` adjusts the maximum number of record batches per topic-partition to prefetch before backpressure is applied. The default is 2, meaning there can be up to 2 record batches per topic-partition waiting to be processed.
 
 - `withPollInterval` alters how often consumer `poll` should take place. Default is 50 milliseconds.
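A sketch combining these setters under CE3 (`withBootstrapServers`, `withGroupId`, and `withAutoOffsetReset` are standard fs2-kafka setters assumed here; all values are placeholders):

```scala
import cats.effect.IO
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import scala.concurrent.duration._

// Sketch: no Blocker setter remains; tune prefetching and poll cadence only.
val consumerSettings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withMaxPrefetchBatches(4)
    .withPollInterval(50.millis)
```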
@@ -344,7 +342,7 @@ To achieve this behavior we could use a `stopConsuming` method on a `KafkaConsum
 We could combine `stopConsuming` with the custom resource handling and implement a graceful shutdown. Let's try it:
 
 ```scala mdoc:silent
-import cats.effect.concurrent.{Deferred, Ref}
+import cats.effect.{Deferred, Ref}
 
 object WithGracefulShutdownExample extends IOApp {
   def run(args: List[String]): IO[ExitCode] = {
@@ -372,7 +370,7 @@ object WithGracefulShutdownExample extends IOApp {
       }.uncancelable // [7]
     } { case ((consumer, closeConsumer), exitCase) => // [8]
       (exitCase match {
-        case ExitCase.Error(e) => handleError(e) // [9]
+        case Outcome.Errored(e) => handleError(e) // [9]
         case _ => for {
           _ <- gracefulShutdownStartedRef.set(true) // [10]
           _ <- consumer.stopConsuming // [11]
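The `ExitCase.Error` → `Outcome.Errored` switch above is the general CE3 pattern for bracket-style cleanup. A standalone sketch of the shape (the helper and its error handling are illustrative, not from the docs):

```scala
import cats.effect.{IO, Outcome}

// Sketch: in CE3, bracketCase hands the release action an Outcome
// (Succeeded / Errored / Canceled) instead of CE2's ExitCase.
def withCleanup[A, B](acquire: IO[A], close: A => IO[Unit])(use: A => IO[B]): IO[B] =
  acquire.bracketCase(use) { (a, outcome) =>
    outcome match {
      case Outcome.Errored(e) => IO.println(s"failed: $e") >> close(a)
      case _                  => close(a)
    }
  }
```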
2 changes: 0 additions & 2 deletions docs/src/main/mdoc/producers.md
@@ -134,8 +132,6 @@ The following settings are specific to the library.

 - `withCreateProducer` changes how the underlying Java Kafka producer is created. The default merely creates a Java `KafkaProducer` instance using set properties, but this function allows overriding the behaviour for e.g. testing purposes.
 
-- `withBlocker` sets the `Blocker` on which blocking Java Kafka producer functions are executed. Unless specified, a default fixed single-thread pool is created as part of producer initialization, with the thread name using the `fs2-kafka-producer` prefix.
-
 ## Producer Creation
 
 Once [`ProducerSettings`][producersettings] is defined, use `KafkaProducer.stream` to create a [`KafkaProducer`][kafkaproducer] instance.
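Producer creation under CE3 likewise needs only `IO`'s built-in capabilities. A sketch (broker address is a placeholder):

```scala
import cats.effect.IO
import fs2.kafka.{KafkaProducer, ProducerSettings}

// Sketch: ProducerSettings without a Blocker; KafkaProducer.stream as in the docs.
val producerSettings: ProducerSettings[IO, String, String] =
  ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")

val producerStream =
  KafkaProducer.stream(producerSettings)
```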
21 changes: 2 additions & 19 deletions modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala
@@ -6,7 +6,7 @@

 package fs2.kafka
 
-import cats.effect.{Blocker, Sync}
+import cats.effect.Sync
 import cats.Show
 import fs2.kafka.internal.converters.collection._
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
@@ -27,19 +27,6 @@ import scala.concurrent.duration._
   */
 sealed abstract class AdminClientSettings[F[_]] {
 
-  /**
-    * The `Blocker` to use for blocking Kafka operations. If not
-    * explicitly provided, a default `Blocker` will be created
-    * when creating a `KafkaAdminClient` instance.
-    */
-  def blocker: Option[Blocker]
-
-  /**
-    * Returns a new [[AdminClientSettings]] instance with the
-    * specified [[blocker]] to use for blocking operations.
-    */
-  def withBlocker(blocker: Blocker): AdminClientSettings[F]
-
   /**
     * Properties which can be provided when creating a Java `KafkaAdminClient`
     * instance. Numerous functions in [[AdminClientSettings]] add properties
@@ -205,13 +192,10 @@

 object AdminClientSettings {
   private[this] final case class AdminClientSettingsImpl[F[_]](
-    override val blocker: Option[Blocker],
     override val properties: Map[String, String],
     override val closeTimeout: FiniteDuration,
     val createAdminClientWith: Map[String, String] => F[AdminClient]
   ) extends AdminClientSettings[F] {
-    override def withBlocker(blocker: Blocker): AdminClientSettings[F] =
-      copy(blocker = Some(blocker))
 
     override def withBootstrapServers(bootstrapServers: String): AdminClientSettings[F] =
       withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -279,11 +263,10 @@ object AdminClientSettings {

   def apply[F[_]](implicit F: Sync[F]): AdminClientSettings[F] =
     AdminClientSettingsImpl(
-      blocker = None,
       properties = Map.empty,
       closeTimeout = 20.seconds,
       createAdminClientWith = properties =>
-        F.delay {
+        F.blocking {
           AdminClient.create {
             (properties: Map[String, AnyRef]).asJava
           }
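The `F.delay` → `F.blocking` switch is the CE3 replacement for the whole `Blocker` mechanism: `Sync[F].blocking` runs the effect on the runtime's blocking pool. The same pattern in isolation:

```scala
import cats.effect.Sync
import org.apache.kafka.clients.admin.AdminClient
import java.util.Properties

// Sketch: wrap the blocking JVM call in Sync[F].blocking so it runs
// on the runtime's blocking pool rather than the compute pool.
def createAdminClient[F[_]](props: Properties)(implicit F: Sync[F]): F[AdminClient] =
  F.blocking(AdminClient.create(props))
```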
23 changes: 10 additions & 13 deletions modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala
@@ -6,11 +6,11 @@

 package fs2.kafka
 
-import cats.effect.Timer
+import cats.effect.Temporal
 import cats.syntax.applicativeError._
 import cats.syntax.flatMap._
 import cats.syntax.functor._
-import cats.{Functor, MonadError}
+import cats.Functor
 import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException}
 import org.apache.kafka.common.TopicPartition

@@ -44,9 +44,8 @@ abstract class CommitRecovery {
     offsets: Map[TopicPartition, OffsetAndMetadata],
     commit: F[Unit]
   )(
-    implicit F: MonadError[F, Throwable],
-    jitter: Jitter[F],
-    timer: Timer[F]
+    implicit F: Temporal[F],
+    jitter: Jitter[F]
   ): Throwable => F[Unit]
 }

@@ -89,15 +88,14 @@ object CommitRecovery {
       offsets: Map[TopicPartition, OffsetAndMetadata],
       commit: F[Unit]
     )(
-      implicit F: MonadError[F, Throwable],
-      jitter: Jitter[F],
-      timer: Timer[F]
+      implicit F: Temporal[F],
+      jitter: Jitter[F]
     ): Throwable => F[Unit] = {
       def retry(attempt: Int): Throwable => F[Unit] = {
         case retriable: RetriableCommitFailedException =>
           val commitWithRecovery = commit.handleErrorWith(retry(attempt + 1))
-          if (attempt <= 10) backoff(attempt).flatMap(timer.sleep) >> commitWithRecovery
-          else if (attempt <= 15) timer.sleep(10.seconds) >> commitWithRecovery
+          if (attempt <= 10) backoff(attempt).flatMap(F.sleep) >> commitWithRecovery
+          else if (attempt <= 15) F.sleep(10.seconds) >> commitWithRecovery
           else F.raiseError(CommitRecoveryException(attempt - 1, retriable, offsets))
 
         case nonRetriable: Throwable =>
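`Temporal[F]` subsumes both `MonadError` and `Timer`, providing `sleep` alongside error handling. A minimal sketch of the retry shape, using plain exponential backoff instead of the library's `Jitter`-based `backoff`:

```scala
import cats.effect.Temporal
import cats.syntax.all._
import scala.concurrent.duration._

// Sketch: retry with exponential backoff, sleeping via Temporal[F].sleep.
def retryWithBackoff[F[_], A](fa: F[A], maxAttempts: Int)(
  implicit F: Temporal[F]
): F[A] = {
  def go(attempt: Int): F[A] =
    fa.handleErrorWith { e =>
      if (attempt >= maxAttempts) F.raiseError(e)
      else F.sleep((1L << attempt).millis) *> go(attempt + 1)
    }
  go(0)
}
```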
@@ -120,9 +118,8 @@ object CommitRecovery {
       offsets: Map[TopicPartition, OffsetAndMetadata],
       commit: F[Unit]
     )(
-      implicit F: MonadError[F, Throwable],
-      jitter: Jitter[F],
-      timer: Timer[F]
+      implicit F: Temporal[F],
+      jitter: Jitter[F]
     ): Throwable => F[Unit] = F.raiseError

override def toString: String =
11 changes: 5 additions & 6 deletions modules/core/src/main/scala/fs2/kafka/ConsumerResource.scala
@@ -6,7 +6,7 @@

 package fs2.kafka
 
-import cats.effect.{ConcurrentEffect, ContextShift, Resource, Timer}
+import cats.effect.{Resource, Async}
 
 /**
   * [[ConsumerResource]] provides support for inferring the key and value
@@ -18,19 +18,18 @@ import cats.effect.{ConcurrentEffect, ContextShift, Resource, Timer}
   * }}}
   */
 final class ConsumerResource[F[_]] private[kafka] (
-  private val F: ConcurrentEffect[F]
+  private val F: Async[F]
 ) extends AnyVal {
 
   /**
     * Creates a new [[KafkaConsumer]] in the `Resource` context.
     * This is equivalent to using `KafkaConsumer.resource` directly,
     * except we're able to infer the key and value type.
     */
-  def using[K, V](settings: ConsumerSettings[F, K, V])(
-    implicit context: ContextShift[F],
-    timer: Timer[F]
+  def using[K, V](
+    settings: ConsumerSettings[F, K, V]
   ): Resource[F, KafkaConsumer[F, K, V]] =
-    KafkaConsumer.resource(settings)(F, context, timer)
+    KafkaConsumer.resource[F, K, V](settings)(F)

override def toString: String =
"ConsumerResource$" + System.identityHashCode(this)
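At call sites, a single `Async` constraint replaces the CE2 trio of `ConcurrentEffect`, `ContextShift`, and `Timer`. A sketch of the signature a user now writes:

```scala
import cats.effect.{Async, Resource}
import fs2.kafka.{ConsumerSettings, KafkaConsumer}

// Sketch: one Async constraint is enough to build the consumer Resource.
def makeConsumer[F[_]: Async](
  settings: ConsumerSettings[F, String, String]
): Resource[F, KafkaConsumer[F, String, String]] =
  KafkaConsumer.resource(settings)
```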
19 changes: 1 addition & 18 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -6,7 +6,7 @@

 package fs2.kafka
 
-import cats.effect.{Blocker, Sync}
+import cats.effect.{Sync}
 import cats.Show
 import fs2.kafka.internal.converters.collection._
 import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -47,19 +47,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
   */
 def valueDeserializer: F[Deserializer[F, V]]
 
-  /**
-    * The `Blocker` to use for blocking Kafka operations. If not
-    * explicitly provided, a default `Blocker` will be created
-    * when creating a `KafkaConsumer` instance.
-    */
-  def blocker: Option[Blocker]
-
-  /**
-    * Returns a new [[ConsumerSettings]] instance with the
-    * specified [[blocker]] to use for blocking operations.
-    */
-  def withBlocker(blocker: Blocker): ConsumerSettings[F, K, V]
-
   /**
     * Properties which can be provided when creating a Java `KafkaConsumer`
     * instance. Numerous functions in [[ConsumerSettings]] add properties
@@ -403,7 +390,6 @@ object ConsumerSettings {
   private[this] final case class ConsumerSettingsImpl[F[_], K, V](
     override val keyDeserializer: F[Deserializer[F, K]],
     override val valueDeserializer: F[Deserializer[F, V]],
-    override val blocker: Option[Blocker],
     override val properties: Map[String, String],
     override val closeTimeout: FiniteDuration,
     override val commitTimeout: FiniteDuration,
Expand All @@ -414,8 +400,6 @@ object ConsumerSettings {
     override val maxPrefetchBatches: Int,
     val createConsumerWith: Map[String, String] => F[KafkaByteConsumer]
   ) extends ConsumerSettings[F, K, V] {
-    override def withBlocker(blocker: Blocker): ConsumerSettings[F, K, V] =
-      copy(blocker = Some(blocker))
 
     override def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V] =
       withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -546,7 +530,6 @@ object ConsumerSettings {
     ConsumerSettingsImpl(
       keyDeserializer = keyDeserializer,
       valueDeserializer = valueDeserializer,
-      blocker = None,
       properties = Map(
         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "none",
         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
11 changes: 4 additions & 7 deletions modules/core/src/main/scala/fs2/kafka/ConsumerStream.scala
@@ -6,7 +6,7 @@

 package fs2.kafka
 
-import cats.effect.{ConcurrentEffect, ContextShift, Timer}
+import cats.effect.Async
 import fs2.Stream
 
 /**
@@ -19,19 +19,16 @@ import fs2.Stream
   * }}}
   */
 final class ConsumerStream[F[_]] private[kafka] (
-  private val F: ConcurrentEffect[F]
+  private val F: Async[F]
 ) extends AnyVal {
 
   /**
     * Creates a new [[KafkaConsumer]] in the `Stream` context.
     * This is equivalent to using `KafkaConsumer.stream` directly,
     * except we're able to infer the key and value type.
     */
-  def using[K, V](settings: ConsumerSettings[F, K, V])(
-    implicit context: ContextShift[F],
-    timer: Timer[F]
-  ): Stream[F, KafkaConsumer[F, K, V]] =
-    KafkaConsumer.stream(settings)(F, context, timer)
+  def using[K, V](settings: ConsumerSettings[F, K, V]): Stream[F, KafkaConsumer[F, K, V]] =
+    KafkaConsumer.stream[F, K, V](settings)(F)

override def toString: String =
"ConsumerStream$" + System.identityHashCode(this)
2 changes: 1 addition & 1 deletion modules/core/src/main/scala/fs2/kafka/Deserializer.scala
@@ -166,7 +166,7 @@ object Deserializer {

     override def suspend: Deserializer[F, A] =
       Deserializer.instance { (topic, headers, bytes) =>
-        F.suspend(deserialize(topic, headers, bytes))
+        F.defer(deserialize(topic, headers, bytes))
       }

override def toString: String =
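`Sync#suspend` was renamed to `defer` in cats-effect 3 with unchanged semantics: construction of the inner effect is delayed until the value is actually deserialized. A sketch of `suspend` in use (`Deserializer.lift` is an existing fs2-kafka constructor assumed here):

```scala
import cats.effect.IO
import fs2.kafka.Deserializer

// Sketch: defer the (possibly throwing) parse until each record is read.
val intDeserializer: Deserializer[IO, Int] =
  Deserializer.lift[IO, Int](bytes => IO(new String(bytes).toInt)).suspend
```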

