Is your feature request related to a problem? Please describe.
I would like to build an application that consumes from two different topics.
It's kind of a syncing problem:
I have one producer early in the pipeline that produces a "Batch" message to a topic, which includes
a batch start and end time together with a batch id.
When the pipeline has finished processing (mainly data sanitization), the last application produces another message to another topic
which indicates "batch processed" (including the batch id).
Now I would like to store the Batch info in state and do some processing when this "batch processed" event arrives on the other topic.
This essentially means that I have to use a Consumer rather than a StreamingDataFrame, since I need to subscribe to multiple topics.
However, I can't find an easy way with quixstreams to use State without a DataFrame.
Do you have a clean solution to this problem? I tried to reuse code from the `_as_stateful()` function in the `Application` class, but this is quite cumbersome due to the `process_context` that is required.
Is this an anti pattern? I'm pretty new to kafka streams and I'm learning along the way.
Thanks for the help!
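To make the sync logic concrete, here is a broker-free sketch of what I'm trying to do (a plain dict stands in for the state store, and the two handlers stand in for the two topic consumers; field names are made up):

```python
# Broker-free sketch: a plain dict stands in for the state store,
# and the two handlers stand in for the two topic consumers.
batch_state = {}

def on_batch(event):
    # "Batch" topic: remember the start/end window, keyed by batch id.
    batch_state[event["batch_id"]] = {
        "start": event["start"],
        "end": event["end"],
    }

def on_batch_processed(event):
    # "batch processed" topic: look up the stored info and do the work.
    info = batch_state.pop(event["batch_id"], None)
    if info is None:
        return None  # "processed" event arrived before the Batch message
    return {"batch_id": event["batch_id"], **info}
```

The real question is how to get this dict replaced by quixstreams' managed State while consuming both topics.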
QuixStreams 3.0 was just released and it allows you to consume from multiple topics at once with a StreamingDataFrame, so take a look and let us know if this solves your issue or not, as it sounds like it might!
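An untested sketch of what that could look like in 3.0 (broker address, consumer group, topic names, and message fields are all placeholders, and the exact cross-topic state semantics should be checked against the docs):

```python
from quixstreams import Application

# Broker address, consumer group, topic names, and message fields are
# placeholders for illustration.
app = Application(broker_address="localhost:9092", consumer_group="batch-sync")

batches = app.topic("batches")                  # "Batch" messages
batch_processed = app.topic("batch-processed")  # "batch processed" events

# Since Quix Streams 3.0, one Application can register several
# StreamingDataFrames, one per topic.
batches_sdf = app.dataframe(batches)
processed_sdf = app.dataframe(batch_processed)

def remember_batch(value, state):
    # Store the batch window keyed by batch id.
    state.set(value["batch_id"], {"start": value["start"], "end": value["end"]})

batches_sdf = batches_sdf.update(remember_batch, stateful=True)

def on_processed(value, state):
    # Look up the stored batch info when the "processed" event arrives.
    return {"batch_id": value["batch_id"], "batch": state.get(value["batch_id"])}

processed_sdf = processed_sdf.apply(on_processed, stateful=True)

# app.run() with no arguments runs all registered dataframes.
app.run()
```

One caveat: state in Quix Streams is partition-local and scoped to the dataframe's topic, so making state written from one topic visible while processing the other may require the two topics to be keyed and co-partitioned by batch id (or a repartition step). Treat this as a shape to adapt rather than a verified solution.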