example-readme-01.kt (from nomisRev/kotlin-kafka)
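The example below creates a single-partition test topic with the Admin client, publishes ten Key/Message records through a KafkaPublisher, and consumes them back as a Flow with a KafkaReceiver, printing each record.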
package example.exampleReadme01

import arrow.continuations.SuspendApp
import io.github.nomisRev.kafka.Admin
import io.github.nomisRev.kafka.AdminSettings
import io.github.nomisRev.kafka.createTopic
import io.github.nomisRev.kafka.imap
import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.publisher.Acks
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.UUID

@JvmInline
value class Key(val index: Int)

@JvmInline
value class Message(val content: String)

fun main(): Unit = SuspendApp {
  val topicName = "test-topic"
  val msgCount = 10
  // `Kafka.container` is a running Kafka test container provided by the project's guide setup.
  val kafka = Kafka.container

  // Create the topic before producing and consuming.
  Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
    client.createTopic(NewTopic(topicName, 1, 1))
  }

  launch(Dispatchers.IO) { // Send 10 messages, and then close the publisher
    val settings: PublisherSettings<Key, Message> = PublisherSettings(
      kafka.bootstrapServers,
      IntegerSerializer().imap { key: Key -> key.index },
      StringSerializer().imap { msg: Message -> msg.content },
      Acks.All
    )
    KafkaPublisher(settings).use { publisher ->
      publisher.publishScope {
        (1..msgCount).forEach { index ->
          offer(ProducerRecord(topicName, Key(index), Message("msg: $index")))
        }
      }
    }
  }

  launch(Dispatchers.IO) { // Consume 10 messages as a stream, and then close the consumer
    val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
      kafka.bootstrapServers,
      IntegerDeserializer().map(::Key),
      StringDeserializer().map(::Message),
      groupId = UUID.randomUUID().toString(),
      autoOffsetReset = AutoOffsetReset.Earliest
    )
    KafkaReceiver(settings)
      .receive(topicName)
      .take(msgCount)
      .map { "${it.key()} -> ${it.value()}" }
      .collect(::println)
  }
}
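
The example refers to `Kafka.container` without importing or defining it; in the kotlin-kafka repository this bootstrap-server provider comes from the guide's own test setup, which is not shown in this file. A minimal sketch of such a helper, assuming the org.testcontainers:kafka module is on the classpath (the `Kafka` object name matches the usage above, and the image tag is only illustrative), might look like:

// Hypothetical helper, not part of example-readme-01.kt.
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

object Kafka {
  // Lazily start a single Kafka container the first time it is accessed.
  val container: KafkaContainer by lazy {
    KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.1"))
      .also { it.start() }
  }
}

With a helper along these lines, `kafka.bootstrapServers` resolves to the container's advertised listener, so the example can run against a throwaway broker instead of a locally installed Kafka.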