-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgac.go
78 lines (47 loc) · 1.35 KB
/
gac.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// -------------------------
// Copyright 2016, undiabler
// git: github.com/undiabler/golang-async-channels
//--------------------------
package gac
// proxyTube couples the two ends of an asynchronous channel: values enter on
// chanFrom and leave on chanTo, with proxyWorker moving them in between.
type proxyTube struct {
	// chanFrom is the producer-facing end that proxyWorker receives from;
	// chanTo is the consumer-facing end that proxyWorker sends on.
	chanFrom, chanTo chan interface{}
}
// NewAsyncChannel returns the two ends of an unbounded asynchronous channel.
//
// Send values on chanFrom and receive them from chanTo. Both channels are
// unbuffered; a goroutine running proxyWorker sits between them and keeps
// pending values in memory, so a send on chanFrom does not have to wait for
// a reader on chanTo.
func NewAsyncChannel() (chanFrom, chanTo chan interface{}) {
	// TODO: consider returning the proxyTube struct itself to avoid memory leaks.
	// TODO: buffered channels might give even more amortization in some cases.
	tube := &proxyTube{
		chanFrom: make(chan interface{}),
		chanTo:   make(chan interface{}),
	}

	go tube.proxyWorker()

	chanFrom, chanTo = tube.chanFrom, tube.chanTo
	return chanFrom, chanTo
}
// proxyWorker moves values from p.chanFrom to p.chanTo, parking them in an
// in-memory queue whenever no consumer is ready, so that producers are not
// held up by a slow reader.
//
// Values are delivered in the order they were received (FIFO). When
// p.chanFrom is closed, the worker flushes whatever is still queued, closes
// p.chanTo and returns, so the goroutine running it terminates instead of
// spinning on the closed input.
func (p *proxyTube) proxyWorker() {
	// Closing the output tells consumers that no further values will arrive.
	defer close(p.chanTo)

	var items []interface{}
	for {
		if len(items) == 0 {
			// Nothing queued: block until the producer sends or closes.
			tmp, ok := <-p.chanFrom
			if !ok {
				return
			}
			// Fast path: hand the value straight over if a consumer is
			// already waiting; otherwise start queueing.
			select {
			case p.chanTo <- tmp:
			default:
				items = append(items, tmp)
			}
			continue
		}

		select {
		case tmp, ok := <-p.chanFrom:
			if !ok {
				// Input closed: flush the backlog in order, then stop.
				for _, item := range items {
					p.chanTo <- item
				}
				return
			}
			items = append(items, tmp)
		case p.chanTo <- items[0]:
			// Clear the slot so the delivered value is not kept reachable
			// through the queue's backing array.
			items[0] = nil
			items = items[1:]
			if len(items) == 0 {
				// Backlog drained: let the backing array be collected.
				items = nil
			}
		}
	}
}