Skip to content

Commit

Permalink
Convert handler panics into non-retryable errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
bojanz committed Apr 8, 2024
1 parent bbae57e commit 4b3746d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
26 changes: 25 additions & 1 deletion nanoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"math/rand/v2"
"os"
"os/signal"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -335,7 +337,7 @@ func (p *Processor) process(ctx context.Context) error {
h = p.middleware[i](h)
}

if err = h(ctx, t); err != nil {
if err = callHandler(ctx, h, t); err != nil {
if errors.Is(err, context.Canceled) {
return fmt.Errorf("task %v canceled: %v", t.ID, context.Cause(ctx))
}
Expand Down Expand Up @@ -363,6 +365,28 @@ func (p *Processor) process(ctx context.Context) error {
})
}

// callHandler calls the given handler, converting panics into errors.
func callHandler(ctx context.Context, h Handler, t Task) (err error) {
defer func() {
if r := recover(); r != nil {
var file string
var line int
// Skip the first two frames (callHandler, panic).
// If the panic came from the runtime, find the first application frame.
for i := 2; i < 10; i++ {
_, file, line, _ = runtime.Caller(i)
if !strings.HasPrefix(file, "runtime/") {
break
}
}

err = fmt.Errorf("panic [%s:%d]: %v: %w", file, line, r, ErrSkipRetry)
}
}()

return h(ctx, t)
}

// getNextRetryTime returns the time of the next retry.
// Uses an exponential base delay with jitter.
// Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h.
Expand Down
44 changes: 43 additions & 1 deletion nanoq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"slices"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -244,7 +245,48 @@ func TestProcessor_Run_SkipRetry(t *testing.T) {
})
errorHandlerCalled := 0
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
if !errors.Is(err, nanoq.ErrSkipRetry) {
if !errors.Is(err, nanoq.ErrSkipRetry) && !strings.Contains("something terrible happened", err.Error()) {
t.Errorf("error handler called with unexpected error: %v", err)
}
errorHandlerCalled++
})

// Task claim and deletion.
mock.ExpectBegin()
rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now())
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)

mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()

ctx, cancel := context.WithCancel(context.Background())
go processor.Run(ctx, 1, 1*time.Second)
time.Sleep(1 * time.Second)
cancel()

err := mock.ExpectationsWereMet()
if err != nil {
t.Error(err)
}

if errorHandlerCalled != 1 {
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1)
}
}

func TestProcessor_Run_Panic(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
processor := nanoq.NewProcessor(client, zerolog.Nop())
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
panic(errors.New("oh no"))
})
errorHandlerCalled := 0
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
if !errors.Is(err, nanoq.ErrSkipRetry) && !strings.Contains("oh no", err.Error()) {
t.Errorf("error handler called with unexpected error: %v", err)
}
errorHandlerCalled++
Expand Down

0 comments on commit 4b3746d

Please sign in to comment.