Skip to content

Commit

Permalink
Add callback for retry exhaustion (#630)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbowring authored Sep 9, 2024
1 parent 4851506 commit f592ca7
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,4 @@ of those changes to CLEARTYPE SRL.
| [@nhairs](https://github.com/nhairs) | Nicholas Hairs |
| [@5tefan](https://github.com/5tefan/) | Stefan Codrescu |
| [@kuba-lilz](https://github.com/kuba-lilz/) | Jakub Kolodziejczyk |
| [@dbowring](https://github.com/dbowring/) | Daniel Bowring |
15 changes: 15 additions & 0 deletions dramatiq/middleware/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class Retries(Middleware):
predicate that can be used to programmatically determine
whether a task should be retried or not. This takes
precedence over `max_retries` when set.
on_retry_exhausted(str): Name of an actor to send a message to when
message is failed due to retries being exceeded.
"""

def __init__(self, *, max_retries=20, min_backoff=None, max_backoff=None, retry_when=None):
Expand All @@ -79,6 +81,7 @@ def actor_options(self):
"max_backoff",
"retry_when",
"throws",
"on_retry_exhausted",
}

def after_process_message(self, broker, message, *, result=None, exception=None):
Expand All @@ -104,6 +107,18 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
retry_when is None and max_retries is not None and retries >= max_retries:
self.logger.warning("Retries exceeded for message %r.", message.message_id)
message.fail()

target_actor_name = message.options.get("on_retry_exhausted") or actor.options.get("on_retry_exhausted")
if target_actor_name:
target_actor = broker.get_actor(target_actor_name)
target_actor.send(
message.asdict(),
{
"retries": retries,
"max_retries": max_retries,
},
)

return

if isinstance(exception, Retry) and exception.delay is not None:
Expand Down
77 changes: 77 additions & 0 deletions tests/middleware/test_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,80 @@ def do_work():

# And that all requeue timestamps are larger than message timestamp
assert all(requeue_time > message.message_timestamp for requeue_time in requeue_timestamps)


def test_on_retry_exhausted_is_sent(stub_broker, stub_worker):
attempted_at = []
called_at = []
max_retries = 2

@dramatiq.actor
def handle_retries_exhausted(message_data, retry_info):
called_at.append(time.monotonic())

@dramatiq.actor(max_retries=max_retries, on_retry_exhausted=handle_retries_exhausted.actor_name)
def do_work():
attempted_at.append(time.monotonic())
# Always request a retry
raise Retry(delay=1)

do_work.send()

stub_broker.join(do_work.queue_name)
stub_broker.join(handle_retries_exhausted.queue_name)
stub_worker.join()

# We should have the initial attempt + max_retries
assert len(attempted_at) == max_retries + 1
# And the exhausted handler should have been called.
assert len(called_at) == 1

def test_on_retry_exhausted_is_not_sent_for_success(stub_broker, stub_worker):
attempted_at = []
called_at = []
max_retries = 2

@dramatiq.actor
def handle_retries_exhausted(message_data, retry_info):
called_at.append(time.monotonic())

@dramatiq.actor(max_retries=max_retries, on_retry_exhausted=handle_retries_exhausted.actor_name)
def do_work():
attempted_at.append(time.monotonic())

do_work.send()

stub_broker.join(do_work.queue_name)
stub_broker.join(handle_retries_exhausted.queue_name)
stub_worker.join()

# No retry should be required
assert len(attempted_at) == 1
# And the exhausted callback should have never been called
assert len(called_at) == 0

def test_on_retry_exhausted_is_not_sent_for_eventual_success(stub_broker, stub_worker):
attempted_at = []
called_at = []
max_retries = 2

@dramatiq.actor
def handle_retries_exhausted(message_data, retry_info):
called_at.append(time.monotonic())

@dramatiq.actor(max_retries=max_retries, on_retry_exhausted=handle_retries_exhausted.actor_name)
def do_work():
attempted_at.append(time.monotonic())
if len(attempted_at) < 2:
raise Retry(delay=1)

do_work.send()

stub_broker.join(do_work.queue_name)
stub_broker.join(handle_retries_exhausted.queue_name)
stub_worker.join()

# The first retry should have succeeded
assert len(attempted_at) == 2
# So the exhausted callback should have never been called
assert len(called_at) == 0

0 comments on commit f592ca7

Please sign in to comment.