From d69a51f08adaa4d9948c09474c45111f733bae2b Mon Sep 17 00:00:00 2001 From: Cameron Dunn Date: Tue, 27 Aug 2024 10:20:15 -0700 Subject: [PATCH] Pass context to task failure/retry funcs (#31) --- v1/worker.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/v1/worker.go b/v1/worker.go index 819eb09..74eb0de 100644 --- a/v1/worker.go +++ b/v1/worker.go @@ -1,6 +1,7 @@ package machinery import ( + "context" "errors" "fmt" "net/url" @@ -36,9 +37,9 @@ type Worker struct { // errorHandler triggers on ALL errors, including boot, consume, etc. errorHandler func(err error) // taskErrorHandler only triggers when a task returns an error (that is not a retry) - taskErrorHandler func(err error, signature *tasks.Signature) + taskErrorHandler func(ctx context.Context, err error, signature *tasks.Signature) // taskRetryErrorHandler triggers when a task returns an error that forces a retry - taskRetryErrorHandler func(err error, signature *tasks.Signature) + taskRetryErrorHandler func(ctx context.Context, err error, signature *tasks.Signature) preTaskHandler func(*tasks.Signature) postTaskHandler func(*tasks.Signature) preConsumeHandler func(*Worker) bool @@ -181,7 +182,7 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten // if this failed, it means the task is malformed, probably has invalid // signature, go directly to task failed without checking whether to retry if err != nil { - worker.taskFailed(signature, err) + worker.taskFailed(context.Background(), signature, err) return err } @@ -213,7 +214,7 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten keepAndRetryErr, ok := interface{}(err).(tasks.ErrKeepAndRetryTaskLater) if ok { if worker.taskRetryErrorHandler != nil { - worker.taskRetryErrorHandler(err, signature) + worker.taskRetryErrorHandler(task.Context, err, signature) } return worker.keepAndRetryTaskIn(signature, keepAndRetryErr.RetryIn()) } @@ -221,7 +222,7 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten // Allow tasks to use errs.ErrStopTaskDeletion if errors.Is(err, errs.ErrStopTaskDeletion) { if worker.taskRetryErrorHandler != nil { - worker.taskRetryErrorHandler(err, signature) + worker.taskRetryErrorHandler(task.Context, err, signature) } return err } @@ -230,12 +231,12 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten // and signature.RetryTimeout values if signature.RetryCount > 0 { if worker.taskRetryErrorHandler != nil { - worker.taskRetryErrorHandler(err, signature) + worker.taskRetryErrorHandler(task.Context, err, signature) } return worker.taskRetry(signature) } - return worker.taskFailed(signature, err) + return worker.taskFailed(task.Context, signature, err) } return worker.taskSucceeded(signature, results) @@ -420,14 +421,14 @@ func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*t } // taskFailed updates the task state and triggers error callbacks -func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error { +func (worker *Worker) taskFailed(ctx context.Context, signature *tasks.Signature, taskErr error) error { // Update task state to FAILURE if err := worker.server.GetBackend().SetStateFailure(signature, taskErr.Error()); err != nil { return fmt.Errorf("Set state to 'failure' for task %s returned error: %s", signature.UUID, err) } if worker.taskErrorHandler != nil { - worker.taskErrorHandler(taskErr, signature) + worker.taskErrorHandler(ctx, taskErr, signature) } else if worker.errorHandler != nil { worker.errorHandler(taskErr) } else { @@ -467,12 +468,12 @@ func (worker *Worker) SetErrorHandler(handler func(err error)) { // SetTaskErrorHandler sets a custom error handler for task-only errors // Note: taskErrorHandler will override errorHandler for task errors when set. -func (worker *Worker) SetTaskErrorHandler(handler func(err error, signature *tasks.Signature)) { +func (worker *Worker) SetTaskErrorHandler(handler func(ctx context.Context, err error, signature *tasks.Signature)) { worker.taskErrorHandler = handler } // SetTaskRetryErrorHandler sets a custom error handler for errors that are being retried. -func (worker *Worker) SetTaskRetryErrorHandler(handler func(err error, signature *tasks.Signature)) { +func (worker *Worker) SetTaskRetryErrorHandler(handler func(ctx context.Context, err error, signature *tasks.Signature)) { worker.taskRetryErrorHandler = handler }