From b71854f8fcb774aa27ccccb60a17da35986b2415 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Mon, 27 Mar 2023 16:28:41 +0100 Subject: [PATCH 01/19] Revert "Change to publish on series branches" This reverts commit 78fc4258c8df82919d8ae38f6221dc4b6318915c. --- .github/workflows/ci.yml | 2 +- build.sbt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9636195d..ef098e193 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,7 +97,7 @@ jobs: publish: name: Publish Artifacts needs: [build] - if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/heads/series/')) + if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v')) strategy: matrix: os: [ubuntu-latest] diff --git a/build.sbt b/build.sbt index 3c2e82ea9..acb521413 100644 --- a/build.sbt +++ b/build.sbt @@ -211,7 +211,7 @@ ThisBuild / githubWorkflowArtifactUpload := false ThisBuild / githubWorkflowJavaVersions := Seq(JavaSpec.temurin("8"), JavaSpec.temurin("17")) ThisBuild / githubWorkflowPublishTargetBranches := - Seq(RefPredicate.StartsWith(Ref.Branch("series/"))) + Seq(RefPredicate.StartsWith(Ref.Tag("v"))) ThisBuild / githubWorkflowPublish := Seq( WorkflowStep.Sbt( From 6e496a789e56bb74a9d37e197ea80336823b0523 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Tue, 11 Apr 2023 12:58:16 +0100 Subject: [PATCH 02/19] Preparatory changes for new CE cancelation semantics --- .../src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala | 3 ++- .../src/main/scala/fs2/kafka/internal/WithAdminClient.scala | 2 +- modules/core/src/main/scala/fs2/kafka/internal/syntax.scala | 2 +- .../core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 166a062d9..454461d31 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -102,7 +102,8 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( } private[this] def manualCommitSync(request: Request.ManualCommitSync[F]): F[Unit] = { - val commit = withConsumer.blocking(_.commitSync(request.offsets.asJava)) + val commit = + withConsumer.blocking(_.commitSync(request.offsets.asJava, settings.commitTimeout.asJava)) commit.attempt >>= request.callback } diff --git a/modules/core/src/main/scala/fs2/kafka/internal/WithAdminClient.scala b/modules/core/src/main/scala/fs2/kafka/internal/WithAdminClient.scala index b05d1e3fe..60b4606b3 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/WithAdminClient.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/WithAdminClient.scala @@ -28,7 +28,7 @@ private[kafka] object WithAdminClient { val withAdminClient = new WithAdminClient[G] { override def apply[A](f: AdminClient => KafkaFuture[A]): G[A] = - G.delay(f(adminClient)).cancelable + G.delay(f(adminClient)).cancelable_ } val close = diff --git a/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala b/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala index e63a96011..b37953e74 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala @@ -189,7 +189,7 @@ private[kafka] object syntax { ) extends AnyVal { // Inspired by Monix's 
`CancelableFuture#fromJavaCompletable`. - def cancelable(implicit F: Async[F]): F[A] = + def cancelable_(implicit F: Async[F]): F[A] = F.async { (cb: (Either[Throwable, A] => Unit)) => futureF.flatMap { future => F.blocking { diff --git a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala index 474b8e4b2..90ea67538 100644 --- a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala @@ -83,7 +83,7 @@ final class SyntaxSpec extends BaseSpec { } future } - fiber <- futureIO.cancelable.start + fiber <- futureIO.cancelable_.start _ <- gate.get // wait for future to be created before canceling it _ <- IO(assert(!isFutureCancelled)) _ <- fiber.cancel From 792455ce0235d95f7672b68377e28e095091ef35 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Tue, 11 Apr 2023 13:47:35 +0100 Subject: [PATCH 03/19] Update tlBaseVersion --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index acb521413..3f157d95e 100644 --- a/build.sbt +++ b/build.sbt @@ -18,7 +18,7 @@ val scala213 = "2.13.10" val scala3 = "3.2.2" -ThisBuild / tlBaseVersion := "2.5" +ThisBuild / tlBaseVersion := "2.6" ThisBuild / tlVersionIntroduced := Map("3" -> "2.1.0") From 264879965a0d6668807496cf38d610bd839e7b85 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Tue, 11 Apr 2023 14:08:40 +0100 Subject: [PATCH 04/19] Keep cancelability of runCommitAsync --- .../src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 454461d31..3a7caec1a 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -113,7 +113,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( k: (Either[Throwable, Unit] => Unit) => F[Unit] ): F[Unit] = F.async[Unit] { (cb: Either[Throwable, Unit] => Unit) => - k(cb).as(None) + k(cb).as(Some(F.unit)) } .timeoutTo(settings.commitTimeout, F.raiseError[Unit] { CommitTimeoutException( From 8c289d01380565c037aa5172e74628a49aab34a7 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Thu, 13 Apr 2023 14:42:38 +0000 Subject: [PATCH 05/19] Update sbt-mima-plugin to 1.1.2 in series/2.x --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 345db90be..e17f0e671 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0") -addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.1") +addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.2") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.9.0") addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.3.7") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0") From caec3d3f88659076ed299bb7850e1970d2434644 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Thu, 13 Apr 2023 14:42:58 +0000 Subject: [PATCH 06/19] Update kafka-avro-serializer to 6.2.10 in series/2.x --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index acb521413..2009f1c27 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ val 
catsEffectVersion = "3.4.8" -val confluentVersion = "6.2.9" +val confluentVersion = "6.2.10" val fs2Version = "3.6.1" From 313ceb59abc5bfc183c621b14c6b82c25453e242 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Thu, 13 Apr 2023 14:43:21 +0000 Subject: [PATCH 07/19] Update scalafmt-core to 2.2.2 in series/2.x --- .scalafmt.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index e9db1b22e..8c24adcda 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = "2.2.1" +version = "2.2.2" style = default maxColumn = 100 project.git = true From db0ea0ffd3d0956211a680941426f73ba7ec581b Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Thu, 13 Apr 2023 14:43:47 +0000 Subject: [PATCH 08/19] Reformat with scalafmt 2.2.2 Executed command: scalafmt --non-interactive --- .../src/main/scala/fs2/kafka/AdminClientSettings.scala | 2 -- .../core/src/main/scala/fs2/kafka/CommitRecovery.scala | 2 -- .../main/scala/fs2/kafka/CommitRecoveryException.scala | 1 - .../scala/fs2/kafka/CommittableConsumerRecord.scala | 1 - .../src/main/scala/fs2/kafka/CommittableOffset.scala | 2 -- .../main/scala/fs2/kafka/CommittableOffsetBatch.scala | 1 - .../scala/fs2/kafka/CommittableProducerRecords.scala | 1 - .../core/src/main/scala/fs2/kafka/ConsumerRecord.scala | 1 - .../src/main/scala/fs2/kafka/ConsumerSettings.scala | 1 - .../core/src/main/scala/fs2/kafka/Deserializer.scala | 1 - modules/core/src/main/scala/fs2/kafka/Header.scala | 1 - .../src/main/scala/fs2/kafka/HeaderDeserializer.scala | 2 -- .../src/main/scala/fs2/kafka/HeaderSerializer.scala | 1 - modules/core/src/main/scala/fs2/kafka/Headers.scala | 1 - .../src/main/scala/fs2/kafka/KafkaAdminClient.scala | 10 ---------- .../core/src/main/scala/fs2/kafka/KafkaConsumer.scala | 5 ----- .../core/src/main/scala/fs2/kafka/KafkaProducer.scala | 7 ------- .../main/scala/fs2/kafka/KafkaProducerConnection.scala | 2 -- .../core/src/main/scala/fs2/kafka/ProducerRecord.scala | 1 - .../src/main/scala/fs2/kafka/ProducerRecords.scala | 1 - .../core/src/main/scala/fs2/kafka/ProducerResult.scala | 2 -- .../src/main/scala/fs2/kafka/ProducerSettings.scala | 1 - .../src/main/scala/fs2/kafka/RecordDeserializer.scala | 1 - .../src/main/scala/fs2/kafka/RecordSerializer.scala | 1 - modules/core/src/main/scala/fs2/kafka/Serializer.scala | 1 - modules/core/src/main/scala/fs2/kafka/Timestamp.scala | 2 -- .../scala/fs2/kafka/TransactionalKafkaProducer.scala | 5 ----- .../scala/fs2/kafka/TransactionalProducerRecords.scala | 1 - .../fs2/kafka/TransactionalProducerSettings.scala | 1 - .../scala/fs2/kafka/consumer/KafkaAssignment.scala | 1 - .../main/scala/fs2/kafka/consumer/KafkaCommit.scala | 1 - .../main/scala/fs2/kafka/consumer/KafkaConsume.scala | 1 - .../fs2/kafka/consumer/KafkaConsumerLifecycle.scala | 1 - .../main/scala/fs2/kafka/consumer/KafkaMetrics.scala | 1 - .../main/scala/fs2/kafka/consumer/KafkaOffsets.scala | 2 -- .../main/scala/fs2/kafka/consumer/KafkaOffsetsV2.scala | 1 - .../scala/fs2/kafka/consumer/KafkaSubscription.scala | 2 -- .../main/scala/fs2/kafka/consumer/KafkaTopics.scala | 1 - .../src/main/scala/fs2/kafka/internal/FakeFiber.scala | 1 - .../scala/fs2/kafka/internal/KafkaConsumerActor.scala | 1 - .../src/main/scala/fs2/kafka/internal/package.scala | 1 - .../src/main/scala/fs2/kafka/internal/syntax.scala | 2 -- modules/core/src/test/scala/cats/tests/CatsSuite.scala | 2 -- .../core/src/test/scala/fs2/kafka/BaseGenerators.scala | 1 - .../core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala | 1 - 
.../fs2/kafka/CommittableConsumerRecordLawsSpec.scala | 1 - .../scala/fs2/kafka/CommittableOffsetLawsSpec.scala | 2 -- .../test/scala/fs2/kafka/CommittableOffsetSpec.scala | 1 - .../fs2/kafka/CommittableProducerRecordsLawsSpec.scala | 1 - .../test/scala/fs2/kafka/ConsumerRecordLawsSpec.scala | 1 - .../core/src/test/scala/fs2/kafka/HeaderLawsSpec.scala | 2 -- .../src/test/scala/fs2/kafka/HeadersLawsSpec.scala | 2 -- .../test/scala/fs2/kafka/KafkaAdminClientSpec.scala | 2 -- .../src/test/scala/fs2/kafka/KafkaConsumerSpec.scala | 3 --- .../src/test/scala/fs2/kafka/KafkaProducerSpec.scala | 1 - .../test/scala/fs2/kafka/ProducerRecordLawsSpec.scala | 1 - .../test/scala/fs2/kafka/ProducerSettingsSpec.scala | 1 - .../test/scala/fs2/kafka/RecordDeserializerSpec.scala | 4 ---- .../test/scala/fs2/kafka/RecordSerializerSpec.scala | 3 --- .../src/test/scala/fs2/kafka/TimestampLawsSpec.scala | 2 -- .../fs2/kafka/TransactionalKafkaProducerSpec.scala | 3 --- .../src/test/scala/fs2/kafka/internal/SyntaxSpec.scala | 3 --- .../scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala | 3 --- .../main/scala/fs2/kafka/vulcan/AvroDeserializer.scala | 1 - .../src/main/scala/fs2/kafka/vulcan/AvroSettings.scala | 1 - .../kafka/vulcan/SchemaRegistryClientSettings.scala | 1 - .../src/main/scala/fs2/kafka/vulcan/package.scala | 1 - scalafix/rules/src/main/scala/fix/Fs2Kafka.scala | 3 --- 68 files changed, 122 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala b/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala index 49f336d63..a6e67e3e2 100644 --- a/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala @@ -25,7 +25,6 @@ import scala.concurrent.duration._ * then apply any desired modifications on top of that instance. */ sealed abstract class AdminClientSettings { - /** * Properties which can be provided when creating a Java `KafkaAdminClient` * instance. Numerous functions in [[AdminClientSettings]] add properties @@ -183,7 +182,6 @@ object AdminClientSettings { override val properties: Map[String, String], override val closeTimeout: FiniteDuration ) extends AdminClientSettings { - override def withBootstrapServers(bootstrapServers: String): AdminClientSettings = withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) diff --git a/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala b/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala index 6d5a21138..c373b57a8 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala @@ -28,7 +28,6 @@ import scala.concurrent.duration._ * set it with [[ConsumerSettings#withCommitRecovery]]. */ abstract class CommitRecovery { - /** * Describes recovery from offset commit exceptions. The `commit` * parameter can be used to retry the commit. Note that if more @@ -50,7 +49,6 @@ abstract class CommitRecovery { } object CommitRecovery { - /** * The default [[CommitRecovery]] used in [[ConsumerSettings]] unless * a different one has been specified. 
The default recovery strategy diff --git a/modules/core/src/main/scala/fs2/kafka/CommitRecoveryException.scala b/modules/core/src/main/scala/fs2/kafka/CommitRecoveryException.scala index 85d2d84de..bb4cc05d9 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommitRecoveryException.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommitRecoveryException.scala @@ -41,7 +41,6 @@ sealed abstract class CommitRecoveryException( }) object CommitRecoveryException { - /** * Creates a new [[CommitRecoveryException]] indicating that offset * commit recovery was attempted `attempts` times for `offsets` but diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala b/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala index 4b7f69d71..3f3c9e481 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala @@ -28,7 +28,6 @@ import cats.{Applicative, Bitraverse, Eq, Eval, Show, Traverse} * can be used to create a new instance. */ sealed abstract class CommittableConsumerRecord[F[_], +K, +V] { - /** * The Kafka record for the [[CommittableConsumerRecord]]. If you * are not committing offsets to Kafka, simply use this to get the diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala b/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala index 9412aad76..9fde53d68 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala @@ -24,7 +24,6 @@ import org.apache.kafka.common.TopicPartition * used to create a new instance. */ sealed abstract class CommittableOffset[F[_]] { - /** * The topic and partition for which [[offsetAndMetadata]] * can be committed using [[commit]]. @@ -79,7 +78,6 @@ sealed abstract class CommittableOffset[F[_]] { } object CommittableOffset { - /** * Creates a new [[CommittableOffset]] with the specified `topicPartition` * and `offsetAndMetadata`, along with `commit`, describing how to commit diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala b/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala index e0e3600c3..7e3a55cff 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala @@ -35,7 +35,6 @@ import org.apache.kafka.common.TopicPartition * also achieve better performance. */ sealed abstract class CommittableOffsetBatch[F[_]] { - /** * Creates a new [[CommittableOffsetBatch]] with the specified offset * included. Note that this function requires offsets to be in-order diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala b/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala index b31a3291a..652aefc18 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala @@ -30,7 +30,6 @@ import fs2.kafka.internal.syntax._ * the same transaction as the offset is committed. */ sealed abstract class CommittableProducerRecords[F[_], +K, +V] { - /** The records to produce. Can be empty to simply commit the offset. 
*/ def records: Chunk[ProducerRecord[K, V]] diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala index c6a844633..651dbe149 100644 --- a/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala +++ b/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala @@ -21,7 +21,6 @@ import org.apache.kafka.common.record.TimestampType.{CREATE_TIME, LOG_APPEND_TIM * To create a new instance, use [[ConsumerRecord#apply]] */ sealed abstract class ConsumerRecord[+K, +V] { - /** The topic from which the record has been consumed. */ def topic: String diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala index 502ff6ad5..7285c07b7 100644 --- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala @@ -36,7 +36,6 @@ import scala.concurrent.duration._ * Use `ConsumerSettings#apply` to create a new instance. */ sealed abstract class ConsumerSettings[F[_], K, V] { - /** * The `Deserializer` to use for deserializing record keys. */ diff --git a/modules/core/src/main/scala/fs2/kafka/Deserializer.scala b/modules/core/src/main/scala/fs2/kafka/Deserializer.scala index 9bfd95c6d..9ddedd4cd 100644 --- a/modules/core/src/main/scala/fs2/kafka/Deserializer.scala +++ b/modules/core/src/main/scala/fs2/kafka/Deserializer.scala @@ -17,7 +17,6 @@ import java.util.UUID * support for effect types. */ sealed abstract class Deserializer[F[_], A] { - /** * Attempts to deserialize the specified bytes into a value of * type `A`. The Kafka topic name, from which the serialized diff --git a/modules/core/src/main/scala/fs2/kafka/Header.scala b/modules/core/src/main/scala/fs2/kafka/Header.scala index 35e5ba241..b57dc9e5f 100644 --- a/modules/core/src/main/scala/fs2/kafka/Header.scala +++ b/modules/core/src/main/scala/fs2/kafka/Header.scala @@ -19,7 +19,6 @@ import cats.instances.string._ * To create a new [[Header]], use [[Header#apply]]. */ sealed abstract class Header extends org.apache.kafka.common.header.Header { - /** The header key. */ override def key: String diff --git a/modules/core/src/main/scala/fs2/kafka/HeaderDeserializer.scala b/modules/core/src/main/scala/fs2/kafka/HeaderDeserializer.scala index 50592b4d3..412edcaa2 100644 --- a/modules/core/src/main/scala/fs2/kafka/HeaderDeserializer.scala +++ b/modules/core/src/main/scala/fs2/kafka/HeaderDeserializer.scala @@ -19,7 +19,6 @@ import scala.annotation.tailrec * Kafka `Deserializer` interface. */ sealed abstract class HeaderDeserializer[A] { - /** * Deserializes the header value bytes into a value of type `A`. */ @@ -88,7 +87,6 @@ sealed abstract class HeaderDeserializer[A] { } object HeaderDeserializer { - /** Alias for `HeaderDeserializer[Either[Throwable, A]]`. */ type Attempt[A] = HeaderDeserializer[Either[Throwable, A]] diff --git a/modules/core/src/main/scala/fs2/kafka/HeaderSerializer.scala b/modules/core/src/main/scala/fs2/kafka/HeaderSerializer.scala index ab4d8152f..c785d1875 100644 --- a/modules/core/src/main/scala/fs2/kafka/HeaderSerializer.scala +++ b/modules/core/src/main/scala/fs2/kafka/HeaderSerializer.scala @@ -17,7 +17,6 @@ import java.util.UUID * the Kafka `Serializer` interface. */ sealed abstract class HeaderSerializer[A] { - /** * Serializes the specified value of type `A` into bytes. 
*/ diff --git a/modules/core/src/main/scala/fs2/kafka/Headers.scala b/modules/core/src/main/scala/fs2/kafka/Headers.scala index c0a468308..c5d641dc7 100644 --- a/modules/core/src/main/scala/fs2/kafka/Headers.scala +++ b/modules/core/src/main/scala/fs2/kafka/Headers.scala @@ -18,7 +18,6 @@ import fs2.kafka.internal.syntax._ * instance of [[Header]] using `append`. */ sealed abstract class Headers { - /** * Returns the first header with the specified key, * wrapped in `Some`, or `None` if no such header diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala b/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala index ee66f7372..f3afe8f65 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala @@ -31,7 +31,6 @@ import scala.annotation.nowarn * Use [[KafkaAdminClient.resource]] or [[KafkaAdminClient.stream]] to create an instance. */ sealed abstract class KafkaAdminClient[F[_]] { - /** * Updates the configuration for the specified resources. */ @@ -216,7 +215,6 @@ sealed abstract class KafkaAdminClient[F[_]] { } object KafkaAdminClient { - private[this] def alterConfigsWith[F[_]: Functor, G[_]: Foldable]( withAdminClient: WithAdminClient[F], configs: Map[ConfigResource, G[AlterConfigOp]] @@ -266,7 +264,6 @@ object KafkaAdminClient { withAdminClient(_.deleteAcls(filters.asJava).all).void sealed abstract class DescribeCluster[F[_]] { - /** Lists available nodes in the cluster. */ def nodes: F[Set[Node]] @@ -323,7 +320,6 @@ object KafkaAdminClient { withAdminClient(_.describeAcls(filter).values()).map(_.toList) sealed abstract class ListConsumerGroupOffsetsForPartitions[F[_]] { - /** Lists consumer group offsets on specified partitions for the consumer group. */ def partitionsToOffsetAndMetadata: F[Map[TopicPartition, OffsetAndMetadata]] } @@ -342,7 +338,6 @@ object KafkaAdminClient { adminClient .listConsumerGroupOffsets(groupId, options) .partitionsToOffsetAndMetadata - }.map(_.toMap) override def toString: String = @@ -350,7 +345,6 @@ object KafkaAdminClient { } sealed abstract class ListConsumerGroupOffsets[F[_]] { - /** Lists consumer group offsets for the consumer group. */ def partitionsToOffsetAndMetadata: F[Map[TopicPartition, OffsetAndMetadata]] @@ -382,7 +376,6 @@ object KafkaAdminClient { } sealed abstract class ListConsumerGroups[F[_]] { - /** Lists the available consumer group ids. */ def groupIds: F[List[String]] @@ -405,7 +398,6 @@ object KafkaAdminClient { } sealed abstract class ListTopicsIncludeInternal[F[_]] { - /** Lists topic names. Includes internal topics. */ def names: F[Set[String]] @@ -437,7 +429,6 @@ object KafkaAdminClient { } sealed abstract class ListTopics[F[_]] { - /** Lists topic names. 
*/ def names: F[Set[String]] @@ -518,7 +509,6 @@ object KafkaAdminClient { private def create[F[_]: Functor](client: WithAdminClient[F]) = new KafkaAdminClient[F] { - override def alterConfigs[G[_]](configs: Map[ConfigResource, G[AlterConfigOp]])( implicit G: Foldable[G] ): F[Unit] = diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 9984547f1..d4d80170c 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -126,7 +126,6 @@ object KafkaConsumer { stopConsumingDeferred: Deferred[F, Unit] )(implicit F: Async[F], logging: Logging[F]): KafkaConsumer[F, K, V] = new KafkaConsumer[F, K, V] { - override def partitionsMapStream : Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]] = { val chunkQueue: F[Queue[F, Option[Chunk[CommittableConsumerRecord[F, K, V]]]]] = @@ -394,7 +393,6 @@ object KafkaConsumer { actor.ref.updateAndGet(_.withOnRebalance(on).asStreaming).flatTap { newState => logging.log(LogEntry.StoredOnRebalance(on, newState)) } - } .ensure(NotSubscribedException())(_.subscribed) >> withConsumer.blocking(_.assignment.toSortedSet) @@ -547,7 +545,6 @@ object KafkaConsumer { } >> actor.ref .updateAndGet(_.asSubscribed) .log(LogEntry.ManuallyAssignedPartitions(partitions, _)) - } override def assign(topic: String): F[Unit] = @@ -688,7 +685,6 @@ object KafkaConsumer { private[kafka] final class ConsumerPartiallyApplied[F[_]](val dummy: Boolean = true) extends AnyVal { - /** * Alternative version of `resource` where the `F[_]` is * specified explicitly, and where the key and value type can @@ -730,7 +726,6 @@ object KafkaConsumer { * to explicitly use operations such as `flatMap` and `evalTap` */ implicit final class StreamOps[F[_]: Functor, K, V](self: Stream[F, KafkaConsumer[F, K, V]]) { - /** * Subscribes a consumer to the specified topics within the [[Stream]] context. * See [[KafkaSubscription#subscribe]]. diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala index 07ee6d0f8..9a447f224 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala @@ -27,7 +27,6 @@ import scala.concurrent.Promise * offsets, but any value can be used as passthrough value. */ abstract class KafkaProducer[F[_], K, V] { - /** * Produces the specified [[ProducerRecords]] in two steps: the * first effect puts the records in the buffer of the producer, @@ -57,10 +56,8 @@ abstract class KafkaProducer[F[_], K, V] { } object KafkaProducer { - implicit class ProducerOps[F[_], K, V](private val producer: KafkaProducer[F, K, V]) extends AnyVal { - /** * Produce a single [[ProducerRecord]] without a passthrough value, * see [[KafkaProducer.produce]] for general semantics. @@ -103,7 +100,6 @@ object KafkaProducer { */ def produceOne[P](record: ProducerRecord[K, V], passthrough: P): F[F[ProducerResult[P, K, V]]] = producer.produce(ProducerRecords.one(record, passthrough)) - } /** @@ -111,7 +107,6 @@ object KafkaProducer { * access to the underlying producer metrics. */ abstract class Metrics[F[_], K, V] extends KafkaProducer[F, K, V] { - /** * Returns producer metrics. * @@ -125,7 +120,6 @@ object KafkaProducer { * access to the underlying producer partitions. 
*/ abstract class PartitionsFor[F[_], K, V] extends KafkaProducer.Metrics[F, K, V] { - /** * Returns partition metadata for the given topic. * @@ -282,7 +276,6 @@ object KafkaProducer { private[kafka] final class ProducerPartiallyApplied[F[_]](val dummy: Boolean = true) extends AnyVal { - /** * Alternative version of `resource` where the `F[_]` is * specified explicitly, and where the key and value type can diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala index b8ca433a8..fdfd81fcd 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala @@ -23,7 +23,6 @@ import scala.annotation.nowarn * underlying connection. */ sealed abstract class KafkaProducerConnection[F[_]] { - def produce[P, K: Serializer[F, *], V: Serializer[F, *]]( records: ProducerRecords[P, K, V] ): F[F[ProducerResult[P, K, V]]] @@ -60,7 +59,6 @@ sealed abstract class KafkaProducerConnection[F[_]] { } object KafkaProducerConnection { - /** * Creates a new [[KafkaProducerConnection]] in the `Stream` context, * using the specified [[ProducerSettings]]. diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala b/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala index db9e27a19..a8834bb88 100644 --- a/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala +++ b/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala @@ -26,7 +26,6 @@ import cats.instances.option._ * To create a new instance, use [[ProducerRecord#apply]]. */ sealed abstract class ProducerRecord[+K, +V] { - /** The topic to which the record should be produced. */ def topic: String diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala b/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala index c36e94ab0..6f8098915 100644 --- a/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala +++ b/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala @@ -28,7 +28,6 @@ import fs2.kafka.internal.syntax._ * existing [[ProducerRecords]] instance.
*/ sealed abstract class ProducerRecords[+P, +K, +V] { - /** The records to produce. Can be empty for passthrough-only. */ def records: Chunk[ProducerRecord[K, V]] diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala b/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala index b6fcc67a8..7590880e7 100644 --- a/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala +++ b/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala @@ -25,7 +25,6 @@ import org.apache.kafka.clients.producer.RecordMetadata * Use [[ProducerResult#apply]] to create a new [[ProducerResult]]. */ sealed abstract class ProducerResult[+P, +K, +V] { - /** * The records produced along with respective metadata. * Can be empty for passthrough-only. @@ -41,7 +40,6 @@ object ProducerResult { override val records: Chunk[(ProducerRecord[K, V], RecordMetadata)], override val passthrough: P ) extends ProducerResult[P, K, V] { - override def toString: String = if (records.isEmpty) s"ProducerResult(, $passthrough)" diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala index 72a5523c3..ae352b8e8 100644 --- a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala @@ -27,7 +27,6 @@ import scala.concurrent.duration._ * Use `ProducerSettings#apply` to create a new instance. */ sealed abstract class ProducerSettings[F[_], K, V] { - /** * The `Serializer` to use for serializing record keys. */ diff --git a/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala b/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala index f2aa80f32..a4b6ffe57 100644 --- a/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala +++ b/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala @@ -15,7 +15,6 @@ import cats.{Applicative, Functor} * a creation effect. */ sealed abstract class RecordDeserializer[F[_], A] { - def forKey: F[Deserializer[F, A]] def forValue: F[Deserializer[F, A]] diff --git a/modules/core/src/main/scala/fs2/kafka/RecordSerializer.scala b/modules/core/src/main/scala/fs2/kafka/RecordSerializer.scala index 1b053a038..f7183841e 100644 --- a/modules/core/src/main/scala/fs2/kafka/RecordSerializer.scala +++ b/modules/core/src/main/scala/fs2/kafka/RecordSerializer.scala @@ -15,7 +15,6 @@ import cats.syntax.all._ * a creation effect. */ sealed abstract class RecordSerializer[F[_], A] { - def forKey: F[Serializer[F, A]] def forValue: F[Serializer[F, A]] diff --git a/modules/core/src/main/scala/fs2/kafka/Serializer.scala b/modules/core/src/main/scala/fs2/kafka/Serializer.scala index 062deb585..5ada7a4e3 100644 --- a/modules/core/src/main/scala/fs2/kafka/Serializer.scala +++ b/modules/core/src/main/scala/fs2/kafka/Serializer.scala @@ -17,7 +17,6 @@ import java.util.UUID * support for effect types. */ sealed abstract class Serializer[F[_], A] { - /** * Attempts to serialize the specified value of type `A` into * bytes. The Kafka topic name, to which the serialized bytes diff --git a/modules/core/src/main/scala/fs2/kafka/Timestamp.scala b/modules/core/src/main/scala/fs2/kafka/Timestamp.scala index a622d82ad..f679e7abb 100644 --- a/modules/core/src/main/scala/fs2/kafka/Timestamp.scala +++ b/modules/core/src/main/scala/fs2/kafka/Timestamp.scala @@ -18,7 +18,6 @@ import cats.instances.boolean._ * no timestamp at all. 
*/ sealed abstract class Timestamp { - /** * Returns the timestamp value, if the timestamp is * representing the time when a record was created. @@ -51,7 +50,6 @@ sealed abstract class Timestamp { } object Timestamp { - /** * Creates a new [[Timestamp]] instance from the specified * timestamp value representing the time when the record diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala index cccb1bcdb..e848312f4 100644 --- a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala +++ b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala @@ -27,7 +27,6 @@ import scala.annotation.nowarn * arbitrary passthrough value to be included in the result. */ abstract class TransactionalKafkaProducer[F[_], K, V] { - /** * Produces the `ProducerRecord`s in the specified [[TransactionalProducerRecords]] * in four steps: first a transaction is initialized, then the records are placed @@ -42,13 +41,11 @@ abstract class TransactionalKafkaProducer[F[_], K, V] { } object TransactionalKafkaProducer { - /** * [[TransactionalKafkaProducer.Metrics]] extends [[TransactionalKafkaProducer]] to provide * access to the underlying producer metrics. */ abstract class Metrics[F[_], K, V] extends TransactionalKafkaProducer[F, K, V] { - /** * Returns producer metrics. * @@ -62,7 +59,6 @@ object TransactionalKafkaProducer { * to allow producing of records without corresponding upstream offsets. */ abstract class WithoutOffsets[F[_], K, V] extends Metrics[F, K, V] { - /** * Produces the `ProducerRecord`s in the specified [[ProducerRecords]] * in three steps: first a transaction is initialized, then the records are placed @@ -186,7 +182,6 @@ object TransactionalKafkaProducer { private[kafka] final class TransactionalProducerPartiallyApplied[F[_]](val dummy: Boolean = true) extends AnyVal { - /** * Alternative version of `resource` where the `F[_]` is * specified explicitly, and where the key and value type can diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalProducerRecords.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalProducerRecords.scala index 93aae3419..8a918dce2 100644 --- a/modules/core/src/main/scala/fs2/kafka/TransactionalProducerRecords.scala +++ b/modules/core/src/main/scala/fs2/kafka/TransactionalProducerRecords.scala @@ -28,7 +28,6 @@ import fs2.kafka.internal.syntax._ * results and specified passthrough value. */ sealed abstract class TransactionalProducerRecords[F[_], +P, +K, +V] { - /** The records to produce and commit. Can be empty for passthrough-only. */ def records: Chunk[CommittableProducerRecords[F, K, V]] diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalProducerSettings.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalProducerSettings.scala index d970e570f..cf17994d3 100644 --- a/modules/core/src/main/scala/fs2/kafka/TransactionalProducerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/TransactionalProducerSettings.scala @@ -21,7 +21,6 @@ import scala.concurrent.duration.FiniteDuration * Use [[TransactionalProducerSettings.apply]] to create a new instance. */ sealed abstract class TransactionalProducerSettings[F[_], K, V] { - /** * The transactional ID which should be used in transactions. * This is the value for the following producer property. 
diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaAssignment.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaAssignment.scala index 21695d9ea..dfe8f2124 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaAssignment.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaAssignment.scala @@ -13,7 +13,6 @@ import scala.collection.immutable.SortedSet import org.apache.kafka.common.TopicPartition trait KafkaAssignment[F[_]] { - /** * Returns the set of partitions currently assigned to this consumer. */ diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaCommit.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaCommit.scala index 4e6eef75b..0dc0844c4 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaCommit.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaCommit.scala @@ -10,7 +10,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition trait KafkaCommit[F[_]] { - /** * Commit the specified offsets for the specified list of topics and partitions to Kafka.
*
diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala index 8994c1799..13485d21f 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala @@ -11,7 +11,6 @@ import fs2.kafka.CommittableConsumerRecord import org.apache.kafka.common.TopicPartition trait KafkaConsume[F[_], K, V] { - /** * Consume from all assigned partitions, producing a stream * of [[CommittableConsumerRecord]]s. Alias for [[stream]]. diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumerLifecycle.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumerLifecycle.scala index 7dba47032..748cd83df 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumerLifecycle.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsumerLifecycle.scala @@ -7,7 +7,6 @@ package fs2.kafka.consumer trait KafkaConsumerLifecycle[F[_]] { - /** * Whenever `terminate` is invoked, an attempt will be made to stop the * underlying consumer. The `terminate` operation will not wait for the diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaMetrics.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaMetrics.scala index 8316f685c..b619db7e1 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaMetrics.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaMetrics.scala @@ -9,7 +9,6 @@ package fs2.kafka.consumer import org.apache.kafka.common.{MetricName, Metric} trait KafkaMetrics[F[_]] { - /** * Returns consumer metrics. * diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsets.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsets.scala index a586c796d..598a8a71f 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsets.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsets.scala @@ -11,7 +11,6 @@ import cats.Foldable import scala.concurrent.duration.FiniteDuration trait KafkaOffsets[F[_]] { - /** * Overrides the fetch offsets that the consumer will use when reading the * next record. If this API is invoked for the same partition more than once, @@ -72,5 +71,4 @@ trait KafkaOffsets[F[_]] { * Returns the offset of the next record that will be fetched. */ def position(partition: TopicPartition, timeout: FiniteDuration): F[Long] - } diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsetsV2.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsetsV2.scala index 7488b817b..921dae39c 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsetsV2.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaOffsetsV2.scala @@ -12,7 +12,6 @@ import org.apache.kafka.common.TopicPartition import scala.concurrent.duration.FiniteDuration trait KafkaOffsetsV2[F[_]] extends KafkaOffsets[F] { - /** * Returns the last committed offsets for the given partitions. */ diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaSubscription.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaSubscription.scala index e77de6434..7f4b7e446 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaSubscription.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaSubscription.scala @@ -12,7 +12,6 @@ import cats.data.NonEmptyList import scala.util.matching.Regex trait KafkaSubscription[F[_]] { - /** * Subscribes the consumer to the specified topics. 
Note that you have to * use one of the `subscribe` functions to subscribe to one or more topics @@ -47,5 +46,4 @@ trait KafkaSubscription[F[_]] { * by `subscribe` or `assign`. */ def unsubscribe: F[Unit] - } diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaTopics.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaTopics.scala index b31bfcc3f..6fd692390 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaTopics.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaTopics.scala @@ -11,7 +11,6 @@ import scala.concurrent.duration.FiniteDuration import org.apache.kafka.common.TopicPartition trait KafkaTopics[F[_]] { - /** * Returns the partitions for the specified topic. * diff --git a/modules/core/src/main/scala/fs2/kafka/internal/FakeFiber.scala b/modules/core/src/main/scala/fs2/kafka/internal/FakeFiber.scala index 7e3d907c9..0031640ec 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/FakeFiber.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/FakeFiber.scala @@ -17,7 +17,6 @@ private[kafka] final case class FakeFiber[F[_]](join: F[Unit], cancel: F[Unit])( implicit F: Concurrent[F] ) { def combine(that: FakeFiber[F]): FakeFiber[F] = { - val fa0join = this.join.guaranteeCase { case Outcome.Canceled() => F.unit diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 166a062d9..1ae3832b2 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -401,7 +401,6 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( case HandlePollResult.CompletedAndStored(completeFetches, completedLog, storedLog, _) => completeFetches >> logging.log(completedLog) >> logging.log(storedLog) }) >> result.pendingCommits.traverse_(_.commit) - } } ref.get.flatMap { state => diff --git a/modules/core/src/main/scala/fs2/kafka/internal/package.scala b/modules/core/src/main/scala/fs2/kafka/internal/package.scala index 0a4d999e0..705e9beed 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/package.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/package.scala @@ -8,5 +8,4 @@ package fs2.kafka package object internal { private[kafka] type ExclusiveAccess[F[_], A] = F[A] => F[A] - } diff --git a/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala b/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala index e63a96011..b3f6b67b0 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala @@ -121,7 +121,6 @@ private[kafka] object syntax { def updatedIfAbsent(k: K, v: => V): Map[K, V] = if (map.contains(k)) map else map.updated(k, v) - } implicit final class MapWrappedValueSyntax[F[_], K, V]( @@ -187,7 +186,6 @@ private[kafka] object syntax { implicit final class KafkaFutureSyntax[F[_], A]( private val futureF: F[KafkaFuture[A]] ) extends AnyVal { - // Inspired by Monix's `CancelableFuture#fromJavaCompletable`. 
def cancelable(implicit F: Async[F]): F[A] = F.async { (cb: (Either[Throwable, A] => Unit)) => diff --git a/modules/core/src/test/scala/cats/tests/CatsSuite.scala b/modules/core/src/test/scala/cats/tests/CatsSuite.scala index c6cf71602..9fbc862a0 100644 --- a/modules/core/src/test/scala/cats/tests/CatsSuite.scala +++ b/modules/core/src/test/scala/cats/tests/CatsSuite.scala @@ -18,7 +18,6 @@ import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks import org.typelevel.discipline.scalatest.FunSuiteDiscipline trait TestSettings extends Configuration with Matchers { - lazy val checkConfiguration: PropertyCheckConfiguration = PropertyCheckConfiguration( minSuccessful = if (Platform.isJvm) PosInt(50) else PosInt(5), @@ -58,7 +57,6 @@ trait CatsSuite with AllSyntaxBinCompat4 with AllSyntaxBinCompat5 with StrictCatsEquality { - implicit override val generatorDrivenConfig: PropertyCheckConfiguration = checkConfiguration diff --git a/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala b/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala index e6a0837eb..70ab7e10c 100644 --- a/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala +++ b/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala @@ -22,7 +22,6 @@ import fs2.Chunk import org.scalacheck.rng.Seed trait BaseGenerators { - implicit def chunkCogen[A: Cogen]: Cogen[Chunk[A]] = Cogen.it(_.iterator) val genTopic: Gen[String] = arbitrary[String] diff --git a/modules/core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala b/modules/core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala index 9f3d29e7f..3f22a9aa7 100644 --- a/modules/core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/BaseKafkaSpec.scala @@ -39,7 +39,6 @@ import org.scalatest.Args import org.testcontainers.utility.DockerImageName abstract class BaseKafkaSpec extends BaseAsyncSpec with ForAllTestContainer { - final val adminClientCloseTimeout: FiniteDuration = 2.seconds final val transactionTimeoutInterval: FiniteDuration = 1.second diff --git a/modules/core/src/test/scala/fs2/kafka/CommittableConsumerRecordLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/CommittableConsumerRecordLawsSpec.scala index 679d3c1a0..753278213 100644 --- a/modules/core/src/test/scala/fs2/kafka/CommittableConsumerRecordLawsSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/CommittableConsumerRecordLawsSpec.scala @@ -11,7 +11,6 @@ import cats.kernel.laws.discipline.EqTests import cats.laws.discipline.{BitraverseTests, TraverseTests} class CommittableConsumerRecordLawsSpec extends BaseCatsSpec { - checkAll( "CommittableConsumerRecord.eqLaws", EqTests[CommittableConsumerRecord[IO, String, String]].eqv diff --git a/modules/core/src/test/scala/fs2/kafka/CommittableOffsetLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/CommittableOffsetLawsSpec.scala index fccb02c38..5bc0c865a 100644 --- a/modules/core/src/test/scala/fs2/kafka/CommittableOffsetLawsSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/CommittableOffsetLawsSpec.scala @@ -10,10 +10,8 @@ import cats.effect.IO import cats.kernel.laws.discipline.EqTests class CommittableOffsetLawsSpec extends BaseCatsSpec { - checkAll( "CommittableOffset.eqLaws", EqTests[CommittableOffset[IO]].eqv ) - } diff --git a/modules/core/src/test/scala/fs2/kafka/CommittableOffsetSpec.scala b/modules/core/src/test/scala/fs2/kafka/CommittableOffsetSpec.scala index 13a497999..9083dd488 100644 --- a/modules/core/src/test/scala/fs2/kafka/CommittableOffsetSpec.scala +++ 
b/modules/core/src/test/scala/fs2/kafka/CommittableOffsetSpec.scala @@ -67,5 +67,4 @@ final class CommittableOffsetSpec extends BaseSpec { } } } - } diff --git a/modules/core/src/test/scala/fs2/kafka/CommittableProducerRecordsLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/CommittableProducerRecordsLawsSpec.scala index e6e1d4809..905bd9381 100644 --- a/modules/core/src/test/scala/fs2/kafka/CommittableProducerRecordsLawsSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/CommittableProducerRecordsLawsSpec.scala @@ -11,7 +11,6 @@ import cats.kernel.laws.discipline.EqTests import cats.laws.discipline.{BitraverseTests, TraverseTests} class CommittableProducerRecordsLawsSpec extends BaseCatsSpec { - checkAll( "CommittableProducerRecord.eqLaws", EqTests[CommittableProducerRecords[IO, String, String]].eqv diff --git a/modules/core/src/test/scala/fs2/kafka/ConsumerRecordLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/ConsumerRecordLawsSpec.scala index e659bc53c..424294ad8 100644 --- a/modules/core/src/test/scala/fs2/kafka/ConsumerRecordLawsSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/ConsumerRecordLawsSpec.scala @@ -10,7 +10,6 @@ import cats.kernel.laws.discipline.EqTests import cats.laws.discipline.{BitraverseTests, TraverseTests} class ConsumerRecordLawsSpec extends BaseCatsSpec { - checkAll( "ConsumerRecord.eqLaws", EqTests[ConsumerRecord[Int, Int]].eqv diff --git a/modules/core/src/test/scala/fs2/kafka/HeaderLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/HeaderLawsSpec.scala index 2b3c1e294..8b36f960d 100644 --- a/modules/core/src/test/scala/fs2/kafka/HeaderLawsSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/HeaderLawsSpec.scala @@ -9,10 +9,8 @@ package fs2.kafka import cats.kernel.laws.discipline.EqTests class HeaderLawsSpec extends BaseCatsSpec { - checkAll( "Header.eqLaws", EqTests[Header].eqv ) - } diff --git a/modules/core/src/test/scala/fs2/kafka/HeadersLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/HeadersLawsSpec.scala index cebb188da..37ce4e951 100644 --- a/modules/core/src/test/scala/fs2/kafka/HeadersLawsSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/HeadersLawsSpec.scala @@ -9,10 +9,8 @@ package fs2.kafka import cats.kernel.laws.discipline.EqTests class HeadersLawsSpec extends BaseCatsSpec { - checkAll( "Headers.eqLaws", EqTests[Headers].eqv ) - } diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala index 44419f56f..a6d78ab69 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala @@ -22,7 +22,6 @@ import org.apache.kafka.common.resource.{ } final class KafkaAdminClientSpec extends BaseKafkaSpec { - describe("KafkaAdminClient") { it("should allow creating instances") { KafkaAdminClient.resource[IO](adminClientSettings).use(IO.pure).unsafeRunSync() @@ -342,5 +341,4 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec { .lastOrError .unsafeRunSync() } - } diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala index 1f4e866a3..3ff710c4a 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala @@ -28,7 +28,6 @@ import scala.collection.immutable.SortedSet import scala.concurrent.duration._ final class KafkaConsumerSpec extends BaseKafkaSpec { - type Consumer = KafkaConsumer[IO, 
String, String]
   type ConsumerStream = Stream[IO, CommittableConsumerRecord[IO, String, String]]
@@ -99,7 +98,6 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
 
           // duplication is currently possible.
           res.distinct should contain theSameElementsAs produced
-
       }
     }
@@ -332,7 +330,6 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
             "java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive"
       }
     }
-
   }
 }
diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
index c8dd1d187..b29752295 100644
--- a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
@@ -11,7 +11,6 @@ import cats.effect.unsafe.implicits.global
 import fs2.{Chunk, Stream}
 
 final class KafkaProducerSpec extends BaseKafkaSpec {
-
   describe("creating producers") {
     it("should support defined syntax") {
       val settings =
diff --git a/modules/core/src/test/scala/fs2/kafka/ProducerRecordLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/ProducerRecordLawsSpec.scala
index 315f8233e..1733feff9 100644
--- a/modules/core/src/test/scala/fs2/kafka/ProducerRecordLawsSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/ProducerRecordLawsSpec.scala
@@ -10,7 +10,6 @@ import cats.kernel.laws.discipline.EqTests
 import cats.laws.discipline.{BitraverseTests, TraverseTests}
 
 class ProducerRecordLawsSpec extends BaseCatsSpec {
-
   checkAll(
     "ProducerRecord.eqLaws",
     EqTests[ProducerRecord[Int, Int]].eqv
diff --git a/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala b/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
index f7e907d46..62e76da1f 100644
--- a/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
@@ -176,7 +176,6 @@ final class ProducerSettingsSpec extends BaseSpec {
         )
       }
     }
-
   }
 
   val settings = ProducerSettings[IO, String, String]
diff --git a/modules/core/src/test/scala/fs2/kafka/RecordDeserializerSpec.scala b/modules/core/src/test/scala/fs2/kafka/RecordDeserializerSpec.scala
index bc2d2cbd1..c4a01eb63 100644
--- a/modules/core/src/test/scala/fs2/kafka/RecordDeserializerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/RecordDeserializerSpec.scala
@@ -3,12 +3,10 @@ package fs2.kafka
 import cats.effect.IO
 
 class RecordDeserializerSpec extends BaseSpec {
-
   import cats.effect.unsafe.implicits.global
 
   describe("RecordDeserializer#transform") {
     it("should transform the RecordDeserializer applying the function to inner Deserializers") {
-
       val strRecordDes: RecordDeserializer[IO, String] =
         RecordDeserializer
           .const(IO.pure(Deserializer[IO, Int]))
@@ -24,7 +22,6 @@ class RecordDeserializerSpec extends BaseSpec {
     it(
       "should transform the RecordDeserializer[F, T] to RecordDeserializer[F, Either[Throwable, T]]"
     ) {
-
       val attemptIntRecordDes: RecordDeserializer[IO, Either[Throwable, Int]] =
         RecordDeserializer
           .const(IO.pure(Deserializer[IO, Int].flatMap[Int] {
@@ -46,7 +43,6 @@ class RecordDeserializerSpec extends BaseSpec {
 
   describe("RecordDeserializer#option") {
     it("should transform the RecordDeserializer[F, T] to RecordDeserializer[F, Option[T]]") {
-
       val optIntRecordDes: RecordDeserializer[IO, Option[Int]] =
         RecordDeserializer
           .const(IO.pure(Deserializer[IO, Int]))
diff --git a/modules/core/src/test/scala/fs2/kafka/RecordSerializerSpec.scala b/modules/core/src/test/scala/fs2/kafka/RecordSerializerSpec.scala
index 9511c83e7..36805995b 100644
--- a/modules/core/src/test/scala/fs2/kafka/RecordSerializerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/RecordSerializerSpec.scala
@@ -3,12 +3,10 @@ package fs2.kafka
 import cats.effect.IO
 
 class RecordSerializerSpec extends BaseSpec {
-
   import cats.effect.unsafe.implicits.global
 
   describe("RecordSerializer#transform") {
     it("should transform the RecordSerializer applying the function to inner Serializers") {
-
       val intRecordSer: RecordSerializer[IO, Int] =
         RecordSerializer
           .const(IO.pure(Serializer[IO, String]))
@@ -22,7 +20,6 @@ class RecordSerializerSpec extends BaseSpec {
 
   describe("RecordSerializer#option") {
     it("should transform the RecordSerializer[F, T] to RecordSerializer[F, Option[T]]") {
-
       val optStrRecordSer: RecordSerializer[IO, Option[String]] =
         RecordSerializer
           .const(IO.pure(Serializer[IO, String]))
diff --git a/modules/core/src/test/scala/fs2/kafka/TimestampLawsSpec.scala b/modules/core/src/test/scala/fs2/kafka/TimestampLawsSpec.scala
index 9c98322cf..3d81eeddf 100644
--- a/modules/core/src/test/scala/fs2/kafka/TimestampLawsSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/TimestampLawsSpec.scala
@@ -9,10 +9,8 @@ package fs2.kafka
 import cats.kernel.laws.discipline.EqTests
 
 class TimestampLawsSpec extends BaseCatsSpec {
-
   checkAll(
     "Timestamp.eqLaws",
     EqTests[Timestamp].eqv
   )
-
 }
diff --git a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
index e140ea6c6..2a1af9922 100644
--- a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -23,7 +23,6 @@ import org.scalatest.EitherValues
 import scala.concurrent.duration._
 
 class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
-
   describe("creating transactional producers") {
     it("should support defined syntax") {
       val settings = TransactionalProducerSettings("id", ProducerSettings[IO, String, String])
@@ -101,7 +100,6 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
               )
             )
           )
-
         }
         passthrough <- Stream
           .eval(records.fold(producer.produceWithoutOffsets, producer.produce))
@@ -493,5 +491,4 @@ class TransactionalKafkaProducerTimeoutSpec extends BaseKafkaSpec with EitherVal
       consumedOrError.isLeft shouldBe true
     }
   }
-
 }
diff --git a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
index 474b8e4b2..61d5c64c5 100644
--- a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
@@ -65,9 +65,7 @@ final class SyntaxSpec extends BaseSpec {
   }
 
   describe("KafkaFuture.cancelable") {
-
     it("should cancel future when fiber is cancelled") {
-
       @volatile var isFutureCancelled = false
 
       val test =
@@ -91,6 +89,5 @@ final class SyntaxSpec extends BaseSpec {
         } yield ()
       test.unsafeRunSync()
     }
-
   }
 }
diff --git a/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala b/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala
index 5fe78ad2c..9722ab8ed 100644
--- a/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala
+++ b/modules/vulcan-testkit-munit/src/main/scala/fs2/kafka/vulcan/testkit/SchemaSuite.scala
@@ -28,7 +28,6 @@ trait CompatibilityChecker[F[_]] {
 }
 
 trait SchemaSuite extends FunSuite {
-
   private def codecAsSchema[A](codec: Codec[A]) = codec.schema.fold(e => fail(e.message), ok => ok)
 
   def compatibilityChecker(
@@ -43,7 +42,6 @@ trait SchemaSuite extends FunSuite {
       checker = clientSettings.createSchemaRegistryClient
         .map { client =>
           new CompatibilityChecker[IO] {
-
             private def registrySchema(subject: String): IO[Schema] =
               for {
                 metadata <- IO.delay(client.getLatestSchemaMetadata(subject))
@@ -75,7 +73,6 @@ trait SchemaSuite extends FunSuite {
                 )
             }
           }
-
         }
       }
       .unsafeRunSync()
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
index 488d080de..451fa6893 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
@@ -32,7 +32,6 @@ final class AvroDeserializer[A] private[vulcan] (
                 s"Invalid Avro record: bytes is null or empty"
               )
             )
-
           } else {
             val writerSchemaId =
               ByteBuffer.wrap(bytes).getInt(1) // skip magic byte
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
index 4fca097ae..51daf75bc 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
@@ -23,7 +23,6 @@ import vulcan.Codec
   * Use `AvroSettings.apply` to create an instance.
   */
 sealed abstract class AvroSettings[F[_]] {
-
   /**
     * The `SchemaRegistryClient` to use for the serializers
     * and deserializers created from this [[AvroSettings]].
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientSettings.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientSettings.scala
index 8e336a447..fd1684600 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientSettings.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/SchemaRegistryClientSettings.scala
@@ -18,7 +18,6 @@ import fs2.kafka.internal.converters.collection._
   * Use `SchemaRegistryClient#apply` to create an instance.
   */
 sealed abstract class SchemaRegistryClientSettings[F[_]] {
-
   /**
     * The base URL of the schema registry service.
     */
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala
index 4f546cfc4..70479f739 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/package.scala
@@ -9,7 +9,6 @@ package fs2.kafka
 import _root_.vulcan.Codec
 
 package object vulcan {
-
   /** Alias for `io.confluent.kafka.schemaregistry.client.SchemaRegistryClient`. */
   type SchemaRegistryClient =
     io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
diff --git a/scalafix/rules/src/main/scala/fix/Fs2Kafka.scala b/scalafix/rules/src/main/scala/fix/Fs2Kafka.scala
index 0b721e26d..c59c6edd9 100644
--- a/scalafix/rules/src/main/scala/fix/Fs2Kafka.scala
+++ b/scalafix/rules/src/main/scala/fix/Fs2Kafka.scala
@@ -4,12 +4,10 @@ import scalafix.v1._
 import scala.meta._
 
 class Fs2Kafka extends SemanticRule("Fs2Kafka") {
-
   override def fix(implicit doc: SemanticDocument): Patch =
     reorderPassthroughParams
 
   def reorderPassthroughParams(implicit doc: SemanticDocument): Patch = {
-
     val ProducerRecords_M =
       SymbolMatcher.normalized(
         "fs2/kafka/ProducerRecords.",
@@ -66,7 +64,6 @@ class Fs2Kafka extends SemanticRule("Fs2Kafka") {
             List(f, k, v, p)
           ) =>
         Patch.replaceTree(term, s"${fun.syntax}[$f, $p, $k, $v]")
-
     }.asPatch
   }
 }

From 7b5a38b1300aff847c871cb20f8e62096f017bac Mon Sep 17 00:00:00 2001
From: Scala Steward
Date: Thu, 13 Apr 2023 14:43:47 +0000
Subject: [PATCH 09/19] Add 'Reformat with scalafmt 2.2.2' to
 .git-blame-ignore-revs

---
 .git-blame-ignore-revs | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 .git-blame-ignore-revs

diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
new file mode 100644
index 000000000..4ef4b43f6
--- /dev/null
+++ b/.git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# Scala Steward: Reformat with scalafmt 2.2.2
+db0ea0ffd3d0956211a680941426f73ba7ec581b

From cefbadff7ddf80c5e42c06280d0f31ab884eb3ed Mon Sep 17 00:00:00 2001
From: Scala Steward
Date: Thu, 27 Apr 2023 15:05:03 +0000
Subject: [PATCH 10/19] Update logback-classic to 1.3.7 in series/2.x

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index acb521413..0210acf41 100644
--- a/build.sbt
+++ b/build.sbt
@@ -108,7 +108,7 @@ lazy val dependencySettings = Seq(
       "org.typelevel" %% "discipline-scalatest" % "2.2.0",
       "org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
       "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
-      "ch.qos.logback" % "logback-classic" % "1.3.6"
+      "ch.qos.logback" % "logback-classic" % "1.3.7"
     ).map(_ % Test),
   libraryDependencies ++= {
     if (scalaVersion.value.startsWith("3")) Nil

From 548d6336c82698052caf5f40b449c8d78e7cfe37 Mon Sep 17 00:00:00 2001
From: Scala Steward
Date: Thu, 27 Apr 2023 15:05:27 +0000
Subject: [PATCH 11/19] Update testcontainers-scala-kafka, ... to 0.40.15 in
 series/2.x

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index acb521413..c88887082 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,7 @@ val fs2Version = "3.6.1"
 
 val kafkaVersion = "2.8.2"
 
-val testcontainersScalaVersion = "0.40.14"
+val testcontainersScalaVersion = "0.40.15"
 
 val vulcanVersion = "1.9.0"

From 6f97721ade6ddcd1b074636a3f97410e098141cb Mon Sep 17 00:00:00 2001
From: Scala Steward
Date: Thu, 27 Apr 2023 15:05:53 +0000
Subject: [PATCH 12/19] Update cats-effect, cats-effect-laws, ...
 to 3.4.9 in series/2.x

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index acb521413..a90e83d72 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,4 +1,4 @@
-val catsEffectVersion = "3.4.8"
+val catsEffectVersion = "3.4.9"
 
 val confluentVersion = "6.2.9"

From 510b56ea40bf69d6f0d4af2e891feb5ae5421c88 Mon Sep 17 00:00:00 2001
From: Scala Steward
Date: Thu, 27 Apr 2023 15:06:09 +0000
Subject: [PATCH 13/19] Update sbt-typelevel to 0.4.20 in series/2.x

---
 project/plugins.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/plugins.sbt b/project/plugins.sbt
index 345db90be..8d417c1d6 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,4 +4,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.1")
 addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.9.0")
 addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.3.7")
 addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
-addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.4.19")
+addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.4.20")

From 7a3c304f2d16a767d821c45e2610365128f9a3b9 Mon Sep 17 00:00:00 2001
From: Ben Plommer
Date: Fri, 5 May 2023 13:20:45 +0100
Subject: [PATCH 14/19] Update tlBaseVersion

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index acb521413..3f157d95e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,7 +18,7 @@ val scala213 = "2.13.10"
 
 val scala3 = "3.2.2"
 
-ThisBuild / tlBaseVersion := "2.5"
+ThisBuild / tlBaseVersion := "2.6"
 
 ThisBuild / tlVersionIntroduced := Map("3" -> "2.1.0")

From 996a64c5169b7033672a3214dc250b1c09af2166 Mon Sep 17 00:00:00 2001
From: Ben Plommer
Date: Fri, 5 May 2023 13:59:07 +0100
Subject: [PATCH 15/19] Replace Async#fromFuture with inlined
 Async#fromFutureCancelable

---
 .../src/main/scala/fs2/kafka/KafkaProducer.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
index 07ee6d0f8..59f627cfa 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -218,7 +218,17 @@ object KafkaProducer {
             else promise.failure(exception)
           }
         )
-      }.as(F.fromFuture(F.delay(promise.future)))
+      }.as {
+        F.delay(promise.future).flatMap { fut =>
+          F.executionContext.flatMap { implicit ec =>
+            F.async[(ProducerRecord[K, V], RecordMetadata)] { cb =>
+              F.delay(fut.onComplete(t => cb(t.toEither))).as(Some(F.unit))
+            }
+          }
+        }
+        // TODO: replace the above with the following once CE3.5.0 is out
+        // F.fromFutureCancelable(F.delay(promise.future))
+      }
     }
   }

From 3271bbd3dc5b416f3428bcbad1de2d3fcc50a8bc Mon Sep 17 00:00:00 2001
From: Ben Plommer
Date: Fri, 5 May 2023 15:05:32 +0100
Subject: [PATCH 16/19] Update workflow

---
 .github/workflows/ci.yml | 34 ++++++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ef098e193..4ebb4faf5 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -172,3 +172,37 @@ jobs:
           SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
           PGP_SECRET: ${{ secrets.PGP_SECRET }}
         run: sbt '++ ${{ matrix.scala }}' tlRelease docs/docusaurusPublishGhpages
+
+  validate-steward:
+    name: Validate Steward Config
+    strategy:
+      matrix:
+        os: [ubuntu-latest]
+        scala: [2.13.6]
+        java: [temurin@11]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - name: Checkout current branch (fast)
+        uses: actions/checkout@v3
+
+      - name: Download Java (temurin@17)
+        id: download-java-temurin-17
+        if: matrix.java == 'temurin@17'
+        uses: typelevel/download-java@v2
+        with:
+          distribution: temurin
+          java-version: 17
+
+      - name: Setup Java (temurin@17)
+        if: matrix.java == 'temurin@17'
+        uses: actions/setup-java@v3
+        with:
+          distribution: jdkfile
+          java-version: 17
+          jdkFile: ${{ steps.download-java-temurin-17.outputs.jdkFile }}
+
+      - uses: coursier/setup-action@v1
+        with:
+          apps: scala-steward
+
+      - run: scala-steward validate-repo-config .scala-steward.conf

From ab2a5054f372ceef5c4a6577978f2c1898394c6c Mon Sep 17 00:00:00 2001
From: Ben Plommer
Date: Fri, 5 May 2023 16:07:11 +0100
Subject: [PATCH 17/19] Fix merge issue

---
 .../src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
index b98b991ce..30e51ae1b 100644
--- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
+++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
@@ -103,7 +103,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
 
   private[this] def manualCommitSync(request: Request.ManualCommitSync[F]): F[Unit] = {
     val commit =
-      withConsumer.blocking(_.commitSync(request.offsets.asJava, settings.commitTimeout.asJava))
+      withConsumer.blocking(_.commitSync(request.offsets.asJava, settings.commitTimeout.toJava))
     commit.attempt >>= request.callback
   }

From 436b00b474eb924ce42aa64b10ad099e7ca2b1ad Mon Sep 17 00:00:00 2001
From: Ben Plommer
Date: Fri, 5 May 2023 16:09:13 +0100
Subject: [PATCH 18/19] Scalafmt

---
 .../core/src/main/scala/fs2/kafka/KafkaAdminClient.scala    | 1 -
 .../src/main/scala/fs2/kafka/KafkaProducerConnection.scala  | 1 -
 modules/core/src/main/scala/fs2/kafka/Serializer.scala      | 1 -
 modules/core/src/main/scala/fs2/kafka/package.scala         | 6 ------
 .../core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala | 1 -
 .../src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala  | 1 -
 .../src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala    | 1 -
 .../src/test/scala/fs2/kafka/vulcan/PackageSpec.scala       | 1 -
 8 files changed, 13 deletions(-)

diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala b/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala
index 0ba9d4947..01020ee29 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala
@@ -330,7 +330,6 @@ object KafkaAdminClient {
     partitions: G[TopicPartition]
   ): ListConsumerGroupOffsetsForPartitions[F] =
     new ListConsumerGroupOffsetsForPartitions[F] {
-
       private[this] val groupOffsets = Map(
         groupId -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions.asJava)
       ).asJava
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
index 3483134e7..0147a3357 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
@@ -145,7 +145,6 @@ object KafkaProducerConnection {
 
       override def partitionsFor(topic: String): G[List[PartitionInfo]] =
         withProducer.blocking { _.partitionsFor(topic).asScala.toList }
-
     }
 }
diff --git a/modules/core/src/main/scala/fs2/kafka/Serializer.scala b/modules/core/src/main/scala/fs2/kafka/Serializer.scala
index 9ebdd5d07..2c2a55e42 100644
--- a/modules/core/src/main/scala/fs2/kafka/Serializer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/Serializer.scala
@@ -52,7 +52,6 @@ sealed abstract class GenericSerializer[-T <: KeyOrValue, F[_], A] {
   * support for effect types.
   */
 object GenericSerializer {
-
   def apply[F[_], A](implicit serializer: Serializer[F, A]): Serializer[F, A] = serializer
 
   /** Alias for [[Serializer#identity]]. */
diff --git a/modules/core/src/main/scala/fs2/kafka/package.scala b/modules/core/src/main/scala/fs2/kafka/package.scala
index 9d66bcd4d..6f5999b8e 100644
--- a/modules/core/src/main/scala/fs2/kafka/package.scala
+++ b/modules/core/src/main/scala/fs2/kafka/package.scala
@@ -80,7 +80,6 @@ package object kafka {
 }
 
 package kafka {
-
   /** Phantom types to indicate whether a [[Serializer]]/[[Deserializer]] if for keys, values, or both */
   sealed trait KeyOrValue
@@ -88,11 +87,9 @@ package kafka {
   sealed trait Value extends KeyOrValue
 }
 package kafka {
-
   import cats.Foldable
 
   object ProducerRecords {
-
     def apply[F[+_], K, V](
       records: F[ProducerRecord[K, V]]
     )(
@@ -101,11 +98,9 @@ package kafka {
     def one[K, V](record: ProducerRecord[K, V]): ProducerRecords[K, V] =
       Chunk.singleton(record)
-
   }
 
   object TransactionalProducerRecords {
-
     @deprecated("this is now an identity operation", "3.0.0-M5")
     def apply[F[_], K, V](
       chunk: Chunk[CommittableProducerRecords[F, K, V]]
@@ -119,6 +114,5 @@ package kafka {
       record: CommittableProducerRecords[F, K, V]
     ): TransactionalProducerRecords[F, K, V] =
       Chunk.singleton(record)
-
   }
 }
diff --git a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
index e835e1547..dc0ac58ad 100644
--- a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
@@ -19,7 +19,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
 import java.util.concurrent.CancellationException
 
 final class SyntaxSpec extends BaseSpec {
-
   describe("Map#filterKeysStrictValuesList") {
     it("should be the same as toList.collect") {
       forAll { (m: Map[Int, Int], p: Int => Boolean) =>
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
index 1289803f5..ec1eb3e1d 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala
@@ -16,7 +16,6 @@ import java.nio.ByteBuffer
 final class AvroDeserializer[A] private[vulcan] (
   private val codec: Codec[A]
 ) extends AnyVal {
-
   def forKey[F[_]: Sync](settings: AvroSettings[F]): Resource[F, KeyDeserializer[F, A]] =
     createDeserializer(isKey = true, settings)
diff --git a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
index 9bcfe4282..89b9c2482 100644
--- a/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
+++ b/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSerializer.scala
@@ -14,7 +14,6 @@ import cats.effect.kernel.Resource
 final class AvroSerializer[A] private[vulcan] (
   private val codec: Codec[A]
 ) extends AnyVal {
-
   def forKey[F[_]: Sync](settings: AvroSettings[F]): Resource[F, KeySerializer[F, A]] =
     create(isKey = true, settings)
diff --git a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala
index 6d31e2f23..4c07d9faf 100644
--- a/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala
+++ b/modules/vulcan/src/test/scala/fs2/kafka/vulcan/PackageSpec.scala
@@ -59,7 +59,6 @@ final class PackageSpec extends AnyFunSpec {
         case (serializer, deserializer) =>
           val test2 = Test2("test", 42)
           for {
-
             serialized <- serializer.serialize("topic2", Headers.empty, test2)
             deserialized <- deserializer.deserialize("topic2", Headers.empty, serialized)
           } yield assert(deserialized == Test("test"))

From f5fa6dcd1f66b05a254878a00fce7dd2102bd0b6 Mon Sep 17 00:00:00 2001
From: Ben Plommer
Date: Fri, 5 May 2023 16:28:06 +0100
Subject: [PATCH 19/19] Mima exclusion

---
 build.sbt | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/build.sbt b/build.sbt
index b25b00b58..a8921bf72 100644
--- a/build.sbt
+++ b/build.sbt
@@ -263,6 +263,15 @@ lazy val publishSettings =
     )
   )
 
+ThisBuild / mimaBinaryIssueFilters ++= {
+  import com.typesafe.tools.mima.core._
+  // format: off
+  Seq(
+    ProblemFilters.exclude[Problem]("fs2.kafka.internal.*")
+  )
+  // format: on
+}
+
 lazy val noMimaSettings = Seq(mimaPreviousArtifacts := Set())
 
 lazy val noPublishSettings =