diff --git a/nanoq.go b/nanoq.go index 4ba5a4a..3e65a60 100644 --- a/nanoq.go +++ b/nanoq.go @@ -350,9 +350,9 @@ func (p *Processor) process(ctx context.Context) error { if err := p.client.UpdateTask(ctx, tx, t); err != nil { return fmt.Errorf("update task %v: %w", t.ID, err) } - } - return nil + return nil + } } if err := p.client.DeleteTask(ctx, tx, t); err != nil { diff --git a/nanoq_test.go b/nanoq_test.go index a0e29e5..cf9fc43 100644 --- a/nanoq_test.go +++ b/nanoq_test.go @@ -3,6 +3,7 @@ package nanoq_test import ( "context" "errors" + "fmt" "slices" "testing" "time" @@ -184,3 +185,92 @@ func TestProcessor_Run(t *testing.T) { t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1) } } + +func TestProcessor_Run_RetriesExhausted(t *testing.T) { + db, mock, _ := sqlmock.New() + defer db.Close() + client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) + processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { + return errors.New("temporary error") + }) + errorHandlerCalled := 0 + processor.OnError(func(ctx context.Context, task nanoq.Task, err error) { + errorHandlerCalled++ + }) + + // First task claim and retry. + mock.ExpectBegin() + rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}). + AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now()) + mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows) + + mock.ExpectExec("UPDATE tasks SET retries = (.+), scheduled_at = (.+) WHERE id = (.+)").WithArgs(1, sqlmock.AnyArg(), "01HQJHTZCAT5WDCGVTWJ640VMM"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + // Second task claim and deletion (due to exhausted retries). + mock.ExpectBegin() + rows = sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}). + AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "1", "1", time.Now(), time.Now()) + mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows) + + mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + ctx, cancel := context.WithCancel(context.Background()) + go processor.Run(ctx, 1, 1*time.Second) + time.Sleep(1 * time.Second) + cancel() + + err := mock.ExpectationsWereMet() + if err != nil { + t.Error(err) + } + + if errorHandlerCalled != 2 { + t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 2) + } +} + +func TestProcessor_Run_SkipRetry(t *testing.T) { + db, mock, _ := sqlmock.New() + defer db.Close() + client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock")) + processor := nanoq.NewProcessor(client, zerolog.Nop()) + processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error { + return fmt.Errorf("something terrible happened: %w", nanoq.ErrSkipRetry) + }) + errorHandlerCalled := 0 + processor.OnError(func(ctx context.Context, task nanoq.Task, err error) { + if !errors.Is(err, nanoq.ErrSkipRetry) { + t.Errorf("error handler called with unexpected error: %v", err) + } + errorHandlerCalled++ + }) + + // Task claim and deletion. + mock.ExpectBegin() + rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}). + AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now()) + mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows) + + mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + ctx, cancel := context.WithCancel(context.Background()) + go processor.Run(ctx, 1, 1*time.Second) + time.Sleep(1 * time.Second) + cancel() + + err := mock.ExpectationsWereMet() + if err != nil { + t.Error(err) + } + + if errorHandlerCalled != 1 { + t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1) + } +}