-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_ingestion.py
26 lines (22 loc) · 918 Bytes
/
kafka_ingestion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from kafka import KafkaProducer, KafkaConsumer
import json
class KafkaIngestion:
def __init__(self, brokers, topic):
self.brokers = brokers
self.topic = topic
def produce(self, message):
producer = KafkaProducer(bootstrap_servers=self.brokers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send(self.topic, message)
producer.flush()
def consume(self):
consumer = KafkaConsumer(self.topic,
bootstrap_servers=self.brokers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
yield message.value
# Example Usage
# ingestion = KafkaIngestion(brokers='localhost:9092', topic='test-topic')
# ingestion.produce({"key": "value"})
# for message in ingestion.consume():
# print(message)