Rework task processing.
Old model:
- Each worker does its own polling for tasks.
- Each task is claimed in a transaction which is held open until the task is processed.

New model:
- The processor polls in one place, then passes claimed tasks to workers.
  This reduces db load when there are no tasks to claim.
- Each task is claimed in a transaction which is immediately committed.
  The task is then updated (or deleted) in a separate transaction once processing
  is complete. (See the sketch below.)
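
For illustration, the claim/complete cycle under the new model looks roughly like
this (a minimal sketch, not part of the commit: it assumes the package is imported
as nanoq, runOnce and handle are illustrative names, and the max-retry and
ErrSkipRetry handling shown in processTask below is elided):

    func runOnce(ctx context.Context, client *nanoq.Client, handle nanoq.Handler) error {
        t, err := client.ClaimTask(ctx) // The claim is committed immediately.
        if err != nil {
            return err // errors.Is(err, nanoq.ErrNoTasks) reports an empty queue.
        }
        if err := handle(ctx, t); err != nil {
            return client.RetryTask(ctx, t) // Separate transaction: bump retries and reschedule.
        }
        return client.DeleteTask(ctx, t) // Separate transaction: remove the finished task.
    }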

Bonus benefit: if the processor crashes, stuck tasks are now recovered much more
quickly, once the task timeout has passed, instead of waiting on MySQL's
wait_timeout, which usually defaults to a high value, e.g. 28800s (8h).
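
Concretely, the new claim query below treats a task as stuck once claimed_at +
timeout_seconds*1.1 is in the past: with a hypothetical 60-second task timeout, a
crashed worker's task becomes claimable again after roughly 66 seconds, instead of
after up to 28800 seconds.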
bojanz committed Apr 23, 2024
1 parent 3a56ec1 commit 93b7eb8
Showing 3 changed files with 209 additions and 98 deletions.
nanoq.go: 122 additions & 96 deletions
@@ -12,7 +12,6 @@ import (
 	"os/signal"
 	"runtime"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -40,15 +39,16 @@ var (
 
 // Task represents a task.
 type Task struct {
-	ID             string    `db:"id"`
-	Fingerprint    string    `db:"fingerprint"`
-	Type           string    `db:"type"`
-	Payload        []byte    `db:"payload"`
-	Retries        uint8     `db:"retries"`
-	MaxRetries     uint8     `db:"max_retries"`
-	TimeoutSeconds int32     `db:"timeout_seconds"`
-	CreatedAt      time.Time `db:"created_at"`
-	ScheduledAt    time.Time `db:"scheduled_at"`
+	ID             string     `db:"id"`
+	Fingerprint    string     `db:"fingerprint"`
+	Type           string     `db:"type"`
+	Payload        []byte     `db:"payload"`
+	Retries        uint8      `db:"retries"`
+	MaxRetries     uint8      `db:"max_retries"`
+	TimeoutSeconds int32      `db:"timeout_seconds"`
+	CreatedAt      time.Time  `db:"created_at"`
+	ScheduledAt    time.Time  `db:"scheduled_at"`
+	ClaimedAt      *time.Time `db:"claimed_at"`
 }
 
 // NewTask creates a new task.
@@ -147,6 +147,7 @@ func NewClient(db *sqlx.DB) *Client {
 
 // CreateTask creates the given task.
 //
+// Expected to run in an existing transaction.
 // Returns ErrDuplicateTask if a task with the same fingerprint already exists.
 func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
 	_, err := tx.NamedExecContext(ctx, `
@@ -166,38 +167,61 @@ func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
 
 // ClaimTask claims a task for processing.
 //
-// The claim is valid until the transaction is committed or rolled back.
-func (c *Client) ClaimTask(ctx context.Context, tx *sqlx.Tx) (Task, error) {
-	t := Task{}
-	err := tx.GetContext(ctx, &t, `SELECT * FROM tasks WHERE scheduled_at <= UTC_TIMESTAMP() ORDER BY scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED`)
-	if err != nil {
-		if errors.Is(err, sql.ErrNoRows) {
-			return t, ErrNoTasks
-		}
-		return t, err
-	}
-
-	return t, nil
-}
-
-// UpdateTask updates the given task.
-func (c *Client) UpdateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
-	_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at WHERE id = :id`, t)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
+// Returns ErrNoTasks if no tasks are available.
+func (c *Client) ClaimTask(ctx context.Context) (Task, error) {
+	t := Task{}
+	err := c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		err := tx.GetContext(ctx, &t, `
+			SELECT * FROM tasks
+			WHERE scheduled_at <= UTC_TIMESTAMP()
+			AND (claimed_at IS NULL OR DATE_ADD(claimed_at, INTERVAL timeout_seconds*1.1 SECOND) < UTC_TIMESTAMP())
+			ORDER BY scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED`)
+		if err != nil {
+			if errors.Is(err, sql.ErrNoRows) {
+				return ErrNoTasks
+			}
+			return fmt.Errorf("get: %w", err)
+		}
+		now := time.Now().UTC()
+		t.ClaimedAt = &now
+
+		_, err = tx.NamedExecContext(ctx, `UPDATE tasks SET claimed_at = :claimed_at WHERE id = :id`, t)
+		if err != nil {
+			return fmt.Errorf("update: %w", err)
+		}
+
+		return nil
+	})
+
+	return t, err
+}
 
 // DeleteTask deletes the given task.
-func (c *Client) DeleteTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
-	_, err := tx.NamedExecContext(ctx, `DELETE FROM tasks WHERE id = :id`, t)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
+func (c *Client) DeleteTask(ctx context.Context, t Task) error {
+	return c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		_, err := tx.NamedExecContext(ctx, `DELETE FROM tasks WHERE id = :id`, t)
+		return err
+	})
+}
+
+// ReleaseTask releases the given task, allowing it to be claimed again.
+func (c *Client) ReleaseTask(ctx context.Context, t Task) error {
+	return c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET claimed_at = NULL WHERE id = :id`, t)
+		return err
+	})
+}
+
+// RetryTask schedules a retry of the given task.
+func (c *Client) RetryTask(ctx context.Context, t Task) error {
+	return c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		t.Retries++
+		t.ScheduledAt = getNextRetryTime(int(t.Retries))
+		t.ClaimedAt = nil
+
+		_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at, claimed_at = :claimed_at WHERE id = :id`, t)
+		return err
+	})
+}
 
 // RunTransaction runs the given function in a transaction.
@@ -222,6 +246,17 @@ func (c *Client) RunTransaction(ctx context.Context, fn func(tx *sqlx.Tx) error) error {
 	return nil
 }
 
+// getNextRetryTime returns the time of the next retry.
+// Uses an exponential base delay with jitter.
+// Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h.
+func getNextRetryTime(n int) time.Time {
+	exp := 5 + int(math.Pow(float64(n), float64(5)))
+	s := exp + rand.IntN(exp/2)
+	d := time.Duration(s) * time.Second
+
+	return time.Now().UTC().Add(d)
+}
+
 // isConflictError checks whether the given error is a MySQL / MariaDB conflict error.
 func isConflictError(err error) bool {
 	if sqlErr, ok := err.(*mysql.MySQLError); ok {
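
As a quick check of the delays quoted above: n = 1 gives exp = 5 + 1^5 = 6, jittered
to 6-8s (the ≈7s); n = 2 gives 5 + 2^5 = 37, jittered to 37-55s (the ≈50s); n = 10
gives 5 + 10^5 = 100005s, about 27.8h before jitter (the ≈27h).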
@@ -258,7 +293,8 @@ type Processor struct {
 	handlers   map[string]Handler
 	middleware []Middleware
 
-	done atomic.Bool
+	workers chan struct{}
+	done    atomic.Bool
 }
 
 // NewProcessor creates a new processor.
@@ -314,78 +350,79 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout time.Duration) {
 	}()
 
 	p.logger.Info().Int("concurrency", concurrency).Msg("Starting processor")
-	var wg sync.WaitGroup
-	for range concurrency {
-		wg.Add(1)
-
-		go func() {
-			for !p.done.Load() {
-				err := p.process(processorCtx)
-				if err != nil {
-					if errors.Is(err, ErrNoTasks) {
-						// The queue is empty. Wait a second before trying again.
-						time.Sleep(1 * time.Second)
-						continue
-					}
-					p.logger.Error().Err(err).Msg("Could not process task")
-				}
-			}
-			wg.Done()
-		}()
-	}
-
-	wg.Wait()
+	p.workers = make(chan struct{}, concurrency)
+	for !p.done.Load() {
+		// Acquire a worker before claiming a task, to avoid holding claimed tasks while all workers are busy.
+		p.workers <- struct{}{}
+
+		t, err := p.client.ClaimTask(processorCtx)
+		if err != nil {
+			if !errors.Is(err, ErrNoTasks) && !errors.Is(err, context.Canceled) {
+				p.logger.Error().Err(err).Msg("Could not claim task")
+			}
+			<-p.workers
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		go func() {
+			if err = p.processTask(processorCtx, t); err != nil {
+				p.logger.Error().Err(err).Msg("Could not process task")
+			}
+			<-p.workers
+		}()
+	}
+
+	// Wait for workers to finish.
+	for range cap(p.workers) {
+		p.workers <- struct{}{}
+	}
 }
 
-// process claims a single task and processes it.
-func (p *Processor) process(ctx context.Context) error {
-	return p.client.RunTransaction(ctx, func(tx *sqlx.Tx) error {
-		t, err := p.client.ClaimTask(ctx, tx)
-		if err != nil {
-			return fmt.Errorf("claim task: %w", err)
-		}
-
-		h, ok := p.handlers[t.Type]
-		if !ok {
-			h = func(ctx context.Context, t Task) error {
-				return fmt.Errorf("no handler found for task type %v: %w", t.Type, ErrSkipRetry)
-			}
-		}
-		// Apply global middleware.
-		for i := len(p.middleware) - 1; i >= 0; i-- {
-			h = p.middleware[i](h)
-		}
-
-		if err = callHandler(ctx, h, t); err != nil {
-			if errors.Is(err, context.Canceled) {
-				return fmt.Errorf("task %v canceled: %v", t.ID, context.Cause(ctx))
-			}
-			if errors.Is(err, context.DeadlineExceeded) {
-				// Extract a more specific timeout error, if any.
-				err = context.Cause(ctx)
-			}
-
-			if p.errorHandler != nil {
-				p.errorHandler(ctx, t, err)
-			}
-			if t.Retries < t.MaxRetries && !errors.Is(err, ErrSkipRetry) {
-				t.Retries = t.Retries + 1
-				t.ScheduledAt = getNextRetryTime(int(t.Retries))
-
-				if err := p.client.UpdateTask(ctx, tx, t); err != nil {
-					return fmt.Errorf("update task %v: %w", t.ID, err)
-				}
-
-				return nil
-			}
-		}
-
-		if err := p.client.DeleteTask(ctx, tx, t); err != nil {
-			return fmt.Errorf("delete task %v: %w", t.ID, err)
-		}
-
-		return nil
-	})
-}
+// processTask processes a single task.
+func (p *Processor) processTask(ctx context.Context, t Task) error {
+	h, ok := p.handlers[t.Type]
+	if !ok {
+		h = func(ctx context.Context, t Task) error {
+			return fmt.Errorf("no handler found for task type %v: %w", t.Type, ErrSkipRetry)
+		}
+	}
+	// Apply global middleware.
+	for i := len(p.middleware) - 1; i >= 0; i-- {
+		h = p.middleware[i](h)
+	}
+
+	if err := callHandler(ctx, h, t); err != nil {
+		if errors.Is(err, context.Canceled) {
+			// The processor is shutting down. Release the task and exit.
+			if err = p.client.ReleaseTask(context.Background(), t); err != nil {
+				return fmt.Errorf("release task %v: %w", t.ID, err)
+			}
+			return fmt.Errorf("task %v canceled: %v", t.ID, context.Cause(ctx))
+		}
+		if errors.Is(err, context.DeadlineExceeded) {
+			// Extract a more specific timeout error, if any.
+			err = context.Cause(ctx)
+		}
+
+		if p.errorHandler != nil {
+			p.errorHandler(ctx, t, err)
+		}
+		if t.Retries < t.MaxRetries && !errors.Is(err, ErrSkipRetry) {
+			if err := p.client.RetryTask(ctx, t); err != nil {
+				return fmt.Errorf("retry task %v: %w", t.ID, err)
+			}
+
+			return nil
+		}
+	}
+
+	if err := p.client.DeleteTask(ctx, t); err != nil {
+		return fmt.Errorf("delete task %v: %w", t.ID, err)
+	}
+
+	return nil
+}
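
A note on the shutdown path in Run above: the buffered workers channel doubles as a
counting semaphore, so refilling all cap(p.workers) slots blocks until every in-flight
worker goroutine has released its slot; this replaces the previous sync.WaitGroup.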

@@ -411,14 +448,3 @@ func callHandler(ctx context.Context, h Handler, t Task) (err error) {
 
 	return h(ctx, t)
 }
-
-// getNextRetryTime returns the time of the next retry.
-// Uses an exponential base delay with jitter.
-// Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h.
-func getNextRetryTime(n int) time.Time {
-	exp := 5 + int(math.Pow(float64(n), float64(5)))
-	s := exp + rand.IntN(exp/2)
-	d := time.Duration(s) * time.Second
-
-	return time.Now().UTC().Add(d)
-}
