-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclassic_api.py
53 lines (42 loc) · 1.54 KB
/
classic_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
from typing import AsyncIterator, Tuple
from simple_aiokafka import (
ConsumerRecord,
SimpleConsumer,
SimpleProcessor,
SimpleProducer,
)
def process_message(msg: ConsumerRecord):
    """Turn one consumed record into the ``(key, value)`` pair to emit.

    The record's key is passed through unchanged; the new value is the
    record's value formatted into a string and followed by a fixed greeting.
    """
    key = msg.key
    greeting = f"{msg.value}: Hello Kafka :)"
    return key, greeting
async def generate_message(
    interval: float = 1.0,
) -> AsyncIterator[Tuple[int, str]]:
    """Yield an endless stream of ``(n, "Message n")`` pairs, counting from 0.

    Args:
        interval: Seconds to sleep after each yielded pair before the next
            one is produced. Defaults to 1.0, the delay this generator
            always used before the parameter existed.

    Yields:
        ``(n, f"Message {n}")`` where ``n`` is an ``int`` counter that
        starts at 0 and increases by 1 per item. The key is an integer,
        not a string, so callers must serialize it themselves.
    """
    n = 0
    while True:
        yield n, f"Message {n}"
        n += 1
        # Pace the stream; the sleep runs when the consumer asks for the
        # next item, so the first pair is delivered without delay.
        await asyncio.sleep(interval)
async def main():
    """Run a producer -> processor -> consumer demo and print what arrives.

    Three stages are set up in order:

    1. A ``SimpleProducer`` initialised on ``"producer_topic"`` that is fed
       by ``generate_message()``. Keys are serialized as ``str(x).encode()``.
    2. A ``SimpleProcessor`` with input ``"producer_topic"`` and output
       ``"processor_topic"`` that applies ``process_message``. Incoming keys
       are decoded to ``int``; outgoing keys are multiplied by 10 before
       being encoded.
    3. A ``SimpleConsumer`` (group ``"MyGroup"``) initialised on
       ``"processor_topic"`` whose records are printed one by one.

    The producer and processor run as background tasks. References to them
    are kept for the lifetime of this coroutine, because the event loop only
    holds weak references to tasks and an unreferenced task may be garbage
    collected before it finishes. The tasks are cancelled when this coroutine
    exits for any reason.
    """
    background_tasks = []
    try:
        # generate_message -> producer_topic
        producer = SimpleProducer()
        # Modify AIOKafkaProducer key_serializer
        producer.conf.producer.key_serializer = lambda x: str(x).encode()
        await producer.init("producer_topic")
        background_tasks.append(
            asyncio.create_task(producer.produce(generate_message()))
        )

        # producer_topic -> SimpleProcessor -> process_message -> processor_topic
        processor = SimpleProcessor()
        processor.conf.consumer.key_deserializer = lambda x: int(bytes.decode(x))
        processor.conf.producer.key_serializer = lambda x: str(x * 10).encode()
        await processor.init(
            input_topic="producer_topic",
            output_topic="processor_topic",
        )
        background_tasks.append(
            asyncio.create_task(processor.process(process_message))
        )

        # processor_topic -> print
        consumer = SimpleConsumer()
        consumer.conf.consumer.key_deserializer = lambda x: int(bytes.decode(x))
        consumer.conf.consumer.group_id = "MyGroup"
        await consumer.init("processor_topic")
        async for msg in consumer.consumer:
            print(msg)
    finally:
        # Stop the helpers so they do not outlive the pipeline.
        for task in background_tasks:
            task.cancel()
        if background_tasks:
            # asyncio.wait() only waits for completion; it does not consume
            # task exceptions, so a helper that crashed earlier is still
            # reported by asyncio instead of being silently swallowed here.
            await asyncio.wait(background_tasks)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() until it
    # finishes, cancels any tasks still pending and closes the loop.
    # It replaces asyncio.get_event_loop(), which is deprecated when no
    # loop is running and left the loop unclosed.
    asyncio.run(main())