Kafka consumer polling interval is multiplied by 2 #734

Open
Yevhensh opened this issue Oct 23, 2021 · 0 comments

When specifying Kafka consumer settings (fs2.kafka.ConsumerSettings), I can see that the effective poll interval is twice the configured value,
so for now my workaround for consuming once every 10 seconds looks like this:

.withPollInterval(10.seconds / 2)
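To keep the workaround in one place, it could be wrapped in a small helper. This is only a sketch that assumes the doubling I observed holds in general; `halvedPollInterval` is a hypothetical name of my own, not part of fs2-kafka.

```scala
import scala.concurrent.duration._

object PollIntervalWorkaround {
  // Hypothetical helper (not part of fs2-kafka): halves the desired interval
  // to compensate for the doubling observed in this report.
  def halvedPollInterval(desired: FiniteDuration): FiniteDuration =
    desired / 2L

  def main(args: Array[String]): Unit =
    println(halvedPollInterval(10.seconds))
}
```

It would then be passed as `.withPollInterval(PollIntervalWorkaround.halvedPollInterval(10.seconds))`, which makes the workaround easy to remove once the underlying behaviour is fixed.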

Full reproduction:

import cats.effect._
import cats.implicits._
import fs2.kafka._

import java.time.LocalTime
import scala.concurrent.duration._
import scala.util.Random

object Test extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val bootstrapServer: String      = "localhost:9092"
    val topic: String                = "test-topic"
    val pollInterval: FiniteDuration = 3.seconds

    for {
      _ <- produceRecords(bootstrapServer, topic)
      _ <- consumeRecords(bootstrapServer, topic, pollInterval)
    } yield ExitCode.Success
  }

  private def produceRecords(bootstrapServer: String, topic: String): IO[ProducerResult[String, String, Unit]] = {
    val producerSettings = ProducerSettings[IO, String, String].withBootstrapServers(bootstrapServer)

    KafkaProducer
      .resource(producerSettings)
      .use { producer =>
        List
          .fill(10)(IO(Random.alphanumeric.take(5).mkString))
          .sequence
          .map(values => values.map(ProducerRecord(topic, "key", _)))
          .flatMap { records =>
            producer
              .produce(ProducerRecords(records))
              .flatten
          }
      }
  }

  private def consumeRecords(bootstrapServer: String, topic: String, pollInterval: FiniteDuration): IO[Unit] = {
    val consumerSettings =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers(bootstrapServer)
        .withGroupId("test-group")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withPollInterval(pollInterval)
        .withMaxPollRecords(1)

    KafkaConsumer
      .resource(consumerSettings)
      .use { consumer =>
        consumer.subscribeTo(topic) >>
          consumer.stream
            .evalMap(msg => IO(println(s"${LocalTime.now()} - ${msg.record.value}")))
            .compile
            .drain
      }
  }

}

Output (with pollInterval = 3.seconds, records arrive roughly 6 seconds apart):

13:41:13.222 - F5Qdq
13:41:19.181 - FP5ol
13:41:25.184 - QMnmK
13:41:31.190 - ps4Ai
13:41:37.195 - LkzYg
13:41:43.207 - q5sA1
13:41:49.208 - PdK9W
13:41:55.217 - F28jw
13:42:01.223 - STNzB
13:42:07.227 - 1sKtc
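To make the gap explicit, here is a minimal sketch that computes the time between consecutive records from the timestamps printed above. It uses only `java.time`; the object name `IntervalCheck` is my own, for illustration.

```scala
import java.time.{Duration, LocalTime}

object IntervalCheck {
  // Timestamps copied from the output above.
  val timestamps: List[LocalTime] = List(
    "13:41:13.222", "13:41:19.181", "13:41:25.184", "13:41:31.190",
    "13:41:37.195", "13:41:43.207", "13:41:49.208", "13:41:55.217",
    "13:42:01.223", "13:42:07.227"
  ).map(LocalTime.parse(_))

  // Gap in milliseconds between each pair of consecutive records.
  val gapsMillis: List[Long] =
    timestamps.zip(timestamps.tail).map { case (earlier, later) =>
      Duration.between(earlier, later).toMillis
    }

  def main(args: Array[String]): Unit =
    gapsMillis.foreach(gap => println(s"$gap ms"))
}
```

Every gap comes out at about 6 seconds, which is twice the configured 3-second poll interval (with max poll records set to 1, each poll yields one record).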

Versions:

  "com.github.fd4s" %% "fs2-kafka"         % "1.8.0"
  "com.github.fd4s" %% "fs2-kafka-vulcan"  % "1.8.0"
  "com.github.fd4s" %% "vulcan"            % "1.7.1"

aartigao self-assigned this Feb 16, 2024