Skip to content

Commit

Permalink
Add a handler for retry errors (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
cameron-dunn-sublime authored Aug 10, 2023
1 parent 8617130 commit 504fbae
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions v1/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ type Worker struct {
Queue string
// errorHandler triggers on ALL errors, including boot, consume, etc.
errorHandler func(err error)
// taskErrorHandler only triggers when a task returns an error
taskErrorHandler func(err error, signature *tasks.Signature)
preTaskHandler func(*tasks.Signature)
postTaskHandler func(*tasks.Signature)
preConsumeHandler func(*Worker) bool
// taskErrorHandler only triggers when a task returns an error (that is not a retry)
taskErrorHandler func(err error, signature *tasks.Signature)
// taskRetryErrorHandler triggers when a task returns an error that forces a retry
taskRetryErrorHandler func(err error, signature *tasks.Signature)
preTaskHandler func(*tasks.Signature)
postTaskHandler func(*tasks.Signature)
preConsumeHandler func(*Worker) bool
}

var (
Expand Down Expand Up @@ -212,22 +214,26 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten
// retry the task after specified duration
retryTaskLaterErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
if ok {
worker.taskRetryErrorHandler(err, signature)
return worker.retryTaskIn(signature, retryTaskLaterErr.RetryIn())
}

keepAndRetryErr, ok := interface{}(err).(tasks.ErrKeepAndRetryTaskLater)
if ok {
worker.taskRetryErrorHandler(err, signature)
return worker.keepAndRetryTaskIn(signature, keepAndRetryErr.RetryIn())
}

// Allow tasks to use errs.ErrStopTaskDeletion
if errors.Is(err, errs.ErrStopTaskDeletion) {
worker.taskRetryErrorHandler(err, signature)
return err
}

// Otherwise, execute default retry logic based on signature.RetryCount
// and signature.RetryTimeout values
if signature.RetryCount > 0 {
worker.taskRetryErrorHandler(err, signature)
return worker.taskRetry(signature)
}

Expand Down Expand Up @@ -470,27 +476,31 @@ func (worker *Worker) SetTaskErrorHandler(handler func(err error, signature *tas
worker.taskErrorHandler = handler
}

//SetPreTaskHandler sets a custom handler func before a job is started
// SetTaskRetryErrorHandler sets a custom error handler for errors that are being retried.
func (worker *Worker) SetTaskRetryErrorHandler(handler func(err error, signature *tasks.Signature)) {
worker.taskRetryErrorHandler = handler
}

// SetPreTaskHandler sets a custom handler func before a job is started
func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature)) {
worker.preTaskHandler = handler
}

//SetPostTaskHandler sets a custom handler for the end of a job
// SetPostTaskHandler sets a custom handler for the end of a job
func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature)) {
worker.postTaskHandler = handler
}

//SetPreConsumeHandler sets a custom handler for the end of a job
// SetPreConsumeHandler sets a custom handler for the end of a job
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
worker.preConsumeHandler = handler
}

//GetServer returns server
// GetServer returns server
func (worker *Worker) GetServer() *Server {
return worker.server
}

//
func (worker *Worker) PreConsumeHandler() bool {
if worker.preConsumeHandler == nil {
return true
Expand Down

0 comments on commit 504fbae

Please sign in to comment.