Skip to content

Commit

Permalink
Allow customizing retry delays (introduce retry policies).
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanz committed Apr 26, 2024
1 parent 3a56ec1 commit e2643f6
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 21 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ Finally, initialize the processor:
// logger is assumed to be a zerolog instance.
processor := nanoq.NewProcessor(nanoq.NewClient(db), logger)

// The default retry policy uses an exponential backoff with jitter,
// but callers can provide their own if necessary.
processor.RetryPolicy(func(t nanoq.Task) time.Duration {
// First retry in 5s, every next retry in 1h.
if t.Retries == 0 {
return 5 * time.Second
}
return 1 * time.Hour
})
processor.OnError(func(ctx context.Context, t nanoq.Task, err error) {
// Log each failed task.
// Idea: Send to Sentry when t.Retries == t.MaxRetries.
Expand Down
51 changes: 30 additions & 21 deletions nanoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ func (c *Client) ClaimTask(ctx context.Context, tx *sqlx.Tx) (Task, error) {
return t, nil
}

// UpdateTask updates the given task.
func (c *Client) UpdateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
// RetryTask schedules a retry of the given task.
func (c *Client) RetryTask(ctx context.Context, tx *sqlx.Tx, t Task, retryIn time.Duration) error {
t.Retries++
t.ScheduledAt = time.Now().UTC().Add(retryIn)

_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at WHERE id = :id`, t)
if err != nil {
return err
Expand Down Expand Up @@ -247,8 +250,20 @@ type (

// Middleware wrap a handler in order to run logic before/after it.
Middleware func(next Handler) Handler

// RetryPolicy determines the retry delay for a given task.
RetryPolicy func(t Task) time.Duration
)

// DefaultRetryPolicy is the retry policy used when no other is registered.
//
// The base delay is 5 + (t.Retries+1)^5 seconds, and a random jitter of
// less than half the base delay is added on top of it.
// Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h.
func DefaultRetryPolicy(t Task) time.Duration {
	attempt := float64(t.Retries + 1)
	base := 5 + int(math.Pow(attempt, 5))
	// Jitter spreads out retries of tasks that failed at the same moment.
	jitter := rand.IntN(base / 2)

	return time.Duration(base+jitter) * time.Second
}

// Processor represents the queue processor.
type Processor struct {
client *Client
Expand All @@ -257,17 +272,19 @@ type Processor struct {
errorHandler ErrorHandler
handlers map[string]Handler
middleware []Middleware
retryPolicy RetryPolicy

done atomic.Bool
}

// NewProcessor creates a new processor.
//
// The returned processor uses the given client and logger, starts with no
// registered handlers or middleware, and uses DefaultRetryPolicy.
func NewProcessor(client *Client, logger zerolog.Logger) *Processor {
return &Processor{
client: client,
logger: logger,
handlers: make(map[string]Handler),
middleware: make([]Middleware, 0),
client: client,
logger: logger,
handlers: make(map[string]Handler),
middleware: make([]Middleware, 0),
retryPolicy: DefaultRetryPolicy,
}
}

Expand All @@ -290,6 +307,11 @@ func (p *Processor) Handle(taskType string, h Handler, ms ...Middleware) {
p.handlers[taskType] = h
}

// RetryPolicy registers the given retry policy.
//
// The policy receives the failed task and returns the delay before its
// next attempt. Passing nil restores DefaultRetryPolicy, so that the
// processor never ends up calling a nil function when a task fails.
func (p *Processor) RetryPolicy(rp RetryPolicy) {
	if rp == nil {
		rp = DefaultRetryPolicy
	}
	p.retryPolicy = rp
}

// Run starts the processor and blocks until a shutdown signal (SIGINT/SIGTERM) is received.
//
// Once the shutdown signal is received, workers stop claiming new tasks.
Expand Down Expand Up @@ -369,10 +391,8 @@ func (p *Processor) process(ctx context.Context) error {
p.errorHandler(ctx, t, err)
}
if t.Retries < t.MaxRetries && !errors.Is(err, ErrSkipRetry) {
t.Retries = t.Retries + 1
t.ScheduledAt = getNextRetryTime(int(t.Retries))

if err := p.client.UpdateTask(ctx, tx, t); err != nil {
retryIn := p.retryPolicy(t)
if err := p.client.RetryTask(ctx, tx, t, retryIn); err != nil {
return fmt.Errorf("update task %v: %w", t.ID, err)
}

Expand Down Expand Up @@ -411,14 +431,3 @@ func callHandler(ctx context.Context, h Handler, t Task) (err error) {

return h(ctx, t)
}

// getNextRetryTime returns the time of the next retry.
// Uses an exponential base delay with jitter.
// Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h.
func getNextRetryTime(n int) time.Time {
exp := 5 + int(math.Pow(float64(n), float64(5)))
s := exp + rand.IntN(exp/2)
d := time.Duration(s) * time.Second

return time.Now().UTC().Add(d)
}
9 changes: 9 additions & 0 deletions nanoq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func TestProcessor_Run(t *testing.T) {
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
retryPolicyCalled := 0
processor.RetryPolicy(func(t nanoq.Task) time.Duration {
retryPolicyCalled++
return 1 * time.Second
})
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
// Fail the task once.
if task.Retries == 0 {
Expand Down Expand Up @@ -199,6 +204,10 @@ func TestProcessor_Run(t *testing.T) {
if errorHandlerCalled != 1 {
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1)
}

if retryPolicyCalled != 1 {
t.Errorf("retry policy called %v times instead of %v", retryPolicyCalled, 1)
}
}

func TestProcessor_Run_RetriesExhausted(t *testing.T) {
Expand Down

0 comments on commit e2643f6

Please sign in to comment.