Skip to content

Commit

Permalink
Allow extending SQS message visibility (#10)
Browse files Browse the repository at this point in the history
* Behavior varies for non-SQS implementations, and may not be totally
correct (fine for us).
* Context object used to plumb through the function.
  • Loading branch information
cameron-dunn-sublime authored Apr 25, 2022
1 parent 612689a commit 2b032e4
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 12 deletions.
9 changes: 7 additions & 2 deletions v1/brokers/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
return fmt.Errorf("Failed delivery of delivery tag: %v", confirmed.DeliveryTag)
}

func (b *Broker) extend(time.Duration, *tasks.Signature) error {
// Unclear if extending is relevant to AMQP.
return fmt.Errorf("AMQP not support extending ack deadline")
}

// consume takes delivered messages from the channel and manages a worker pool
// to process tasks concurrently
func (b *Broker) consume(deliveries <-chan amqp.Delivery, concurrency int, taskProcessor iface.TaskProcessor, amqpCloseChan <-chan *amqp.Error) error {
Expand Down Expand Up @@ -333,7 +338,7 @@ func (b *Broker) consumeOne(delivery amqp.Delivery, taskProcessor iface.TaskProc

log.DEBUG.Printf("Received new message: %s", delivery.Body)

err := taskProcessor.Process(signature)
err := taskProcessor.Process(signature, b.extend)
if ack {
delivery.Ack(multiple)
}
Expand Down Expand Up @@ -439,7 +444,7 @@ type sigDumper struct {
Signatures []*tasks.Signature
}

func (s *sigDumper) Process(sig *tasks.Signature) error {
func (s *sigDumper) Process(sig *tasks.Signature, _ tasks.ExtendForSignatureFunc) error {
s.Signatures = append(s.Signatures, sig)
return nil
}
Expand Down
8 changes: 7 additions & 1 deletion v1/brokers/eager/eager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/RichardKnop/machinery/v1/brokers/iface"
"github.com/RichardKnop/machinery/v1/common"
Expand Down Expand Up @@ -59,7 +60,12 @@ func (eagerBroker *Broker) Publish(ctx context.Context, task *tasks.Signature) e
}

// blocking call to the task directly
return eagerBroker.worker.Process(signature)
return eagerBroker.worker.Process(signature, eagerBroker.extend)
}

func (b *Broker) extend(time.Duration, *tasks.Signature) error {
// Unclear what this behavior should be -- consuming isn't supported.
return fmt.Errorf("eager broker does not support extending")
}

// AssignWorker assigns a worker to the eager broker
Expand Down
8 changes: 7 additions & 1 deletion v1/brokers/gcppubsub/gcp_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
return nil
}

func (b *Broker) extend(time.Duration, *tasks.Signature) error {
// It appears GCP PubSub does behave like SQS with an "ack timeout". However, the signature might
// not contain the necessary ID to extend it, there just isn't motivation to implement at the moment.
return fmt.Errorf("GCP Pub Sub does not support extending ack deadline")
}

// consumeOne processes a single message using TaskProcessor
func (b *Broker) consumeOne(delivery *pubsub.Message, taskProcessor iface.TaskProcessor) {
if len(delivery.Data) == 0 {
Expand All @@ -185,7 +191,7 @@ func (b *Broker) consumeOne(delivery *pubsub.Message, taskProcessor iface.TaskPr
log.ERROR.Printf("task %s is not registered", sig.Name)
}

err := taskProcessor.Process(sig)
err := taskProcessor.Process(sig, b.extend)
if err != nil {
delivery.Nack()
log.ERROR.Printf("Failed process of task", err)
Expand Down
2 changes: 1 addition & 1 deletion v1/brokers/iface/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Broker interface {
// TaskProcessor - can process a delivered task
// This will probably always be a worker instance
type TaskProcessor interface {
Process(signature *tasks.Signature) error
Process(signature *tasks.Signature, extendFunc tasks.ExtendForSignatureFunc) error
CustomQueue() string
PreConsumeHandler() bool
}
8 changes: 7 additions & 1 deletion v1/brokers/redis/goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ func (b *BrokerGR) Publish(ctx context.Context, signature *tasks.Signature) erro
return err
}

func (b *BrokerGR) extend(time.Duration, *tasks.Signature) error {
// Tasks don't need to be extended with redis because messages won't become available again if they're not
// acked/deleted.
return nil
}

// GetPendingTasks returns a slice of task signatures waiting in the queue
func (b *BrokerGR) GetPendingTasks(queue string) ([]*tasks.Signature, error) {

Expand Down Expand Up @@ -314,7 +320,7 @@ func (b *BrokerGR) consumeOne(delivery []byte, taskProcessor iface.TaskProcessor

log.DEBUG.Printf("Received new message: %s", delivery)

return taskProcessor.Process(signature)
return taskProcessor.Process(signature, b.extend)
}

// nextTask pops next available task from the default queue
Expand Down
8 changes: 7 additions & 1 deletion v1/brokers/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
return err
}

func (b *Broker) extend(time.Duration, *tasks.Signature) error {
// Tasks don't need to be extended with redis because messages won't become available again if they're not
// acked/deleted.
return nil
}

// GetPendingTasks returns a slice of task signatures waiting in the queue
func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
conn := b.open()
Expand Down Expand Up @@ -346,7 +352,7 @@ func (b *Broker) consumeOne(delivery []byte, taskProcessor iface.TaskProcessor)

log.DEBUG.Printf("Received new message: %s", delivery)

return taskProcessor.Process(signature)
return taskProcessor.Process(signature, b.extend)
}

// nextTask pops next available task from the default queue
Expand Down
15 changes: 14 additions & 1 deletion v1/brokers/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,19 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error

}

func (b *Broker) extend(by time.Duration, signature *tasks.Signature) error {
b.AdjustRoutingKey(signature)

visibilityInput := &awssqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(b.GetConfig().Broker + "/" + signature.RoutingKey),
ReceiptHandle: &signature.SQSReceiptHandle,
VisibilityTimeout: aws.Int64(int64(by.Seconds())),
}

_, err := b.service.ChangeMessageVisibility(visibilityInput)
return err
}

// consume is a method which keeps consuming deliveries from a channel, until there is an error or a stop signal
func (b *Broker) consume(deliveries <-chan *awssqs.ReceiveMessageOutput, concurrency int, taskProcessor iface.TaskProcessor, pool chan struct{}) error {

Expand Down Expand Up @@ -237,7 +250,7 @@ func (b *Broker) consumeOne(delivery *awssqs.ReceiveMessageOutput, taskProcessor
return fmt.Errorf("task %s is not registered", sig.Name)
}

err := taskProcessor.Process(sig)
err := taskProcessor.Process(sig, b.extend)
if err != nil {
// stop task deletion in case we want to send messages to dlq in sqs
if err == errs.ErrStopTaskDeletion {
Expand Down
29 changes: 28 additions & 1 deletion v1/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@ type Task struct {
Args []reflect.Value
}

type ExtendForSignatureFunc func(by time.Duration, signature *Signature) error

type ExtendFunc func(time.Duration) error

type signatureCtxType struct{}
type extendFuncCtxType struct{}

var signatureCtx signatureCtxType
var extendFuncCtx extendFuncCtxType

// SignatureFromContext gets the signature from the context
func SignatureFromContext(ctx context.Context) *Signature {
Expand All @@ -46,11 +52,32 @@ func SignatureFromContext(ctx context.Context) *Signature {
return signature
}

// ExtendFuncFromContext gets the extend function from the context
func ExtendFuncFromContext(ctx context.Context) ExtendFunc {
if ctx == nil {
return nil
}

v := ctx.Value(extendFuncCtx)
if v == nil {
return nil
}

ef, _ := v.(ExtendFunc)
return ef
}

// NewWithSignature is the same as New but injects the signature
func NewWithSignature(taskFunc interface{}, signature *Signature) (*Task, error) {
func NewWithSignature(taskFunc interface{}, signature *Signature, extendFunc ExtendForSignatureFunc) (*Task, error) {
args := signature.Args
ctx := context.Background()
ctx = context.WithValue(ctx, signatureCtx, signature)

var ef ExtendFunc = func(by time.Duration) error {
return extendFunc(by, signature)
}
ctx = context.WithValue(ctx, extendFuncCtx, ef)

task := &Task{
TaskFunc: reflect.ValueOf(taskFunc),
Context: ctx,
Expand Down
2 changes: 1 addition & 1 deletion v1/tasks/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestTaskCallWithSignatureInContext(t *testing.T) {
}
signature, err := tasks.NewSignature("foo", []tasks.Arg{})
assert.NoError(t, err)
task, err := tasks.NewWithSignature(f, signature)
task, err := tasks.NewWithSignature(f, signature, func(time.Duration, *tasks.Signature) error { return nil })
assert.NoError(t, err)
taskResults, err := task.Call()
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions v1/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (worker *Worker) Quit() {
}

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {
func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.ExtendForSignatureFunc) error {
// If the task is not registered with this worker, do not continue
// but only return nil as we do not want to restart the worker process
if !worker.server.IsTaskRegistered(signature.Name) {
Expand All @@ -151,7 +151,7 @@ func (worker *Worker) Process(signature *tasks.Signature) error {
}

// Prepare task for processing
task, err := tasks.NewWithSignature(taskFunc, signature)
task, err := tasks.NewWithSignature(taskFunc, signature, extendFunc)
// if this failed, it means the task is malformed, probably has invalid
// signature, go directly to task failed without checking whether to retry
if err != nil {
Expand Down

0 comments on commit 2b032e4

Please sign in to comment.