-
Notifications
You must be signed in to change notification settings - Fork 3
/
test.py
29 lines (20 loc) · 878 Bytes
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from quixstreams import Application
from quixstreams import State, message_key
import os
import time
import uuid
# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()
# --- Pipeline: copy messages whose root.channel is "SDK" from the input
# --- topic to the output topic.

# Application with a fixed consumer group; auto_offset_reset is set to
# "earliest" for this consumer.
app = Application(
    consumer_group="slack-embeddings-v1.9",
    auto_offset_reset="earliest",
)

# Topic names come from the `input` / `output` environment variables.
# A missing variable raises KeyError right here, before anything is consumed.
input_topic_name = os.environ["input"]
input_topic = app.topic(input_topic_name, value_deserializer="json")

output_topic_name = os.environ["output"]
output_topic = app.topic(output_topic_name, value_serializer="json")

# Streaming dataframe over the messages of the input topic.
sdf = app.dataframe(topic=input_topic)

# Keep only the rows whose nested root -> channel field equals "SDK".
# NOTE(review): assumes every message value carries a "root" object with a
# "channel" key - confirm against the producer of the input topic.
is_sdk_channel = sdf["root"]["channel"] == "SDK"
sdf = sdf[is_sdk_channel]

# Send every remaining row to the output topic, then start processing.
sdf = sdf.to_topic(output_topic)
app.run(sdf)