forked from affinityv/humblebundle-downloader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
81 lines (67 loc) · 1.54 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main
import (
"sync"
"gopkg.in/cheggaaa/pb.v1"
)
// Pool is a worker group that runs a number of tasks at a
// configured concurrency.
type Pool struct {
Tasks []*Task
concurrency int
tasksChan chan *Task
wg sync.WaitGroup
bar *pb.ProgressBar
}
// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
return &Pool{
Tasks: tasks,
concurrency: concurrency,
tasksChan: make(chan *Task),
bar: pb.New(len(tasks)),
}
}
// The work loop for any single goroutine.
func (p *Pool) work() {
for task := range p.tasksChan {
task.Run(&p.wg)
p.bar.Increment()
}
}
// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
p.bar.Start()
for i := 0; i < p.concurrency; i++ {
go p.work()
}
p.wg.Add(len(p.Tasks))
for _, task := range p.Tasks {
p.tasksChan <- task
}
// all workers return
close(p.tasksChan)
p.wg.Wait()
p.bar.Finish()
}
// Task encapsulates a work item that should go in a work
// pool.
type Task struct {
// Err holds an error that occurred during a task. Its
// result is only meaningful after Run has been called
// for the pool that holds it.
Err error
f func() error
}
// NewTask initializes a new task based on a given work
// function.
func NewTask(f func() error) *Task {
return &Task{f: f}
}
// Run runs a Task and does appropriate accounting via a
// given sync.WorkGroup.
func (t *Task) Run(wg *sync.WaitGroup) {
t.Err = t.f()
wg.Done()
}