Rework task processing.
Old model:
- Each worker does its own polling for tasks.
- Each task is claimed in a transaction which is held open until the task is processed.

New model:
- The processor polls in one place, then passes claimed tasks to workers.
  This reduces db load when there are no tasks to claim.
- Each task is claimed in a transaction which is immediately committed.
  The task is then updated in a separate transaction once processing is complete.

Bonus benefit: if the processor crashes, stuck tasks are now recovered much more quickly,
once the task timeout has passed, instead of after MySQL's wait_timeout, which
usually defaults to a high value, e.g. 28800s (8h).
bojanz committed May 6, 2024
1 parent e2643f6 commit 445ed61
Showing 4 changed files with 203 additions and 89 deletions.
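For orientation before the diff: a minimal sketch (not part of the commit) of the task lifecycle the new model implies, built on the Client methods introduced below. The package import path, handler signature, and retry delay are assumptions for illustration.

```go
package example

import (
	"context"
	"errors"
	"time"

	"github.com/bojanz/nanoq" // assumed import path
)

// processOne sketches the claim -> process -> finalize flow.
// Each step runs in its own short transaction, so no locks are
// held while the handler does its work.
func processOne(ctx context.Context, client *nanoq.Client, handle func(context.Context, nanoq.Task) error) error {
	t, err := client.ClaimTask(ctx) // transaction 1: claim, committed immediately
	if err != nil {
		return err // nanoq.ErrNoTasks when the queue is empty
	}
	if err := handle(ctx, t); err != nil {
		if errors.Is(err, context.Canceled) {
			// Shutting down: hand the task back so it can be claimed again.
			return client.ReleaseTask(context.Background(), t)
		}
		return client.RetryTask(ctx, t, 10*time.Second) // transaction 2: reschedule
	}
	return client.DeleteTask(ctx, t) // transaction 2: done, remove the task
}
```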
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
 # NanoQ
 
-NanoQ is a MySQL-powered task queue, implemented in ~350 lines of code.
+NanoQ is a MySQL-powered task queue, implemented in ~400 lines of code.
 
 While it can be used as-is, you are encouraged to copy-paste it into your project and customize it according to your tastes and needs.
201 changes: 115 additions & 86 deletions nanoq.go
@@ -12,7 +12,6 @@ import (
 	"os/signal"
 	"runtime"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -40,15 +39,16 @@
 
 // Task represents a task.
 type Task struct {
-	ID             string    `db:"id"`
-	Fingerprint    string    `db:"fingerprint"`
-	Type           string    `db:"type"`
-	Payload        []byte    `db:"payload"`
-	Retries        uint8     `db:"retries"`
-	MaxRetries     uint8     `db:"max_retries"`
-	TimeoutSeconds int32     `db:"timeout_seconds"`
-	CreatedAt      time.Time `db:"created_at"`
-	ScheduledAt    time.Time `db:"scheduled_at"`
+	ID             string     `db:"id"`
+	Fingerprint    string     `db:"fingerprint"`
+	Type           string     `db:"type"`
+	Payload        []byte     `db:"payload"`
+	Retries        uint8      `db:"retries"`
+	MaxRetries     uint8      `db:"max_retries"`
+	TimeoutSeconds int32      `db:"timeout_seconds"`
+	CreatedAt      time.Time  `db:"created_at"`
+	ScheduledAt    time.Time  `db:"scheduled_at"`
+	ClaimedAt      *time.Time `db:"claimed_at"`
 }
 
 // NewTask creates a new task.
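The new ClaimedAt field implies a matching nullable column on the tasks table. The actual migration presumably lives in one of the two changed files not shown here; a hypothetical equivalent, with the column type being a guess:

```go
package example

import (
	"context"

	"github.com/jmoiron/sqlx"
)

// AddClaimedAt is a hypothetical version of the schema change implied by the
// new struct field. The column name and NULL-ability follow from the struct
// tag and the *time.Time type; DATETIME is an assumption.
func AddClaimedAt(ctx context.Context, db *sqlx.DB) error {
	_, err := db.ExecContext(ctx, `ALTER TABLE tasks ADD COLUMN claimed_at DATETIME DEFAULT NULL`)
	return err
}
```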
@@ -147,6 +147,7 @@ func NewClient(db *sqlx.DB) *Client {
 
 // CreateTask creates the given task.
 //
+// Expected to run in an existing transaction.
 // Returns ErrDuplicateTask if a task with the same fingerprint already exists.
 func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
 	_, err := tx.NamedExecContext(ctx, `
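The clarified contract means callers pair CreateTask with RunTransaction. A minimal usage sketch (the enqueue helper and import path are illustrative):

```go
package example

import (
	"context"

	"github.com/bojanz/nanoq" // assumed import path
	"github.com/jmoiron/sqlx"
)

// enqueue wraps CreateTask in the transaction it expects.
func enqueue(ctx context.Context, client *nanoq.Client, t nanoq.Task) error {
	return client.RunTransaction(ctx, func(tx *sqlx.Tx) error {
		return client.CreateTask(ctx, tx, t)
	})
}
```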
@@ -166,41 +167,64 @@ func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
 
 // ClaimTask claims a task for processing.
 //
-// The claim is valid until the transaction is committed or rolled back.
-func (c *Client) ClaimTask(ctx context.Context, tx *sqlx.Tx) (Task, error) {
+// Returns ErrNoTasks if no tasks are available.
+func (c *Client) ClaimTask(ctx context.Context) (Task, error) {
 	t := Task{}
-	err := tx.GetContext(ctx, &t, `SELECT * FROM tasks WHERE scheduled_at <= UTC_TIMESTAMP() ORDER BY scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED`)
-	if err != nil {
-		if errors.Is(err, sql.ErrNoRows) {
-			return t, ErrNoTasks
+	err := c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		// Tasks are expected to be canceled and released after the task timeout is reached.
+		// If a task is still claimed after that point, it likely means that the processor has crashed, requiring the task to be reclaimed.
+		// Tasks are reclaimed after timeout_seconds * 1.1, to allow for extra processing to occur post-cancelation.
+		err := tx.GetContext(ctx, &t, `
+			SELECT * FROM tasks
+			WHERE scheduled_at <= UTC_TIMESTAMP()
+			AND (claimed_at IS NULL OR DATE_ADD(claimed_at, INTERVAL timeout_seconds*1.1 SECOND) < UTC_TIMESTAMP())
+			ORDER BY scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED`)
+		if err != nil {
+			if errors.Is(err, sql.ErrNoRows) {
+				return ErrNoTasks
+			}
+			return fmt.Errorf("get task: %w", err)
 		}
-		return t, err
-	}
+		now := time.Now().UTC()
+		t.ClaimedAt = &now
 
-	return t, nil
-}
+		_, err = tx.NamedExecContext(ctx, `UPDATE tasks SET claimed_at = :claimed_at WHERE id = :id`, t)
+		if err != nil {
+			return fmt.Errorf("update task: %w", err)
+		}
 
-// RetryTask schedules a retry of the given task.
-func (c *Client) RetryTask(ctx context.Context, tx *sqlx.Tx, t Task, retryIn time.Duration) error {
-	t.Retries++
-	t.ScheduledAt = time.Now().UTC().Add(retryIn)
+		return nil
+	})
 
-	_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at WHERE id = :id`, t)
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return t, err
 }
 
 // DeleteTask deletes the given task.
-func (c *Client) DeleteTask(ctx context.Context, tx *sqlx.Tx, t Task) error {
-	_, err := tx.NamedExecContext(ctx, `DELETE FROM tasks WHERE id = :id`, t)
-	if err != nil {
+func (c *Client) DeleteTask(ctx context.Context, t Task) error {
+	return c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		_, err := tx.NamedExecContext(ctx, `DELETE FROM tasks WHERE id = :id`, t)
 		return err
-	}
+	})
+}
 
-	return nil
+// ReleaseTask releases the given task, allowing it to be claimed again.
+func (c *Client) ReleaseTask(ctx context.Context, t Task) error {
+	return c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET claimed_at = NULL WHERE id = :id`, t)
+		return err
+	})
+}
+
+// RetryTask schedules a retry of the given task.
+func (c *Client) RetryTask(ctx context.Context, t Task, retryIn time.Duration) error {
+	return c.RunTransaction(ctx, func(tx *sqlx.Tx) error {
+		t.Retries++
+		t.ScheduledAt = time.Now().UTC().Add(retryIn)
+		t.ClaimedAt = nil
+
+		_, err := tx.NamedExecContext(ctx, `UPDATE tasks SET retries = :retries, scheduled_at = :scheduled_at, claimed_at = :claimed_at WHERE id = :id`, t)
+		return err
+	})
 }
 
 // RunTransaction runs the given function in a transaction.
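To make the reclaim window concrete: a task with timeout_seconds = 600 whose processor died becomes claimable again after 660 seconds, not after wait_timeout. Below, an illustrative Go restatement of the WHERE clause above (the real check is the SQL itself; the import path is assumed):

```go
package example

import (
	"time"

	"github.com/bojanz/nanoq" // assumed import path
)

// claimable restates ClaimTask's WHERE clause in Go.
func claimable(t nanoq.Task, now time.Time) bool {
	if t.ScheduledAt.After(now) {
		return false // not due yet
	}
	if t.ClaimedAt == nil {
		return true // never claimed
	}
	// Claimed: reclaim only once timeout_seconds * 1.1 has elapsed.
	window := time.Duration(float64(t.TimeoutSeconds)*1.1) * time.Second
	return now.After(t.ClaimedAt.Add(window))
}
```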
@@ -274,7 +298,8 @@ type Processor struct {
 	middleware   []Middleware
 	retryPolicy  RetryPolicy
 
-	done atomic.Bool
+	workers chan struct{}
+	done    atomic.Bool
 }
 
 // NewProcessor creates a new processor.
@@ -336,76 +361,80 @@ func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout time.Duration) {
 	}()
 
 	p.logger.Info().Int("concurrency", concurrency).Msg("Starting processor")
-	var wg sync.WaitGroup
-	for range concurrency {
-		wg.Add(1)
+	p.workers = make(chan struct{}, concurrency)
+	for !p.done.Load() {
+		// Acquire a worker before claiming a task, to avoid holding claimed tasks while all workers are busy.
+		p.workers <- struct{}{}
+
+		t, err := p.client.ClaimTask(processorCtx)
+		if err != nil {
+			if !errors.Is(err, ErrNoTasks) && !errors.Is(err, context.Canceled) {
+				p.logger.Error().Err(err).Msg("Could not claim task")
+			}
+			<-p.workers
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
 		go func() {
-			for !p.done.Load() {
-				err := p.process(processorCtx)
-				if err != nil {
-					if errors.Is(err, ErrNoTasks) {
-						// The queue is empty. Wait a second before trying again.
-						time.Sleep(1 * time.Second)
-						continue
-					}
-					p.logger.Error().Err(err).Msg("Could not process task")
-				}
+			if err = p.processTask(processorCtx, t); err != nil {
+				p.logger.Error().Err(err).Msg("Could not process task")
 			}
-			wg.Done()
+			<-p.workers
 		}()
 	}
 
-	wg.Wait()
+	// Wait for workers to finish.
+	for range cap(p.workers) {
+		p.workers <- struct{}{}
+	}
 }

-// process claims a single task and processes it.
-func (p *Processor) process(ctx context.Context) error {
-	return p.client.RunTransaction(ctx, func(tx *sqlx.Tx) error {
-		t, err := p.client.ClaimTask(ctx, tx)
-		if err != nil {
-			return fmt.Errorf("claim task: %w", err)
-		}
-
-		h, ok := p.handlers[t.Type]
-		if !ok {
-			h = func(ctx context.Context, t Task) error {
-				return fmt.Errorf("no handler found for task type %v: %w", t.Type, ErrSkipRetry)
-			}
-		}
-		// Apply global middleware.
-		for i := len(p.middleware) - 1; i >= 0; i-- {
-			h = p.middleware[i](h)
-		}
-
-		if err = callHandler(ctx, h, t); err != nil {
-			if errors.Is(err, context.Canceled) {
-				return fmt.Errorf("task %v canceled: %v", t.ID, context.Cause(ctx))
-			}
-			if errors.Is(err, context.DeadlineExceeded) {
-				// Extract a more specific timeout error, if any.
-				err = context.Cause(ctx)
-			}
-
-			if p.errorHandler != nil {
-				p.errorHandler(ctx, t, err)
-			}
-			if t.Retries < t.MaxRetries && !errors.Is(err, ErrSkipRetry) {
-				retryIn := p.retryPolicy(t)
-				if err := p.client.RetryTask(ctx, tx, t, retryIn); err != nil {
-					return fmt.Errorf("update task %v: %w", t.ID, err)
-				}
-
-				return nil
-			}
-		}
-
-		if err := p.client.DeleteTask(ctx, tx, t); err != nil {
-			return fmt.Errorf("delete task %v: %w", t.ID, err)
-		}
-
-		return nil
-	})
+// processTask processes a single task.
+func (p *Processor) processTask(ctx context.Context, t Task) error {
+	h, ok := p.handlers[t.Type]
+	if !ok {
+		h = func(ctx context.Context, t Task) error {
+			return fmt.Errorf("no handler found for task type %v: %w", t.Type, ErrSkipRetry)
+		}
+	}
+	// Apply global middleware.
+	for i := len(p.middleware) - 1; i >= 0; i-- {
+		h = p.middleware[i](h)
+	}
+
+	if err := callHandler(ctx, h, t); err != nil {
+		if errors.Is(err, context.Canceled) {
+			// The processor is shutting down. Release the task and exit.
+			if err = p.client.ReleaseTask(context.Background(), t); err != nil {
+				return fmt.Errorf("release task %v: %w", t.ID, err)
+			}
+			return fmt.Errorf("task %v canceled: %v", t.ID, context.Cause(ctx))
+		}
+		if errors.Is(err, context.DeadlineExceeded) {
+			// Extract a more specific timeout error, if any.
+			err = context.Cause(ctx)
+		}
+		if p.errorHandler != nil {
+			p.errorHandler(ctx, t, err)
+		}
+
+		if t.Retries < t.MaxRetries && !errors.Is(err, ErrSkipRetry) {
+			retryIn := p.retryPolicy(t)
+			if err := p.client.RetryTask(ctx, t, retryIn); err != nil {
+				return fmt.Errorf("retry task %v: %w", t.ID, err)
+			}
+
+			return nil
+		}
+	}
+
+	if err := p.client.DeleteTask(ctx, t); err != nil {
+		return fmt.Errorf("delete task %v: %w", t.ID, err)
+	}
+
+	return nil
 }
 
 // callHandler calls the given handler, converting panics into errors.
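Two pieces of the reworked processor are worth isolating. First, the workers channel in Run is a buffered-channel semaphore: a slot is acquired before claiming so a claimed task never sits waiting for a free worker, and filling the channel to capacity doubles as the shutdown wait. A standalone sketch of the pattern (range-over-int requires Go 1.22, which the diff itself already uses):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	workers := make(chan struct{}, 3) // capacity = concurrency
	for job := 1; job <= 6; job++ {
		workers <- struct{}{} // acquire: blocks while 3 jobs are in flight
		go func(n int) {
			defer func() { <-workers }() // release the slot when done
			time.Sleep(100 * time.Millisecond)
			fmt.Println("finished job", n)
		}(job)
	}
	// Shutdown: taking every slot waits for all workers to finish.
	for range cap(workers) {
		workers <- struct{}{}
	}
}
```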
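Second, processTask wraps the handler by looping over middleware from last to first, which makes middleware[0] the outermost layer. A compact demonstration of why the reverse loop produces that ordering, with simplified stand-in types:

```go
package main

import "fmt"

// Simplified stand-ins for nanoq's handler and middleware shapes.
type handler func()
type middleware func(handler) handler

func main() {
	h := handler(func() { fmt.Println("handle task") })
	mw := []middleware{
		func(next handler) handler {
			return func() { fmt.Println("first: before"); next(); fmt.Println("first: after") }
		},
		func(next handler) handler {
			return func() { fmt.Println("second: before"); next(); fmt.Println("second: after") }
		},
	}
	// Same loop as processTask: wrapping in reverse makes mw[0] outermost.
	for i := len(mw) - 1; i >= 0; i-- {
		h = mw[i](h)
	}
	h() // first: before, second: before, handle task, second: after, first: after
}
```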
