Skip to content

Commit

Permalink
Use timeout when waiting for queues to empty
Browse files Browse the repository at this point in the history
  • Loading branch information
kislyuk committed Apr 20, 2024
1 parent 2d5f570 commit ebf368f
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions watchtower/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class CloudWatchLogHandler(logging.Handler):

END = 1
FLUSH = 2
FLUSH_TIMEOUT = 30

# extra size of meta information with each messages
EXTRA_MSG_PAYLOAD_SIZE = 26
Expand Down Expand Up @@ -479,21 +480,22 @@ def flush(self):
"""
Send any queued messages to CloudWatch. This method does nothing if ``use_queues`` is set to False.
"""
# fixme: don't add filter if it's already installed
# FIXME: don't add filter if it's already installed
self.addFilter(_boto_filter)
if self.shutting_down:
return
for q in self.queues.values():
q.put(self.FLUSH)
for q in self.queues.values():
q.join()
with q.all_tasks_done:
q.all_tasks_done.wait_for(lambda: q.unfinished_tasks == 0, timeout=self.FLUSH_TIMEOUT)

def close(self):
"""
Send any queued messages to CloudWatch and prevent further processing of messages.
This method does nothing if ``use_queues`` is set to False.
"""
# fixme: don't add filter if it's already installed
# FIXME: don't add filter if it's already installed
self.addFilter(_boto_filter)
# Avoid waiting on the queue again when the close called twice.
# Otherwise the second call, as no thread is running, it will hang
Expand All @@ -504,7 +506,10 @@ def close(self):
for q in self.queues.values():
q.put(self.END)
for q in self.queues.values():
q.join()
with q.all_tasks_done:
q.all_tasks_done.wait_for(lambda: q.unfinished_tasks == 0, timeout=self.FLUSH_TIMEOUT)
if not q.empty():
warnings.warn("Timed out while delivering logs", WatchtowerWarning)
super().close()

def __repr__(self):
Expand Down

0 comments on commit ebf368f

Please sign in to comment.