Skip to content

Commit

Permalink
Fix bug where tasks weren't deleted after exhausted retries / SkipRetry.
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanz committed Mar 21, 2024
1 parent a094680 commit e7d8c38
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
4 changes: 2 additions & 2 deletions nanoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ func (p *Processor) process(ctx context.Context) error {
if err := p.client.UpdateTask(ctx, tx, t); err != nil {
return fmt.Errorf("update task %v: %w", t.ID, err)
}
}

return nil
return nil
}
}

if err := p.client.DeleteTask(ctx, tx, t); err != nil {
Expand Down
90 changes: 90 additions & 0 deletions nanoq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nanoq_test
import (
"context"
"errors"
"fmt"
"slices"
"testing"
"time"
Expand Down Expand Up @@ -184,3 +185,92 @@ func TestProcessor_Run(t *testing.T) {
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1)
}
}

func TestProcessor_Run_RetriesExhausted(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
return errors.New("temporary error")
})
errorHandlerCalled := 0
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
errorHandlerCalled++
})

// First task claim and retry.
mock.ExpectBegin()
rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now())
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)

mock.ExpectExec("UPDATE tasks SET retries = (.+), scheduled_at = (.+) WHERE id = (.+)").WithArgs(1, sqlmock.AnyArg(), "01HQJHTZCAT5WDCGVTWJ640VMM").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

// Second task claim and deletion (due to exhausted retries).
mock.ExpectBegin()
rows = sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "1", "1", time.Now(), time.Now())
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)

mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

ctx, cancel := context.WithCancel(context.Background())
go processor.Run(ctx, 1, 1*time.Second)
time.Sleep(1 * time.Second)
cancel()

err := mock.ExpectationsWereMet()
if err != nil {
t.Error(err)
}

if errorHandlerCalled != 2 {
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 2)
}
}

func TestProcessor_Run_SkipRetry(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
return fmt.Errorf("something terrible happened: %w", nanoq.ErrSkipRetry)
})
errorHandlerCalled := 0
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
if !errors.Is(err, nanoq.ErrSkipRetry) {
t.Errorf("error handler called with unexpected error: %v", err)
}
errorHandlerCalled++
})

// Task claim and deletion.
mock.ExpectBegin()
rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now())
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)

mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

ctx, cancel := context.WithCancel(context.Background())
go processor.Run(ctx, 1, 1*time.Second)
time.Sleep(1 * time.Second)
cancel()

err := mock.ExpectationsWereMet()
if err != nil {
t.Error(err)
}

if errorHandlerCalled != 1 {
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1)
}
}

0 comments on commit e7d8c38

Please sign in to comment.