-
Notifications
You must be signed in to change notification settings - Fork 1
/
array_buffer.go
124 lines (116 loc) · 2.03 KB
/
array_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package flyline
import (
"context"
"runtime"
"sync"
"time"
)
// Note: The array capacity must be a power of two, e.g. 2, 4, 8, 16, 32, 64, etc.
func NewArrayBuffer(capacity int64) Buffer {
b := &arrayBuffer{
capacity: capacity,
buffer: newArray(capacity),
wdSeq: NewSequence(),
wpSeq: NewSequence(),
rdSeq: NewSequence(),
rpSeq: NewSequence(),
sts: &status{},
mutex: &sync.Mutex{},
}
b.sts.setRunning()
return b
}
type arrayBuffer struct {
capacity int64
buffer *array
wpSeq *Sequence
wdSeq *Sequence
rpSeq *Sequence
rdSeq *Sequence
sts *status
mutex *sync.Mutex
}
func (b *arrayBuffer) Send(i interface{}) (err error) {
if b.sts.isClosed() {
err = ErrBufSendClosed
return
}
next := b.wpSeq.Incr()
times := 10
for {
times--
if next-b.capacity-b.rdSeq.Get() <= 0 && next-(b.wdSeq.Get()+1) == 0 {
b.buffer.set(next, i)
b.wdSeq.Incr()
break
}
time.Sleep(ns1)
if times <= 0 {
runtime.Gosched()
times = 10
}
}
return
}
func (b *arrayBuffer) Recv() (value interface{}, active bool) {
active = true
if b.sts.isClosed() && b.Len() == int64(0) {
active = false
return
}
times := 10
next := b.rpSeq.Incr()
for {
if next-b.wdSeq.Get() <= 0 && next-(b.rdSeq.Get()+1) == 0 {
value = b.buffer.get(next)
b.rdSeq.Incr()
break
}
time.Sleep(ns1)
if times <= 0 {
runtime.Gosched()
times = 10
}
}
return
}
func (b *arrayBuffer) Len() (length int64) {
length = b.wpSeq.Get() - b.rdSeq.Get()
return
}
func (b *arrayBuffer) Close() (err error) {
b.mutex.Lock()
defer b.mutex.Unlock()
if b.sts.isClosed() {
err = ErrBufCloseClosed
return
}
b.sts.setClosed()
return
}
func (b *arrayBuffer) Sync(ctx context.Context) (err error) {
b.mutex.Lock()
defer b.mutex.Unlock()
if b.sts.isRunning() {
err = ErrBufSyncUnclosed
return
}
for {
ok := false
select {
case <-ctx.Done():
ok = true
break
default:
if b.Len() == int64(0) {
ok = true
break
}
time.Sleep(ms500)
}
if ok {
break
}
}
return
}