diff --git a/v1/brokers/amqp/amqp.go b/v1/brokers/amqp/amqp.go index 5edcb81c..37768f5b 100644 --- a/v1/brokers/amqp/amqp.go +++ b/v1/brokers/amqp/amqp.go @@ -249,6 +249,11 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error return fmt.Errorf("Failed delivery of delivery tag: %v", confirmed.DeliveryTag) } +func (b *Broker) extend(time.Duration, *tasks.Signature) error { + // Unclear if extending is relevant to AMQP. + return fmt.Errorf("AMQP not support extending ack deadline") +} + // consume takes delivered messages from the channel and manages a worker pool // to process tasks concurrently func (b *Broker) consume(deliveries <-chan amqp.Delivery, concurrency int, taskProcessor iface.TaskProcessor, amqpCloseChan <-chan *amqp.Error) error { @@ -333,7 +338,7 @@ func (b *Broker) consumeOne(delivery amqp.Delivery, taskProcessor iface.TaskProc log.DEBUG.Printf("Received new message: %s", delivery.Body) - err := taskProcessor.Process(signature) + err := taskProcessor.Process(signature, b.extend) if ack { delivery.Ack(multiple) } @@ -439,7 +444,7 @@ type sigDumper struct { Signatures []*tasks.Signature } -func (s *sigDumper) Process(sig *tasks.Signature) error { +func (s *sigDumper) Process(sig *tasks.Signature, _ tasks.ExtendForSignatureFunc) error { s.Signatures = append(s.Signatures, sig) return nil } diff --git a/v1/brokers/eager/eager.go b/v1/brokers/eager/eager.go index 3726b70d..9c0cb9f1 100644 --- a/v1/brokers/eager/eager.go +++ b/v1/brokers/eager/eager.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "time" "github.com/RichardKnop/machinery/v1/brokers/iface" "github.com/RichardKnop/machinery/v1/common" @@ -59,7 +60,12 @@ func (eagerBroker *Broker) Publish(ctx context.Context, task *tasks.Signature) e } // blocking call to the task directly - return eagerBroker.worker.Process(signature) + return eagerBroker.worker.Process(signature, eagerBroker.extend) +} + +func (b *Broker) extend(time.Duration, *tasks.Signature) error { + // Unclear what this behavior should be -- consuming isn't supported. + return fmt.Errorf("eager broker does not support extending") } // AssignWorker assigns a worker to the eager broker diff --git a/v1/brokers/gcppubsub/gcp_pubsub.go b/v1/brokers/gcppubsub/gcp_pubsub.go index 5e374a9b..bc479448 100644 --- a/v1/brokers/gcppubsub/gcp_pubsub.go +++ b/v1/brokers/gcppubsub/gcp_pubsub.go @@ -163,6 +163,12 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error return nil } +func (b *Broker) extend(time.Duration, *tasks.Signature) error { + // It appears GCP PubSub does behave like SQS with an "ack timeout". However, the signature might + // not contain the necessary ID to extend it, there just isn't motivation to implement at the moment. + return fmt.Errorf("GCP Pub Sub does not support extending ack deadline") +} + // consumeOne processes a single message using TaskProcessor func (b *Broker) consumeOne(delivery *pubsub.Message, taskProcessor iface.TaskProcessor) { if len(delivery.Data) == 0 { @@ -185,7 +191,7 @@ func (b *Broker) consumeOne(delivery *pubsub.Message, taskProcessor iface.TaskPr log.ERROR.Printf("task %s is not registered", sig.Name) } - err := taskProcessor.Process(sig) + err := taskProcessor.Process(sig, b.extend) if err != nil { delivery.Nack() log.ERROR.Printf("Failed process of task", err) diff --git a/v1/brokers/iface/interfaces.go b/v1/brokers/iface/interfaces.go index c15e8f72..6fe38c8d 100644 --- a/v1/brokers/iface/interfaces.go +++ b/v1/brokers/iface/interfaces.go @@ -23,7 +23,7 @@ type Broker interface { // TaskProcessor - can process a delivered task // This will probably always be a worker instance type TaskProcessor interface { - Process(signature *tasks.Signature) error + Process(signature *tasks.Signature, extendFunc tasks.ExtendForSignatureFunc) error CustomQueue() string PreConsumeHandler() bool } diff --git a/v1/brokers/redis/goredis.go b/v1/brokers/redis/goredis.go index feed029d..3d0de36f 100644 --- a/v1/brokers/redis/goredis.go +++ b/v1/brokers/redis/goredis.go @@ -204,6 +204,12 @@ func (b *BrokerGR) Publish(ctx context.Context, signature *tasks.Signature) erro return err } +func (b *BrokerGR) extend(time.Duration, *tasks.Signature) error { + // Tasks don't need to be extended with redis because messages won't become available again if they're not + // acked/deleted. + return nil +} + // GetPendingTasks returns a slice of task signatures waiting in the queue func (b *BrokerGR) GetPendingTasks(queue string) ([]*tasks.Signature, error) { @@ -314,7 +320,7 @@ func (b *BrokerGR) consumeOne(delivery []byte, taskProcessor iface.TaskProcessor log.DEBUG.Printf("Received new message: %s", delivery) - return taskProcessor.Process(signature) + return taskProcessor.Process(signature, b.extend) } // nextTask pops next available task from the default queue diff --git a/v1/brokers/redis/redis.go b/v1/brokers/redis/redis.go index 1683f13e..f2ff570d 100644 --- a/v1/brokers/redis/redis.go +++ b/v1/brokers/redis/redis.go @@ -216,6 +216,12 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error return err } +func (b *Broker) extend(time.Duration, *tasks.Signature) error { + // Tasks don't need to be extended with redis because messages won't become available again if they're not + // acked/deleted. + return nil +} + // GetPendingTasks returns a slice of task signatures waiting in the queue func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) { conn := b.open() @@ -346,7 +352,7 @@ func (b *Broker) consumeOne(delivery []byte, taskProcessor iface.TaskProcessor) log.DEBUG.Printf("Received new message: %s", delivery) - return taskProcessor.Process(signature) + return taskProcessor.Process(signature, b.extend) } // nextTask pops next available task from the default queue diff --git a/v1/brokers/sqs/sqs.go b/v1/brokers/sqs/sqs.go index 51e0ce3f..fda19ba6 100644 --- a/v1/brokers/sqs/sqs.go +++ b/v1/brokers/sqs/sqs.go @@ -182,6 +182,19 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error } +func (b *Broker) extend(by time.Duration, signature *tasks.Signature) error { + b.AdjustRoutingKey(signature) + + visibilityInput := &awssqs.ChangeMessageVisibilityInput{ + QueueUrl: aws.String(b.GetConfig().Broker + "/" + signature.RoutingKey), + ReceiptHandle: &signature.SQSReceiptHandle, + VisibilityTimeout: aws.Int64(int64(by.Seconds())), + } + + _, err := b.service.ChangeMessageVisibility(visibilityInput) + return err +} + // consume is a method which keeps consuming deliveries from a channel, until there is an error or a stop signal func (b *Broker) consume(deliveries <-chan *awssqs.ReceiveMessageOutput, concurrency int, taskProcessor iface.TaskProcessor, pool chan struct{}) error { @@ -237,7 +250,7 @@ func (b *Broker) consumeOne(delivery *awssqs.ReceiveMessageOutput, taskProcessor return fmt.Errorf("task %s is not registered", sig.Name) } - err := taskProcessor.Process(sig) + err := taskProcessor.Process(sig, b.extend) if err != nil { // stop task deletion in case we want to send messages to dlq in sqs if err == errs.ErrStopTaskDeletion { diff --git a/v1/tasks/task.go b/v1/tasks/task.go index cae3a9e5..d59cd6fe 100644 --- a/v1/tasks/task.go +++ b/v1/tasks/task.go @@ -27,9 +27,15 @@ type Task struct { Args []reflect.Value } +type ExtendForSignatureFunc func(by time.Duration, signature *Signature) error + +type ExtendFunc func(time.Duration) error + type signatureCtxType struct{} +type extendFuncCtxType struct{} var signatureCtx signatureCtxType +var extendFuncCtx extendFuncCtxType // SignatureFromContext gets the signature from the context func SignatureFromContext(ctx context.Context) *Signature { @@ -46,11 +52,32 @@ func SignatureFromContext(ctx context.Context) *Signature { return signature } +// ExtendFuncFromContext gets the extend function from the context +func ExtendFuncFromContext(ctx context.Context) ExtendFunc { + if ctx == nil { + return nil + } + + v := ctx.Value(extendFuncCtx) + if v == nil { + return nil + } + + ef, _ := v.(ExtendFunc) + return ef +} + // NewWithSignature is the same as New but injects the signature -func NewWithSignature(taskFunc interface{}, signature *Signature) (*Task, error) { +func NewWithSignature(taskFunc interface{}, signature *Signature, extendFunc ExtendForSignatureFunc) (*Task, error) { args := signature.Args ctx := context.Background() ctx = context.WithValue(ctx, signatureCtx, signature) + + var ef ExtendFunc = func(by time.Duration) error { + return extendFunc(by, signature) + } + ctx = context.WithValue(ctx, extendFuncCtx, ef) + task := &Task{ TaskFunc: reflect.ValueOf(taskFunc), Context: ctx, diff --git a/v1/tasks/task_test.go b/v1/tasks/task_test.go index 1fde5ea7..9d90ba8a 100644 --- a/v1/tasks/task_test.go +++ b/v1/tasks/task_test.go @@ -120,7 +120,7 @@ func TestTaskCallWithSignatureInContext(t *testing.T) { } signature, err := tasks.NewSignature("foo", []tasks.Arg{}) assert.NoError(t, err) - task, err := tasks.NewWithSignature(f, signature) + task, err := tasks.NewWithSignature(f, signature, func(time.Duration, *tasks.Signature) error { return nil }) assert.NoError(t, err) taskResults, err := task.Call() assert.NoError(t, err) diff --git a/v1/worker.go b/v1/worker.go index e914f76d..5bb0cf91 100644 --- a/v1/worker.go +++ b/v1/worker.go @@ -133,7 +133,7 @@ func (worker *Worker) Quit() { } // Process handles received tasks and triggers success/error callbacks -func (worker *Worker) Process(signature *tasks.Signature) error { +func (worker *Worker) Process(signature *tasks.Signature, extendFunc tasks.ExtendForSignatureFunc) error { // If the task is not registered with this worker, do not continue // but only return nil as we do not want to restart the worker process if !worker.server.IsTaskRegistered(signature.Name) { @@ -151,7 +151,7 @@ func (worker *Worker) Process(signature *tasks.Signature) error { } // Prepare task for processing - task, err := tasks.NewWithSignature(taskFunc, signature) + task, err := tasks.NewWithSignature(taskFunc, signature, extendFunc) // if this failed, it means the task is malformed, probably has invalid // signature, go directly to task failed without checking whether to retry if err != nil {