-
Notifications
You must be signed in to change notification settings - Fork 46
/
Copy pathstoppable_workers_test.go
165 lines (144 loc) · 4.01 KB
/
stoppable_workers_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package utils
import (
"bytes"
"context"
"testing"
"time"
"go.viam.com/test"
)
// TestStoppableWorkers runs a series of subtests that create stoppable workers
// through NewStoppableWorkers or NewBackgroundStoppableWorkers, add different
// kinds of worker functions to them, and then call Stop. Most subtests contain
// no assertion of their own; per the note below they rely on the package-level
// goleak verification to catch leaked goroutines.
func TestStoppableWorkers(t *testing.T) {
	// Goleak checks from `VerifyTestMain` for `utils_test` should cause the
	// following tests to fail if `StoppableWorkers` leaks any goroutines.
	ctx := context.Background()

	t.Run("one worker", func(t *testing.T) {
		sw := NewStoppableWorkers(ctx)
		sw.Add(normalWorker)
		sw.Stop()

		// After Stop, the exposed context must exist and report cancellation.
		ctx := sw.Context()
		test.That(t, ctx, test.ShouldNotBeNil)
		test.That(t, ctx.Err(), test.ShouldBeError, context.Canceled)
	})

	t.Run("one worker background constructor", func(t *testing.T) {
		sw := NewBackgroundStoppableWorkers(normalWorker)
		sw.Stop()

		// Same expectations as "one worker", but via the background constructor.
		ctx := sw.Context()
		test.That(t, ctx, test.ShouldNotBeNil)
		test.That(t, ctx.Err(), test.ShouldBeError, context.Canceled)
	})

	t.Run("heavy workers", func(t *testing.T) {
		sw := NewStoppableWorkers(ctx)
		sw.Add(heavyWorker)
		sw.Add(heavyWorker)
		sw.Add(heavyWorker)

		// Sleep for half a second to let heavy workers do work.
		time.Sleep(500 * time.Millisecond)
		sw.Stop()
	})

	t.Run("concurrent workers", func(t *testing.T) {
		ints := make(chan int)

		// writeWorker wakes up every 100ms and offers the next counter value
		// on `ints` until its context is canceled.
		writeWorker := func(ctx context.Context) {
			var count int
			for {
				select {
				case <-ctx.Done():
					return
				case <-time.After(100 * time.Millisecond):
					// Remain sensitive to context in sending to `ints` channel, so this
					// goroutine does not get hung when `readWorker` exits without
					// reading the last int.
					select {
					case <-ctx.Done():
					case ints <- count:
						// Advance only after a successful send so the reader
						// sees a distinct value each time instead of a
						// constant zero.
						count++
					}
				}
			}
		}

		// readWorker collects everything sent on `ints` until its context is
		// canceled.
		var receivedInts []int
		readWorker := func(ctx context.Context) {
			for {
				select {
				case <-ctx.Done():
					return
				case i := <-ints:
					receivedInts = append(receivedInts, i)
				}
			}
		}

		sw := NewBackgroundStoppableWorkers(writeWorker, readWorker)

		// Sleep for half a second to let concurrent workers do work.
		time.Sleep(500 * time.Millisecond)
		sw.Stop()

		// NOTE(review): reading `receivedInts` here assumes Stop does not return
		// until readWorker has exited; confirm against the implementation.
		test.That(t, len(receivedInts), test.ShouldBeGreaterThan, 0)
	})

	t.Run("nested workers", func(t *testing.T) {
		sw := NewStoppableWorkers(ctx)
		sw.Add(nestedWorkersWorker)
		sw.Stop()
	})

	t.Run("panicking worker", func(t *testing.T) {
		sw := NewStoppableWorkers(ctx)
		// Both adding and stopping a panicking worker should cause no `panic`s.
		sw.Add(panickingWorker)
		sw.Stop()
	})

	t.Run("already stopped", func(t *testing.T) {
		sw := NewStoppableWorkers(ctx)
		sw.Stop()
		sw.Add(normalWorker) // adding after Stop should cause no `panic`
		sw.Stop()            // stopping twice should cause no `panic`
	})
}
// TestStoppableWorkersWithTicker checks that a ticker-driven worker whose work
// function blocks is invoked exactly once during the test and that the context
// handed to the work function is canceled when Stop is called.
func TestStoppableWorkersWithTicker(t *testing.T) {
	timesCalled := 0
	workFn := func(ctx context.Context) {
		timesCalled++

		// Use an explicit timer so it can be released as soon as the work
		// function returns instead of lingering for the full 24 hours.
		guard := time.NewTimer(24 * time.Hour)
		defer guard.Stop()

		select {
		case <-guard.C:
			// Realistically, the go test timeout will be hit and not this branch.
			// This function blocks while the test proceeds to call `Stop`, so it
			// cannot be running on the test goroutine; `t.FailNow` is only legal
			// there, so mark the failure with `t.Error` and return instead.
			t.Error("Failed to observe `Stop` call.")
		case <-ctx.Done():
			return
		}
	}

	// Create a worker with a ticker << the sleep time the test will use. The work function
	// increments a counter and hangs. This test will logically assert that:
	// - The work function was called exactly once.
	// - The work function was passed a context that observed `Stop` was called.
	sw := NewStoppableWorkerWithTicker(time.Millisecond, workFn)
	time.Sleep(time.Second)
	sw.Stop()

	// NOTE(review): reading `timesCalled` here assumes Stop does not return
	// until the work function has exited; confirm against the implementation.
	test.That(t, timesCalled, test.ShouldEqual, 1)
}
// normalWorker is a minimal worker function: it idles in 10ms steps and
// returns once the supplied context is done.
func normalWorker(ctx context.Context) {
	const interval = 10 * time.Millisecond

	done := ctx.Done()
	for {
		select {
		case <-time.After(interval):
			// Nothing to do on a wake-up; go around and wait again.
			continue
		case <-done:
			return
		}
	}
}
// heavyWorker is like `normalWorker`, but on every 10ms wake-up it also writes
// 10000 zero bytes into a fresh bytes.Buffer and reads them back out. It
// returns once the supplied context is done.
func heavyWorker(ctx context.Context) {
	const (
		interval    = 10 * time.Millisecond
		payloadSize = 10000
	)

	for {
		select {
		case <-time.After(interval):
			// The work only exists to generate load, so the results of the
			// write and read are deliberately discarded.
			var scratch bytes.Buffer
			_, _ = scratch.Write(make([]byte, payloadSize))
			drained := make([]byte, scratch.Len())
			_, _ = scratch.Read(drained)
		case <-ctx.Done():
			return
		}
	}
}
// nestedWorkersWorker is a worker that itself builds a second set of workers
// via NewStoppableWorkers from the context it was handed, adds `normalWorker`
// to that set, and then runs `normalWorker` inline on the same context until
// it returns. It never calls Stop on the inner set.
// NOTE(review): presumably the inner worker is expected to exit through
// cancellation of the passed-in context; confirm against the implementation.
func nestedWorkersWorker(ctx context.Context) {
	inner := NewStoppableWorkers(ctx)
	inner.Add(normalWorker)

	// Block here like any other worker so the caller's Stop has something to
	// wait on.
	normalWorker(ctx)
}
// panickingWorker is a worker that ignores its context and panics immediately
// with a fixed message.
func panickingWorker(context.Context) {
	const msg = "this worker panicked; ignore expected stack trace above"
	panic(msg)
}