-
hi ! i am trying to build video processing app. what i want to do is , first process videos and convert them to 480p variant and if there is no more messages in current queue , then start to process 720p variants. here is my current code 480p consumer
720p consumer
this is my publisher
this is my results for 480p consumer from terminal: Processed 480p for video: 101
Processing 480p for video: 102
Processed 480p for video: 102
Processing 480p for video: 103
480p queue empty, enqueueing for 720p processing: 102
Processed 480p for video: 103
480p queue empty, enqueueing for 720p processing: 103 it looks like there is a problem about asynchronicity |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 23 replies
-
No, it appears that everything is working as intended. How do you define "empty" when a queue might have more messages published to it? When video 102 is done processing, the message for 103 must not have been in the queue yet. I don't know how Your publisher should send a DONE message when it is guaranteed that no more messages will be published. |
Beta Was this translation helpful? Give feedback.
-
This is a consumer coordination problem, how your consumers depend on each other or interact with each other is not something RabbitMQ is aware of or should be aware of. What your applications should do when they are done with a certain stage is to publish a message of a special type to a separate queue. In general, in a pipeline of N different stages using N queues is guaranteed to be easier to reason about than one MegaQueue. Single Giant Shared Queue™ is likely the most problematic and the most infamous anti-pattern in messaging and the RabbitMQ community specifically. Not that when a design problem does not fit the queue consumption-acknowledgement model, there's nothing wrong with using a stream, for which there are now tutorials. Just an idea. |
Beta Was this translation helpful? Give feedback.
-
Also note that there is a old book that just as relevant today on messaging patterns. |
Beta Was this translation helpful? Give feedback.
i used locking mechanism and used redis for this. thank you for your time. but i didn't delete basic.get variant just in case.