Batched kombu consumer #483
base: develop
Conversation
…sumer implementations
@pacejackson I've made the requested changes. The pump worker is now handling the batching. However, I had to introduce these rather ugly
In addition to the comments about using separate classes, I think we can simplify things a bit further by putting the actual batch management into its own object rather than having it be in the pump worker itself:
import threading
import time
from copy import copy
from datetime import timedelta
from queue import Queue
from typing import Callable


class BatchedQueue:
    # Note: assumes put() and the timer callback are externally serialized;
    # a production version would guard _batch with a lock.
    def __init__(self, work_queue: Queue, batch_size: int, flush_interval: timedelta):
        assert work_queue is not None
        assert batch_size >= 1
        self._work_queue = work_queue
        self._batch = []
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._flusher = None

    def put(self, data):
        self._batch.append(data)
        if len(self._batch) >= self._batch_size:
            self._flush()
        else:
            self._queue_flush()

    def _flush(self):
        # The flusher may be unset if the batch filled before a flush was queued.
        if self._flusher is not None:
            self._flusher.stop()
            self._flusher = None
        batch = copy(self._batch)
        self._batch = []
        self._work_queue.put(batch)

    def _queue_flush(self):
        if self._flusher is not None:
            return
        self._flusher = DelayedWorker(delay=self._flush_interval, fn=self._flush)


class DelayedWorker:
    def __init__(self, delay: timedelta, fn: Callable):
        self._delay = delay
        self._fn = fn
        self._stopped = False
        # start(), not run(), so _run executes on the new thread.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._stopped = True

    def _run(self):
        time.sleep(self._delay.total_seconds())
        if self._stopped:
            return
        self._fn()

    def __del__(self):
        self.stop()
So then the pump consumer is just put-ing the message into a BatchedQueue rather than directly into a queue.Queue; you might even be able to make it easy to pass in one or the other to the same PumpQueue class constructor?
Note, this also would remove the ThreadedScheduler class and the dependency on schedule, and replaces the Timer class with a DelayedWorker object.
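As an aside, the stdlib threading.Timer already covers the DelayedWorker role sketched above (a delayed one-shot callback with cancellation), which could shrink the sketch further. A quick runnable check of that behaviour (the events here are purely illustrative):

```python
import threading
from datetime import timedelta

delay = timedelta(milliseconds=200)

# A timer that is left alone fires its callback once after the delay.
flushed = threading.Event()
timer = threading.Timer(delay.total_seconds(), flushed.set)
timer.start()
flushed.wait(timeout=2.0)
assert flushed.is_set()

# A timer that is cancelled while still waiting never runs its callback,
# which is exactly the stop() semantics DelayedWorker needs.
cancelled = threading.Event()
timer2 = threading.Timer(delay.total_seconds(), cancelled.set)
timer2.start()
timer2.cancel()
timer2.join()
assert not cancelled.is_set()
```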
@pacejackson Thank you for your recommendations. I've adapted them slightly to be generic and to merge the ideas of the DelayedWorker and Timer. I do think that either the pump worker or the queue consumer should requeue batched messages in this design. Since we didn't want to modify the QueueConsumer class, the PumpWorker seems like the only good option to me.
This is looking really good! There are still a few things to fix, but I think it's just about there (or at least to the point where you can feel comfortable writing tests).
Sorry for taking 7 months to add the requested changes to this PR ;-)
LGTM! Thank you!
Some small things around naming of arguments
Co-authored-by: Andrew Boyle <pacejackson@users.noreply.github.com>
@spladug Ready to merge ♻️
@gfmio it looks like it's failing lint (non-lazy string formatting in a logger call)
just for the lint failure
@spladug The linter and tests are now all green 💚
logger.warning(
    "%s %s %s %s %s",
    "KombuBatchConsumerWorker is stopping and requeuing messages in the",
    "unprocessed batch. A message in this unprocessed batch had already",
    "been acknowledged, so it is not being requeued. However, this indicates",
    "an inconsistency in the batch processing logic which should be",
    "investigated.",
)
What's going on here? I'm guessing it was complaining about concatenating the strings with +? It should be possible to concatenate them without this formatting stuff by just putting them next to each other with no operators:
Suggested change:

logger.warning(
    "KombuBatchConsumerWorker is stopping and requeuing messages in the "
    "unprocessed batch. A message in this unprocessed batch had already "
    "been acknowledged, so it is not being requeued. However, this indicates "
    "an inconsistency in the batch processing logic which should be "
    "investigated."
)
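For reference, adjacent string literals are joined by the Python parser at compile time, so the suggested form produces a single string with no runtime cost, and passing arguments separately keeps the logger call lazy. A small demonstration (the message text mirrors the one above; the count is illustrative):

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# Adjacent string literals are concatenated by the parser, so this is
# one string object; no "%s" placeholders or "+" operators are needed.
message = (
    "KombuBatchConsumerWorker is stopping and requeuing messages in the "
    "unprocessed batch."
)
assert "the unprocessed batch" in message

# Lazy formatting: the argument is interpolated only if the record is
# actually emitted, which is what the linter asks for instead of eager
# f-strings or "%"-formatting at the call site.
logger.warning("requeueing %d unacknowledged messages", 5)
```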
This PR adds support for batch processing of messages in kombu consumer servers.

A typical use case for this would be a queue consumer that has better performance characteristics when performing batched processing, for example if performing a batch of database mutations in a single transaction is significantly faster than performing individual writes.

The KombuQueueConsumerFactory now accepts additional parameters to configure the batch processing. If the batch_size is None (the default case), messages are consumed individually, so the behaviour is the same as before. If the batch_size is set, the messages will be processed in batches.

The kombu pump worker class KombuConsumerWorker accepts the same parameters and performs the actual batching. If batch_size is set, it stores new messages in a batch and processes them once the batch_size is reached or a timeout occurs.

The PR also adds a KombuBatchMessageHandler, which handles message sequences rather than individual messages. The KombuQueueConsumerFactory will return the correct type of message handler depending on whether batch_size is set.

The timeout in the KombuBatchQueueConsumer is implemented using a new Timer helper class which internally uses the library schedule to schedule the work. To prevent blocking, the default scheduler is extended to run on a separate thread.

The Timer and KombuConsumerWorker have safeguards in place for requeueing unprocessed messages in the event of a server shutdown or errors during batch processing.
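The "process once batch_size is reached or a timeout occurs" behaviour described above can be sketched as a loop that blocks on a queue with a shrinking deadline. This is a hypothetical standalone helper for illustration, not the PR's actual KombuConsumerWorker:

```python
import queue
import time


def collect_batch(source: "queue.Queue", batch_size: int, timeout_seconds: float) -> list:
    """Pull up to batch_size items from source, returning early with a
    partial batch once timeout_seconds have elapsed. Returns an empty
    list if nothing arrives before the deadline."""
    batch = []
    deadline = time.monotonic() + timeout_seconds
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break
    return batch


q = queue.Queue()
for i in range(5):
    q.put(i)

print(collect_batch(q, batch_size=3, timeout_seconds=0.1))  # [0, 1, 2]
print(collect_batch(q, batch_size=3, timeout_seconds=0.1))  # [3, 4] (timeout hit)
```

The first call fills a full batch immediately; the second drains the two leftover items and then gives up at the deadline, mirroring the partial-batch flush on timeout.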