
Commit b0a5660: Fix after merge

erikvanoosten committed May 24, 2023
1 parent: ac7f61b
Showing 3 changed files with 22 additions and 24 deletions.
File 1 of 3: ZIOSpecDefaultSlf4j.scala

@@ -2,16 +2,21 @@ package zio.kafka
 
 import zio.Chunk
 import zio.logging.backend.SLF4J
-import zio.test.{ TestAspect, TestAspectAtLeastR, TestEnvironment, ZIOSpecDefault }
+import zio.test.{TestAspect, TestAspectAtLeastR, TestEnvironment, ZIOSpecAbstract, ZIOSpecDefault}
 
 /**
  * Use this class instead of `ZIOSpecDefault` if you want your tests to use SLF4J to log.
  *
  * Useful when you want to use logback to configure your logger, for example.
  */
-abstract class ZIOSpecDefaultSlf4j extends ZIOSpecDefault {
+trait ZIOSpecDefaultSlf4j extends ZIOSpecDefault with ZIOSpecAbstractSlf4j
 
-  override def aspects: Chunk[TestAspectAtLeastR[TestEnvironment]] =
+/**
+ * Mix this into your ZIO Spec if you want your tests to use SLF4J to log.
+ */
+trait ZIOSpecAbstractSlf4j extends ZIOSpecAbstract {
+
+  abstract override def aspects: Chunk[TestAspectAtLeastR[Environment with TestEnvironment]] =
     super.aspects :+ TestAspect.fromLayer(zio.Runtime.removeDefaultLoggers >>> SLF4J.slf4j)
 
 }
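
The `abstract override` on `aspects` is Scala's stackable-trait pattern: each mixin appends its aspect to whatever `super.aspects` returns, so `ZIOSpecAbstractSlf4j` can be stacked onto any `ZIOSpecAbstract` subtype. A minimal usage sketch (the spec below is hypothetical and not part of this commit):

package zio.kafka.example

import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.test._

// Hypothetical spec: mixing in ZIOSpecDefaultSlf4j routes test logging
// through SLF4J (and thus e.g. logback) instead of ZIO's default logger.
object MyLoggingSpec extends ZIOSpecDefaultSlf4j {
  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("example")(
      test("log lines go to SLF4J") {
        ZIO.logInfo("hello from a test").as(assertCompletes)
      }
    )
}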

File 2 of 3: RebalanceSafeCommitConsumerSpec.scala

@@ -1,33 +1,28 @@
 package zio.kafka.consumer
 
 import io.github.embeddedkafka.EmbeddedKafka
-import org.apache.kafka.clients.consumer.{
-  ConsumerConfig,
-  ConsumerPartitionAssignor,
-  CooperativeStickyAssignor,
-  RangeAssignor
-}
-import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerPartitionAssignor, CooperativeStickyAssignor, RangeAssignor}
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import zio._
-import zio.kafka.KafkaTestUtils._
-import zio.kafka.ZIOKafkaSpec
-import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
-import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
-import zio.kafka.embedded.Kafka
-import zio.kafka.producer.{ Producer, TransactionalProducer }
+import zio.kafka.{ZIOSpecAbstractSlf4j}
+import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
+import zio.kafka.consumer.diagnostics.{DiagnosticEvent, Diagnostics}
+import zio.kafka.testkit.{Kafka, KafkaRandom, ZIOSpecWithKafka}
+import zio.kafka.producer.{Producer, TransactionalProducer}
 import zio.kafka.serde.Serde
-import zio.stream.{ ZSink, ZStream }
+import zio.kafka.testkit.KafkaTestUtils._
+import zio.stream.{ZSink, ZStream}
 import zio.test.Assertion._
 import zio.test.TestAspect._
 import zio.test._
 
 import scala.reflect.ClassTag
 
-object RebalanceSafeCommitConsumerSpec extends ZIOKafkaSpec {
+object RebalanceSafeCommitConsumerSpec extends ZIOSpecWithKafka with KafkaRandom with ZIOSpecAbstractSlf4j {
   override val kafkaPrefix: String = "commitsafeconsumespec"
 
-  override def spec: Spec[TestEnvironment & Kafka, Throwable] =
+  override def spec: Spec[TestEnvironment with Kafka, Throwable] =
     suite("Rebalance safe commit consumer streaming")(
       test("plainStream emits messages for a topic subscription") {
         val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i"))

@@ -1064,8 +1059,7 @@ object RebalanceSafeCommitConsumerSpec extends ZIOKafkaSpec {
        } yield assertCompletes
      }
    ).provideSomeLayerShared[TestEnvironment & Kafka](
-      producer ++ Scope.default ++ Runtime.removeDefaultLoggers ++ Runtime.addLogger(logger)
-    ) @@ withLiveClock @@ TestAspect.sequential @@ timeout(180.seconds)
+      producer ++ Scope.default) @@ withLiveClock @@ TestAspect.sequential @@ timeout(180.seconds)
 
   private def scheduledProducer[R](
     topic: String,
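
With `ZIOSpecAbstractSlf4j` mixed in, the SLF4J logger is installed by the inherited test aspect, which is presumably why the second hunk can drop the manual `Runtime.removeDefaultLoggers ++ Runtime.addLogger(logger)` wiring from the shared layer. A rough skeleton of a spec built on this mixin stack (the suite body is illustrative only; `kafkaPrefix` and the environment type are taken from the diff above):

package zio.kafka.example

import zio._
import zio.kafka.ZIOSpecAbstractSlf4j
import zio.kafka.testkit.{Kafka, KafkaRandom, ZIOSpecWithKafka}
import zio.test._

// Illustrative skeleton mirroring the mixin stack used by the spec above.
object MyKafkaSpec extends ZIOSpecWithKafka with KafkaRandom with ZIOSpecAbstractSlf4j {
  override val kafkaPrefix: String = "mykafkaspec"

  override def spec: Spec[TestEnvironment with Kafka, Throwable] =
    suite("my kafka tests")(
      test("placeholder") {
        ZIO.succeed(assertCompletes)
      }
    )
}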

File 3 of 3: Runloop.scala

@@ -280,7 +280,7 @@ private[consumer] final class Runloop private (
           .toMap
         val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
         val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
-        val onSuccess = cont(Exit.unit) <* diagnostics.emitIfEnabled(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
+        val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
         val onFailure: Throwable => UIO[Unit] = {
           case _: RebalanceInProgressException =>
             for {

@@ -289,7 +289,7 @@
               _ <- commandQueue.offer(CommitAvailable)
             } yield ()
           case err: Throwable =>
-            cont(Exit.fail(err)) <* diagnostics.emitIfEnabled(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err))
+            cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err))
         }
         val callback =
           new OffsetCommitCallback {

@@ -437,7 +437,6 @@
       pollResult <-
         consumer.runloopAccess { c =>
           ZIO.suspend {
-            val prevAssigned = c.assignment().asScala.toSet
             val requestedPartitions = state.pendingRequests.map(_.tp).toSet
 
             resumeAndPausePartitions(c, requestedPartitions, state.assignedStreams)
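
`Diagnostics` is the hook through which the run loop reports internal events such as commit results. A sketch of a custom implementation that counts the commit events emitted above (the by-name `emit` signature is assumed from the calls in this diff):

import zio._
import zio.kafka.consumer.diagnostics.{DiagnosticEvent, Diagnostics}

// Sketch only: assumes Diagnostics exposes a single `emit` method,
// matching the diagnostics.emit(...) calls shown in this commit.
final class CommitCounter(successes: Ref[Int], failures: Ref[Int]) extends Diagnostics {
  override def emit(event: => DiagnosticEvent): UIO[Unit] =
    event match {
      case _: DiagnosticEvent.Commit.Success => successes.update(_ + 1)
      case _: DiagnosticEvent.Commit.Failure => failures.update(_ + 1)
      case _                                 => ZIO.unit
    }
}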
