
Merge pull request #1198 from fd4s/bplommer/merge-2.x
Merge 2.x into 3.x
bplommer authored May 5, 2023
2 parents b31279d + f5fa6dc · commit e4376d1
Showing 68 changed files with 32 additions and 131 deletions.
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# Scala Steward: Reformat with scalafmt 2.2.2
+db0ea0ffd3d0956211a680941426f73ba7ec581b
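Listing the reformat commit here keeps that mechanical change out of blame output: GitHub's blame view reads .git-blame-ignore-revs automatically, and a local clone can opt in with the standard git option below, so the scalafmt commit stops showing up as the last change to every reformatted line.

git config blame.ignoreRevsFile .git-blame-ignore-revs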
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,4 +1,4 @@
-version = "2.2.1"
+version = "2.2.2"
style = default
maxColumn = 100
project.git = true
15 changes: 12 additions & 3 deletions build.sbt
@@ -1,4 +1,4 @@
-val catsEffectVersion = "3.4.8"
+val catsEffectVersion = "3.4.9"

val catsVersion = "2.6.1"

@@ -8,7 +8,7 @@ val fs2Version = "3.6.1"

val kafkaVersion = "3.4.0"

val testcontainersScalaVersion = "0.40.14"
val testcontainersScalaVersion = "0.40.15"

val vulcanVersion = "1.9.0"

@@ -108,7 +108,7 @@ lazy val dependencySettings = Seq(
"org.typelevel" %% "discipline-scalatest" % "2.2.0",
"org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
"org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
"ch.qos.logback" % "logback-classic" % "1.3.6"
"ch.qos.logback" % "logback-classic" % "1.3.7"
).map(_ % Test),
libraryDependencies ++= {
if (scalaVersion.value.startsWith("3")) Nil
@@ -263,6 +263,15 @@ lazy val publishSettings =
)
)

+ThisBuild / mimaBinaryIssueFilters ++= {
+  import com.typesafe.tools.mima.core._
+  // format: off
+  Seq(
+    ProblemFilters.exclude[Problem]("fs2.kafka.internal.*")
+  )
+  // format: on
+}

lazy val noMimaSettings = Seq(mimaPreviousArtifacts := Set())

lazy val noPublishSettings =
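The mimaBinaryIssueFilters block added above tells MiMa (sbt-mima-plugin) to drop every binary-compatibility problem reported against the fs2.kafka.internal package, which is not public API. The filters take effect when the plugin's report task runs:

sbt mimaReportBinaryIssues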
modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala
@@ -25,7 +25,6 @@ import scala.concurrent.duration._
* then apply any desired modifications on top of that instance.
*/
sealed abstract class AdminClientSettings {
-
/**
* Properties which can be provided when creating a Java `KafkaAdminClient`
* instance. Numerous functions in [[AdminClientSettings]] add properties
@@ -183,7 +182,6 @@ object AdminClientSettings {
override val properties: Map[String, String],
override val closeTimeout: FiniteDuration
) extends AdminClientSettings {
-
override def withBootstrapServers(bootstrapServers: String): AdminClientSettings =
withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

2 changes: 0 additions & 2 deletions modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala
@@ -28,7 +28,6 @@ import scala.concurrent.duration._
* set it with [[ConsumerSettings#withCommitRecovery]].
*/
abstract class CommitRecovery {
-
/**
* Describes recovery from offset commit exceptions. The `commit`
* parameter can be used to retry the commit. Note that if more
@@ -50,7 +49,6 @@ abstract class CommitRecovery {
}

object CommitRecovery {
-
/**
* The default [[CommitRecovery]] used in [[ConsumerSettings]] unless
* a different one has been specified. The default recovery strategy
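As the Scaladoc above says, a recovery strategy is wired in through ConsumerSettings#withCommitRecovery. A minimal sketch, assuming placeholder broker and group values (fs2-kafka provides the String deserializers implicitly):

import cats.effect.IO
import fs2.kafka._

val consumerSettings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092") // placeholder
    .withGroupId("example-group")           // placeholder
    .withCommitRecovery(CommitRecovery.Default)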
modules/core/src/main/scala/fs2/kafka/CommitRecoveryException.scala
@@ -41,7 +41,6 @@ sealed abstract class CommitRecoveryException(
})

object CommitRecoveryException {
-
/**
* Creates a new [[CommitRecoveryException]] indicating that offset
* commit recovery was attempted `attempts` times for `offsets` but
modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala
@@ -28,7 +28,6 @@ import cats.{Applicative, Bitraverse, Eq, Eval, Show, Traverse}
* can be used to create a new instance.
*/
sealed abstract class CommittableConsumerRecord[F[_], +K, +V] {
-
/**
* The Kafka record for the [[CommittableConsumerRecord]]. If you
* are not committing offsets to Kafka, simply use this to get the
2 changes: 0 additions & 2 deletions modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.common.TopicPartition
* used to create a new instance.
*/
sealed abstract class CommittableOffset[F[_]] {
-
/**
* The topic and partition for which [[offsetAndMetadata]]
* can be committed using [[commit]].
@@ -79,7 +78,6 @@ sealed abstract class CommittableOffset[F[_]] {
}

object CommittableOffset {
-
/**
* Creates a new [[CommittableOffset]] with the specified `topicPartition`
* and `offsetAndMetadata`, along with `commit`, describing how to commit
modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
@@ -35,7 +35,6 @@ import org.apache.kafka.common.TopicPartition
* also achieve better performance.
*/
sealed abstract class CommittableOffsetBatch[F[_]] {
-
/**
* Creates a new [[CommittableOffsetBatch]] with the specified offset
* included. Note that this function requires offsets to be in-order
modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala
@@ -30,7 +30,6 @@ import fs2.kafka.internal.syntax._
* the same transaction as the offset is committed.
*/
sealed abstract class CommittableProducerRecords[F[_], +K, +V] {
-
/** The records to produce. Can be empty to simply commit the offset. */
def records: Chunk[ProducerRecord[K, V]]

1 change: 0 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala
@@ -21,7 +21,6 @@ import org.apache.kafka.common.record.TimestampType.{CREATE_TIME, LOG_APPEND_TIME}
* To create a new instance, use [[ConsumerRecord#apply]]
*/
sealed abstract class ConsumerRecord[+K, +V] {
-
/** The topic from which the record has been consumed. */
def topic: String

modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -37,7 +37,6 @@ import scala.concurrent.duration._
* Use `ConsumerSettings#apply` to create a new instance.
*/
sealed abstract class ConsumerSettings[F[_], K, V] {
-
/**
* The `Deserializer` to use for deserializing record keys.
*/
1 change: 0 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/Deserializer.scala
@@ -18,7 +18,6 @@ import java.util.UUID
* support for effect types.
*/
sealed abstract class GenericDeserializer[-T <: KeyOrValue, F[_], A] {
-
/**
* Attempts to deserialize the specified bytes into a value of
* type `A`. The Kafka topic name, from which the serialized
1 change: 0 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/Header.scala
@@ -19,7 +19,6 @@ import cats.instances.string._
* To create a new [[Header]], use [[Header#apply]].
*/
sealed abstract class Header extends org.apache.kafka.common.header.Header {
-
/** The header key. */
override def key: String

modules/core/src/main/scala/fs2/kafka/HeaderDeserializer.scala
@@ -19,7 +19,6 @@ import scala.annotation.tailrec
* Kafka `Deserializer` interface.
*/
sealed abstract class HeaderDeserializer[A] {
-
/**
* Deserializes the header value bytes into a value of type `A`.
*/
@@ -88,7 +87,6 @@ sealed abstract class HeaderDeserializer[A] {
}

object HeaderDeserializer {
-
/** Alias for `HeaderDeserializer[Either[Throwable, A]]`. */
type Attempt[A] = HeaderDeserializer[Either[Throwable, A]]

modules/core/src/main/scala/fs2/kafka/HeaderSerializer.scala
@@ -17,7 +17,6 @@ import java.util.UUID
* the Kafka `Serializer` interface.
*/
sealed abstract class HeaderSerializer[A] {
-
/**
* Serializes the specified value of type `A` into bytes.
*/
1 change: 0 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/Headers.scala
@@ -18,7 +18,6 @@ import fs2.kafka.internal.syntax._
* instance of [[Header]] using `append`.
*/
sealed abstract class Headers {
-
/**
* Returns the first header with the specified key,
* wrapped in `Some`, or `None` if no such header
11 changes: 0 additions & 11 deletions modules/core/src/main/scala/fs2/kafka/KafkaAdminClient.scala
@@ -31,7 +31,6 @@ import scala.annotation.nowarn
* Use [[KafkaAdminClient.resource]] or [[KafkaAdminClient.stream]] to create an instance.
*/
sealed abstract class KafkaAdminClient[F[_]] {
-
/**
* Updates the configuration for the specified resources.
*/
@@ -216,7 +215,6 @@ sealed abstract class KafkaAdminClient[F[_]] {
}

object KafkaAdminClient {
-
private[this] def alterConfigsWith[F[_]: Functor, G[_]: Foldable](
withAdminClient: WithAdminClient[F],
configs: Map[ConfigResource, G[AlterConfigOp]]
@@ -266,7 +264,6 @@ object KafkaAdminClient {
withAdminClient(_.deleteAcls(filters.asJava).all).void

sealed abstract class DescribeCluster[F[_]] {
-
/** Lists available nodes in the cluster. */
def nodes: F[Set[Node]]

@@ -323,7 +320,6 @@ object KafkaAdminClient {
withAdminClient(_.describeAcls(filter).values()).map(_.toList)

sealed abstract class ListConsumerGroupOffsetsForPartitions[F[_]] {
-
/** Lists consumer group offsets on specified partitions for the consumer group. */
def partitionsToOffsetAndMetadata: F[Map[TopicPartition, OffsetAndMetadata]]
}
@@ -334,7 +330,6 @@ object KafkaAdminClient {
partitions: G[TopicPartition]
): ListConsumerGroupOffsetsForPartitions[F] =
new ListConsumerGroupOffsetsForPartitions[F] {
-
private[this] val groupOffsets = Map(
groupId -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions.asJava)
).asJava
@@ -344,15 +339,13 @@ object KafkaAdminClient {
adminClient
.listConsumerGroupOffsets(groupOffsets)
.partitionsToOffsetAndMetadata
-
}.map(_.toMap)

override def toString: String =
s"ListConsumerGroupOffsetsForPartitions(groupId = $groupId, partitions = $partitions)"
}

sealed abstract class ListConsumerGroupOffsets[F[_]] {
-
/** Lists consumer group offsets for the consumer group. */
def partitionsToOffsetAndMetadata: F[Map[TopicPartition, OffsetAndMetadata]]

@@ -384,7 +377,6 @@ object KafkaAdminClient {
}

sealed abstract class ListConsumerGroups[F[_]] {
-
/** Lists the available consumer group ids. */
def groupIds: F[List[String]]

@@ -407,7 +399,6 @@ object KafkaAdminClient {
}

sealed abstract class ListTopicsIncludeInternal[F[_]] {
-
/** Lists topic names. Includes internal topics. */
def names: F[Set[String]]

@@ -439,7 +430,6 @@ object KafkaAdminClient {
}

sealed abstract class ListTopics[F[_]] {
-
/** Lists topic names. */
def names: F[Set[String]]

@@ -520,7 +510,6 @@ object KafkaAdminClient {

private def create[F[_]: Functor](client: WithAdminClient[F]) =
new KafkaAdminClient[F] {
-
override def alterConfigs[G[_]](configs: Map[ConfigResource, G[AlterConfigOp]])(
implicit G: Foldable[G]
): F[Unit] =
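A usage sketch for the API above, assuming the AdminClientSettings(bootstrapServers) constructor and a placeholder broker address:

import cats.effect.IO
import fs2.kafka._

val topicNames: IO[Set[String]] =
  KafkaAdminClient
    .resource[IO](AdminClientSettings("localhost:9092")) // placeholder
    .use(_.listTopics.names) // ListTopics#names, as defined above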
5 changes: 0 additions & 5 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -126,7 +126,6 @@ object KafkaConsumer {
stopConsumingDeferred: Deferred[F, Unit]
)(implicit F: Async[F], logging: Logging[F]): KafkaConsumer[F, K, V] =
new KafkaConsumer[F, K, V] {
-
override def partitionsMapStream
: Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]] = {
val chunkQueue: F[Queue[F, Option[Chunk[CommittableConsumerRecord[F, K, V]]]]] =
@@ -394,7 +393,6 @@ object KafkaConsumer {
actor.ref.updateAndGet(_.withOnRebalance(on).asStreaming).flatTap { newState =>
logging.log(LogEntry.StoredOnRebalance(on, newState))
}
-
}
.ensure(NotSubscribedException())(_.subscribed) >>
withConsumer.blocking(_.assignment.toSortedSet)
@@ -547,7 +545,6 @@ object KafkaConsumer {
} >> actor.ref
.updateAndGet(_.asSubscribed)
.log(LogEntry.ManuallyAssignedPartitions(partitions, _))
-
}

override def assign(topic: String): F[Unit] =
@@ -688,7 +685,6 @@ object KafkaConsumer {

private[kafka] final class ConsumerPartiallyApplied[F[_]](val dummy: Boolean = true)
extends AnyVal {
-
/**
* Alternative version of `resource` where the `F[_]` is
* specified explicitly, and where the key and value type can
@@ -730,7 +726,6 @@ object KafkaConsumer {
* to explicitly use operations such as `flatMap` and `evalTap`
*/
implicit final class StreamOps[F[_]: Functor, K, V](self: Stream[F, KafkaConsumer[F, K, V]]) {
-
/**
* Subscribes a consumer to the specified topics within the [[Stream]] context.
* See [[KafkaSubscription#subscribe]].
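The StreamOps syntax above is what enables the usual single-pipeline style; a sketch reusing the consumerSettings value from the CommitRecovery example, with a placeholder topic:

import cats.effect.IO
import fs2.kafka._

val consumed: fs2.Stream[IO, Unit] =
  KafkaConsumer
    .stream(consumerSettings)     // Stream[IO, KafkaConsumer[IO, String, String]]
    .subscribeTo("example-topic") // placeholder
    .records
    .evalMap(c => IO.println(c.record.value) >> c.offset.commit)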
19 changes: 11 additions & 8 deletions modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Promise
* ability to produce `ProducerRecord`s using [[produce]].
*/
abstract class KafkaProducer[F[_], K, V] {
-
/**
* Produces the specified [[ProducerRecords]] in two steps: the
* first effect puts the records in the buffer of the producer,
@@ -53,10 +52,8 @@ abstract class KafkaProducer[F[_], K, V] {
}

object KafkaProducer {
-
implicit class ProducerOps[F[_], K, V](private val producer: KafkaProducer[F, K, V])
extends AnyVal {
-
/**
* Produce a single [[ProducerRecord]], see [[KafkaProducer.produce]] for general semantics.
*/
@@ -88,15 +85,13 @@
*/
def produceOne(record: ProducerRecord[K, V]): F[F[ProducerResult[K, V]]] =
producer.produce(ProducerRecords.one(record))
-
}

/**
* [[KafkaProducer.Metrics]] extends [[KafkaProducer]] to provide
* access to the underlying producer metrics.
*/
abstract class Metrics[F[_], K, V] extends KafkaProducer[F, K, V] {
-
/**
* Returns producer metrics.
*
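Because produce returns the send as two effects (enqueue into the producer buffer, then broker acknowledgement), produceOne above does too; flattening sequences both. A sketch with placeholder record values:

import cats.effect.IO
import fs2.kafka._

def sendAndAwait(producer: KafkaProducer[IO, String, String]): IO[ProducerResult[String, String]] =
  producer
    .produceOne(ProducerRecord("example-topic", "key", "value")) // placeholder
    .flatten // outer IO buffers the record; inner IO waits for the ack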
@@ -110,7 +105,6 @@
* access to the underlying producer partitions.
*/
abstract class PartitionsFor[F[_], K, V] extends KafkaProducer.Metrics[F, K, V] {
-
/**
* Returns partition metadata for the given topic.
*
@@ -203,7 +197,17 @@
else promise.failure(exception)
}
)
-      }.as(F.fromFuture(F.delay(promise.future)))
+      }.as {
+        F.delay(promise.future).flatMap { fut =>
+          F.executionContext.flatMap { implicit ec =>
+            F.async[(ProducerRecord[K, V], RecordMetadata)] { cb =>
+              F.delay(fut.onComplete(t => cb(t.toEither))).as(Some(F.unit))
+            }
+          }
+        }
+        // TODO: replace the above with the following once CE3.5.0 is out
+        // F.fromFutureCancelable(F.delay(promise.future))
+      }
}
}

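The hand-rolled F.async above differs from the old F.fromFuture in one way: it returns Some(F.unit) as its cancelation finalizer, so a fiber canceled while awaiting the broker acknowledgement is released immediately instead of waiting uncancelably for the send to complete. The TODO refers to Cats Effect 3.5's fromFutureCancelable; assuming its final signature pairs the Future with a cancelation action, the replacement would read roughly F.fromFutureCancelable(F.delay((promise.future, F.unit))).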
Expand Down Expand Up @@ -263,7 +267,6 @@ object KafkaProducer {

private[kafka] final class ProducerPartiallyApplied[F[_]](val dummy: Boolean = true)
extends AnyVal {
-
/**
* Alternative version of `resource` where the `F[_]` is
* specified explicitly, and where the key and value type can