Skip to content

Commit

Permalink
Replace ETA with delay (#28)
Browse files Browse the repository at this point in the history
Using delays, converting to an ETA, and then back to a duration
effectively floors the delay.
  • Loading branch information
cameron-dunn-sublime authored Jun 7, 2024
1 parent 6a276b9 commit ec43948
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 113 deletions.
12 changes: 6 additions & 6 deletions integration-tests/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ func testPanic(server Server, t *testing.T) {
}

func testDelay(server Server, t *testing.T) {
now := time.Now().UTC()
eta := now.Add(100 * time.Millisecond)
task := newDelayTask(eta)
delay := 100 * time.Millisecond
eta := time.Now().UTC().Add(delay)
task := newDelayTask(delay)
asyncResult, err := server.SendTask(task)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -480,9 +480,9 @@ func newMultipleReturnTask(arg1, arg2 string, fail bool) *tasks.Signature {
}
}

func newDelayTask(eta time.Time) *tasks.Signature {
func newDelayTask(delay time.Duration) *tasks.Signature {
return &tasks.Signature{
Name: "delay_test",
ETA: &eta,
Name: "delay_test",
Delay: delay,
}
}
12 changes: 3 additions & 9 deletions v1/brokers/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,10 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
return fmt.Errorf("JSON marshal error: %s", err)
}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field; if it is set (greater than zero),
// delay the task
if signature.ETA != nil {
now := time.Now().UTC()

if signature.ETA.After(now) {
delayMs := int64(signature.ETA.Sub(now) / time.Millisecond)

return b.delay(signature, delayMs)
}
if signature.Delay > 0 {
return b.delay(signature, signature.Delay.Milliseconds())
}

queue := b.GetConfig().DefaultQueue
Expand Down
22 changes: 8 additions & 14 deletions v1/brokers/azure/storage_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,15 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
messageBody := string(msg)
enqueueOptions := &azqueue.EnqueueMessageOptions{}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field; if it is set (greater than zero),
// and is not a fifo queue, set a delay in seconds for the task.
if signature.ETA != nil {
now := time.Now().UTC()
delay := signature.ETA.Sub(now)
if delay > 0 {
if delay > maxDelay {
log.ERROR.Printf("max visibility timeout exceeded sending %s. defaulting to max.", signature.Name)
delay = maxDelay
}
delaysS := int32(delay.Seconds())
enqueueOptions.VisibilityTimeout = &delaysS
if signature.Delay > 0 {
if signature.Delay > maxDelay {
log.ERROR.Printf("max visibility timeout exceeded sending %s. defaulting to max.", signature.Name)
signature.Delay = maxDelay
}
delaysS := int32(signature.Delay.Seconds())
enqueueOptions.VisibilityTimeout = &delaysS
}

ttlSeconds := int32(b.cfg.TTL.Seconds())
Expand Down Expand Up @@ -173,9 +169,7 @@ func (b *Broker) extend(by time.Duration, signature *tasks.Signature) error {
func (b *Broker) RetryMessage(signature *tasks.Signature) {
b.AdjustRoutingKey(signature)

delay := signature.ETA.Sub(time.Now().UTC())

delayS := int32(delay.Seconds())
delayS := int32(signature.Delay.Seconds())

_, err := b.cfg.Client.NewQueueClient(signature.RoutingKey).UpdateMessage(
context.Background(),
Expand Down
14 changes: 5 additions & 9 deletions v1/brokers/redis/goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,12 @@ func (b *BrokerGR) Publish(ctx context.Context, signature *tasks.Signature) erro
return fmt.Errorf("JSON marshal error: %s", err)
}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field; if it is set (greater than zero),
// delay the task
if signature.ETA != nil {
now := time.Now().UTC()

if signature.ETA.After(now) {
score := signature.ETA.UnixNano()
err = b.rclient.ZAdd(context.Background(), b.redisDelayedTasksKey, redis.Z{Score: float64(score), Member: msg}).Err()
return err
}
if signature.Delay > 0 {
score := time.Now().Add(signature.Delay).UnixNano()
err = b.rclient.ZAdd(context.Background(), b.redisDelayedTasksKey, redis.Z{Score: float64(score), Member: msg}).Err()
return err
}

err = b.rclient.RPush(context.Background(), signature.RoutingKey, msg).Err()
Expand Down
14 changes: 5 additions & 9 deletions v1/brokers/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,12 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
conn := b.open()
defer conn.Close()

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field; if it is set (greater than zero),
// delay the task
if signature.ETA != nil {
now := time.Now().UTC()

if signature.ETA.After(now) {
score := signature.ETA.UnixNano()
_, err = conn.Do("ZADD", b.redisDelayedTasksKey, score, msg)
return err
}
if signature.Delay > 0 {
score := time.Now().Add(signature.Delay).UnixNano()
_, err = conn.Do("ZADD", b.redisDelayedTasksKey, score, msg)
return err
}

_, err = conn.Do("RPUSH", signature.RoutingKey, msg)
Expand Down
21 changes: 8 additions & 13 deletions v1/brokers/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,14 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
MsgInput.MessageGroupId = aws.String(MsgGroupID)
}

// Check the ETA signature field, if it is set and it is in the future,
// Check the delay signature field; if it is set (greater than zero),
// and is not a fifo queue, set a delay in seconds for the task.
if signature.ETA != nil && !strings.HasSuffix(signature.RoutingKey, ".fifo") {
now := time.Now().UTC()
delay := signature.ETA.Sub(now)
if delay > 0 {
if delay > maxAWSSQSDelay {
log.ERROR.Printf("max AWS SQS delay exceeded sending %s. defaulting to max.", signature.Name)
delay = maxAWSSQSDelay
}
MsgInput.DelaySeconds = aws.Int64(int64(delay.Seconds()))
if signature.Delay > 0 && !strings.HasSuffix(signature.RoutingKey, ".fifo") {
if signature.Delay > maxAWSSQSDelay {
log.ERROR.Printf("max AWS SQS delay exceeded sending %s. defaulting to max.", signature.Name)
signature.Delay = maxAWSSQSDelay
}
MsgInput.DelaySeconds = aws.Int64(int64(signature.Delay.Seconds()))
}

result, err := b.service.SendMessageWithContext(ctx, MsgInput)
Expand Down Expand Up @@ -199,13 +195,12 @@ func (b *Broker) extend(by time.Duration, signature *tasks.Signature) error {
func (b *Broker) RetryMessage(signature *tasks.Signature) {
b.AdjustRoutingKey(signature)

delay := signature.ETA.Sub(time.Now().UTC())
delay = restrictVisibilityTimeoutDelay(delay, signature.ReceivedAt)
signature.Delay = restrictVisibilityTimeoutDelay(signature.Delay, signature.ReceivedAt)

visibilityInput := &awssqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(b.GetConfig().Broker + "/" + signature.RoutingKey),
ReceiptHandle: &signature.SQSReceiptHandle,
VisibilityTimeout: aws.Int64(int64(delay.Seconds())),
VisibilityTimeout: aws.Int64(int64(signature.Delay.Seconds())),
}

_, err := b.service.ChangeMessageVisibility(visibilityInput)
Expand Down
4 changes: 2 additions & 2 deletions v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.
signature.Headers = tracing.HeadersWithSpan(signature.Headers, span)
}

if signature.ETA != nil {
span.SetTag("signature.eta", signature.ETA.String())
if signature.Delay > 0 {
span.SetTag("signature.eta", signature.Delay.String())
}

// Make sure result backend is defined
Expand Down
25 changes: 2 additions & 23 deletions v1/tasks/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,8 @@ import (
"time"
)

// ErrRetryTaskLater ...
type ErrRetryTaskLater struct {
name, msg string
retryIn time.Duration
}

// RetryIn returns time.Duration from now when task should be retried
func (e ErrRetryTaskLater) RetryIn() time.Duration {
return e.retryIn
}

// Error implements the error interface
func (e ErrRetryTaskLater) Error() string {
return fmt.Sprintf("Task error: %s Will retry in: %s", e.msg, e.retryIn)
}

// NewErrRetryTaskLater returns new ErrRetryTaskLater instance
func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater {
return ErrRetryTaskLater{msg: msg, retryIn: retryIn}
}

// Retriable is interface that retriable errors should implement
type Retriable interface {
// Retryable is the interface that retryable errors should implement
type Retryable interface {
RetryIn() time.Duration
error
}
Expand Down
2 changes: 1 addition & 1 deletion v1/tasks/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Signature struct {
UUID string
Name string
RoutingKey string
ETA *time.Time
Delay time.Duration
IngestionTime *time.Time
FirstReceived *time.Time
ReceivedAt time.Time
Expand Down
12 changes: 6 additions & 6 deletions v1/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func New(taskFunc interface{}, args []Arg) (*Task, error) {
// Call attempts to call the task with the supplied arguments.
//
// `err` is set in the return value in two cases:
// 1. The reflected function invocation panics (e.g. due to a mismatched
// argument list).
// 2. The task func itself returns a non-nil error.
// 1. The reflected function invocation panics (e.g. due to a mismatched
// argument list).
// 2. The task func itself returns a non-nil error.
func (t *Task) Call() (taskResults []*TaskResult, err error) {
// retrieve the span from the task's context and finish it as soon as this function returns
span := opentracing.SpanFromContext(t.Context)
Expand Down Expand Up @@ -202,16 +202,16 @@ func (t *Task) Call() (taskResults []*TaskResult, err error) {
return nil, ErrLastReturnValueMustBeError
}

_, isRetriable := asError.(Retriable)
_, isRetryable := asError.(Retryable)

if span != nil {
if !isRetriable {
if !isRetryable {
span.LogFields(opentracing_log.Error(asError))
} else {
span.SetTag("warning", asError)
}

span.SetTag("can_retry", isRetriable)
span.SetTag("can_retry", isRetryable)
span.SetTag("did_fail", true)
}

Expand Down
7 changes: 4 additions & 3 deletions v1/tasks/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks_test
import (
"context"
"errors"
"fmt"
"math"
"testing"
"time"
Expand All @@ -17,17 +18,17 @@ func TestTaskCallErrorTest(t *testing.T) {
t.Parallel()

// Create test task that returns a tasks.ErrKeepAndRetryTaskLater error
retriable := func() error { return tasks.NewErrRetryTaskLater("some error", 4*time.Hour) }
retryable := func() error { return tasks.NewErrKeepAndRetryTaskLater(fmt.Errorf("some error"), "", 4*time.Hour) }

task, err := tasks.New(retriable, []tasks.Arg{})
task, err := tasks.New(retryable, []tasks.Arg{})
_, task.Context = opentracing.StartSpanFromContext(context.Background(), "test")
assert.NoError(t, err)

// Invoke Call and validate that the returned error can be cast to tasks.ErrKeepAndRetryTaskLater
results, err := task.Call()
assert.Nil(t, results)
assert.NotNil(t, err)
_, ok := interface{}(err).(tasks.ErrRetryTaskLater)
_, ok := interface{}(err).(tasks.ErrKeepAndRetryTaskLater)
assert.True(t, ok, "Error should be castable to tasks.ErrRetryTaskLater")

// Create test task that returns a standard error
Expand Down
23 changes: 5 additions & 18 deletions v1/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,6 @@ func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.Exten
// Call the task
results, err := task.Call()
if err != nil {
// If a tasks.ErrRetryTaskLater was returned from the task,
// retry the task after specified duration
retryTaskLaterErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
if ok {
if worker.taskRetryErrorHandler != nil {
worker.taskRetryErrorHandler(err, signature)
}
return worker.retryTaskIn(signature, retryTaskLaterErr.RetryIn())
}

keepAndRetryErr, ok := interface{}(err).(tasks.ErrKeepAndRetryTaskLater)
if ok {
if worker.taskRetryErrorHandler != nil {
Expand Down Expand Up @@ -266,8 +256,7 @@ func (worker *Worker) taskRetry(signature *tasks.Signature) error {
signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)

// Delay task by signature.RetryTimeout seconds
eta := time.Now().UTC().Add(time.Second * time.Duration(signature.RetryTimeout))
signature.ETA = &eta
signature.Delay = time.Second * time.Duration(signature.RetryTimeout)

log.INFO.Printf("Task %s failed. Going to retry in %d seconds.", signature.UUID, signature.RetryTimeout)

Expand All @@ -276,16 +265,15 @@ func (worker *Worker) taskRetry(signature *tasks.Signature) error {
return err
}

// taskRetryIn republishes the task to the queue with ETA of now + retryIn.Seconds()
// retryTaskIn republishes the task to the queue with a delay of retryIn
func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Duration) error {
// Update task state to RETRY
if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
return fmt.Errorf("Set state to 'retry' for task %s returned error: %s", signature.UUID, err)
}

// Delay task by retryIn duration
eta := time.Now().UTC().Add(retryIn)
signature.ETA = &eta
signature.Delay = retryIn

// Increase the attempt count, but leave RetryCount alone because it's for the default retry behavior.
signature.AttemptCount++
Expand All @@ -297,16 +285,15 @@ func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Durat
return err
}

// keepAndRetryTaskIn attempts to keep the message on the queue but with a new ETA of now + retryIn.Seconds()
// keepAndRetryTaskIn attempts to keep the message on the queue but with a delay of retryIn
func (worker *Worker) keepAndRetryTaskIn(signature *tasks.Signature, retryIn time.Duration) error {
// Update task state to RETRY
if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
return fmt.Errorf("Set state to 'retry' for task %s returned error: %w", signature.UUID, err)
}

// Delay task by retryIn duration
eta := time.Now().UTC().Add(retryIn)
signature.ETA = &eta
signature.Delay = retryIn

// Increase the attempt count, but leave RetryCount alone because it's for the default retry behavior. This will
// only matter if the broker does not support RetryMessage and falls back to sending a new task.
Expand Down

0 comments on commit ec43948

Please sign in to comment.