From c57e54518bbd7284535484330c9a86a3a514546b Mon Sep 17 00:00:00 2001 From: Alexandre Perrin Date: Tue, 7 Feb 2023 10:11:07 +0100 Subject: [PATCH] add NewWithContext Before this patch, forwarding context cancellation to the workerpool tasks was difficult and required to spawn a goroutine to call wp.Close when the parent context was done. This patch introduce NewWithContext allowing callers to provide a parent context from which is derived the tasks context, removing the need to manually forward context cancellation. Signed-off-by: Alexandre Perrin --- workerpool.go | 8 ++++++- workerpool_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/workerpool.go b/workerpool.go index ce8662f..c78af83 100644 --- a/workerpool.go +++ b/workerpool.go @@ -50,6 +50,12 @@ type WorkerPool struct { // New creates a new pool of workers where at most n workers process submitted // tasks concurrently. New panics if n ≤ 0. func New(n int) *WorkerPool { + return NewWithContext(context.Background(), n) +} + +// NewWithContext creates a new pool of workers where at most n workers process submitted +// tasks concurrently. New panics if n ≤ 0. The context is used as the parent context to the context of the task func passed to Submit. +func NewWithContext(ctx context.Context, n int) *WorkerPool { if n <= 0 { panic(fmt.Sprintf("workerpool.New: n must be > 0, got %d", n)) } @@ -57,7 +63,7 @@ func New(n int) *WorkerPool { workers: make(chan struct{}, n), tasks: make(chan *task), } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) wp.cancel = cancel go wp.run(ctx) return wp diff --git a/workerpool_test.go b/workerpool_test.go index 84b647b..652c272 100644 --- a/workerpool_test.go +++ b/workerpool_test.go @@ -402,3 +402,60 @@ func TestWorkerPoolClose(t *testing.T) { } wg.Wait() // all routines should have returned } + +func TestWorkerPoolNewWithContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + n := runtime.NumCPU() + wp := workerpool.NewWithContext(ctx, n) + + // working is written to by each task as soon as possible. + working := make(chan struct{}) + var wg sync.WaitGroup + // Create n tasks waiting on the context to be cancelled. + wg.Add(n) + for i := 0; i < n; i++ { + id := fmt.Sprintf("task #%2d", i) + err := wp.Submit(id, func(ctx context.Context) error { + working <- struct{}{} + <-ctx.Done() + wg.Done() + return ctx.Err() + }) + if err != nil { + t.Errorf("failed to submit task '%s': %v", id, err) + } + } + + // ensure n workers are busy + for i := 0; i < n; i++ { + <-working + } + + // cancel the parent context, which should complete all tasks. + cancel() + wg.Wait() + + // Submitting a task once the parent context has been cancelled should + // succeed and give a cancelled context to the task. This is not ideal and + // might change in the future. + wg.Add(1) + id := "last" + err := wp.Submit(id, func(ctx context.Context) error { + defer wg.Done() + select { + case <-ctx.Done(): + default: + t.Errorf("last task expected context to be cancelled") + } + return nil + }) + if err != nil { + t.Errorf("failed to submit task '%s': %v", id, err) + } + + wg.Wait() + + if err := wp.Close(); err != nil { + t.Errorf("close: got '%v', want no error", err) + } +}