Skip to content

Commit

Permalink
Pass context to task failure/retry funcs (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
cameron-dunn-sublime authored Aug 27, 2024
1 parent 6eedd0e commit d69a51f
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions v1/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package machinery

import (
"context"
"errors"
"fmt"
"net/url"
Expand Down Expand Up @@ -36,9 +37,9 @@ type Worker struct {
// errorHandler triggers on ALL errors, including boot, consume, etc.
errorHandler func(err error)
// taskErrorHandler only triggers when a task returns an error (that is not a retry)
taskErrorHandler func(err error, signature *tasks.Signature)
taskErrorHandler func(ctx context.Context, err error, signature *tasks.Signature)
// taskRetryErrorHandler triggers when a task returns an error that forces a retry
taskRetryErrorHandler func(err error, signature *tasks.Signature)
taskRetryErrorHandler func(ctx context.Context, err error, signature *tasks.Signature)
preTaskHandler func(*tasks.Signature)
postTaskHandler func(*tasks.Signature)
preConsumeHandler func(*Worker) bool
Expand Down Expand Up @@ -181,7 +182,7 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten
// if this failed, it means the task is malformed, probably has invalid
// signature, go directly to task failed without checking whether to retry
if err != nil {
worker.taskFailed(signature, err)
worker.taskFailed(context.Background(), signature, err)
return err
}

Expand Down Expand Up @@ -213,15 +214,15 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten
keepAndRetryErr, ok := interface{}(err).(tasks.ErrKeepAndRetryTaskLater)
if ok {
if worker.taskRetryErrorHandler != nil {
worker.taskRetryErrorHandler(err, signature)
worker.taskRetryErrorHandler(task.Context, err, signature)
}
return worker.keepAndRetryTaskIn(signature, keepAndRetryErr.RetryIn())
}

// Allow tasks to use errs.ErrStopTaskDeletion
if errors.Is(err, errs.ErrStopTaskDeletion) {
if worker.taskRetryErrorHandler != nil {
worker.taskRetryErrorHandler(err, signature)
worker.taskRetryErrorHandler(task.Context, err, signature)
}
return err
}
Expand All @@ -230,12 +231,12 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten
// and signature.RetryTimeout values
if signature.RetryCount > 0 {
if worker.taskRetryErrorHandler != nil {
worker.taskRetryErrorHandler(err, signature)
worker.taskRetryErrorHandler(task.Context, err, signature)
}
return worker.taskRetry(signature)
}

return worker.taskFailed(signature, err)
return worker.taskFailed(task.Context, signature, err)
}

return worker.taskSucceeded(signature, results)
Expand Down Expand Up @@ -420,14 +421,14 @@ func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*t
}

// taskFailed updates the task state and triggers error callbacks
func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error {
func (worker *Worker) taskFailed(ctx context.Context, signature *tasks.Signature, taskErr error) error {
// Update task state to FAILURE
if err := worker.server.GetBackend().SetStateFailure(signature, taskErr.Error()); err != nil {
return fmt.Errorf("Set state to 'failure' for task %s returned error: %s", signature.UUID, err)
}

if worker.taskErrorHandler != nil {
worker.taskErrorHandler(taskErr, signature)
worker.taskErrorHandler(ctx, taskErr, signature)
} else if worker.errorHandler != nil {
worker.errorHandler(taskErr)
} else {
Expand Down Expand Up @@ -467,12 +468,12 @@ func (worker *Worker) SetErrorHandler(handler func(err error)) {

// SetTaskErrorHandler sets a custom error handler for task-only errors
// Note: taskErrorHandler will override errorHandler for task errors when set.
func (worker *Worker) SetTaskErrorHandler(handler func(err error, signature *tasks.Signature)) {
func (worker *Worker) SetTaskErrorHandler(handler func(ctx context.Context, err error, signature *tasks.Signature)) {
worker.taskErrorHandler = handler
}

// SetTaskRetryErrorHandler sets a custom error handler for errors that are being retried.
func (worker *Worker) SetTaskRetryErrorHandler(handler func(err error, signature *tasks.Signature)) {
func (worker *Worker) SetTaskRetryErrorHandler(handler func(ctx context.Context, err error, signature *tasks.Signature)) {
worker.taskRetryErrorHandler = handler
}

Expand Down

0 comments on commit d69a51f

Please sign in to comment.