-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkgroup.go
171 lines (150 loc) · 4.64 KB
/
workgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Package workgroup provides a mechanism for synchronizing goroutines,
// propagating errors, and context cancellation signals.
// It is designed for managing collections of goroutines working on
// subtasks of a common task. This package is a fork of the
// errgroup.Group library available in `x/sync`, but with modified
// behavior in how it handles goroutine errors and cancellation.
//
// This package offers two different failure modes:
//
// - Collect - All goroutines are allowed to complete, and all errors
// encountered across different goroutines are collected. Wait()
// returns a joined error that combines all errors from the
// individual goroutines.
//
// - FailFast - The first error encountered immediately cancels the
// context of all remaining goroutines and causes Wait() to return
// that error.
//
// `workgroup.Group` also provides options to set a retry policy for
// individual goroutines within the group. A zero-value `Group` will
// collect all errors and return them as a single error.
package workgroup
import (
"context"
"errors"
"sync"
"github.com/avast/retry-go"
)
// FailureMode defines how the workgroup handles errors encountered
// in its goroutines.
type FailureMode int
const (
// Collect instructs the workgroup to collect all errors from
// its goroutines and return them as a single error from `Wait()`.
Collect FailureMode = iota
// FailFast instructs the workgroup to halt execution and cancel
// all remaining goroutines upon the first error encountered.
FailFast
)
// Option is a function that configures a workgroup.
type Option func(*Group)
// WithLimit sets the maximum number of goroutines that can execute
// concurrently within the workgroup.
func WithLimit(n int) Option {
return func(g *Group) {
g.sem = make(chan struct{}, n)
}
}
// WithRetry sets the retry policy for individual goroutines
// within the workgroup.
func WithRetry(opts ...retry.Option) Option {
return func(g *Group) {
g.retryOptions = append(g.retryOptions, opts...)
}
}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero-value `workgroup.Group` is valid and has the following default behavior:
// - No limit on the number of concurrently executing goroutines.
// - Does not cancel on error (uses `Collect` failure mode).
// - Does not retry on error.
type Group struct {
cancel func()
err error
errOnce sync.Once
errLock sync.Mutex
wg sync.WaitGroup
sem chan struct{}
failureMode FailureMode
retryOptions []retry.Option
}
// New creates a new workgroup with the specified failure mode and options.
// It returns a context that is derived from `ctx`.
// The derived context is canceled when the workgroup finishes
// or is canceled explicitly.
// If no Retry is specified, the default behavior is no retries.
func New(ctx context.Context, mode FailureMode, opts ...Option) (context.Context, *Group) {
ctx, cancel := context.WithCancel(ctx)
g := &Group{
cancel: cancel,
failureMode: mode,
retryOptions: []retry.Option{
retry.Attempts(1),
retry.LastErrorOnly(true),
retry.Context(ctx),
},
}
for _, opt := range opts {
opt(g)
}
return ctx, g
}
// Go launches a new goroutine within the workgroup to execute the
// provided function. The function may be retried according to the
// workgroup's retry policy.
// It blocks until the new goroutine can be added without exceeding the
// configured concurrency limit.
func (g *Group) Go(ctx context.Context, fn func() error) {
g.add()
go func() {
defer g.done()
err := retry.Do(fn, g.retryOptions...)
if err != nil {
g.errLock.Lock()
defer g.errLock.Unlock()
if g.failureMode == FailFast {
// In FailFast mode, cancel the workgroup context and
// store the first error encountered.
g.errOnce.Do(func() {
g.err = err
// Signal cancellation to all goroutines.
g.Cancel()
})
return
}
// In Collect mode, aggregate errors from all goroutines.
g.err = errors.Join(g.err, err)
}
}()
}
// Wait blocks until all goroutines in the workgroup have completed.
// It returns nil if all goroutines were successful, or an error
// aggregating the errors encountered, depending on the configured
// failure mode.
func (g *Group) Wait() error {
g.wg.Wait()
// Ensure context is canceled after all goroutines finish.
g.Cancel()
return g.err
}
// Cancel cancels the workgroup context, signaling all running
// goroutines to stop.
func (g *Group) Cancel() {
if g.cancel != nil {
g.cancel()
}
}
func (g *Group) add() {
if g.sem != nil {
g.sem <- struct{}{}
}
g.wg.Add(1)
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}