-
Notifications
You must be signed in to change notification settings - Fork 12
/
msg_ring.go
142 lines (119 loc) · 3.26 KB
/
msg_ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2014-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbgt
import (
"fmt"
"io"
"sync"
)
// Tunables that control how a MsgRing recycles its message buffers.
var (
	// MsgRingMaxSmallBufSize is the cutoff point, in bytes, at which a
	// msg ring categorizes a buf as small versus large for reuse.
	MsgRingMaxSmallBufSize = 1024

	// MsgRingMaxBufPoolSize is the max number of bufs kept in each
	// reuse pool.
	MsgRingMaxBufPoolSize = 8
)
// A MsgRing wraps an io.Writer, and remembers a ring of previous
// writes to the io.Writer. It is concurrent safe and is useful, for
// example, for remembering recent log messages.
//
// Write and Messages hold the mutex m while they touch Next, Msgs and
// the buffer pools. A MsgRing contains a sync.Mutex, so it must be used
// through a pointer and not copied.
type MsgRing struct {
m sync.Mutex // Held by Write and Messages while accessing the fields below.
inner io.Writer // Destination that every Write is forwarded to.
Next int `json:"next"` // Index in Msgs of the slot the next Write fills.
Msgs [][]byte `json:"msgs"` // Ring storage; a nil entry is a slot not yet written.
SmallBufs [][]byte // Pool of small buffers kept for reuse by Write.
LargeBufs [][]byte // Pool of large buffers kept for reuse by Write.
}
// NewMsgRing returns a MsgRing that forwards writes to inner and has
// ringSize slots for remembering them. It returns an error when inner
// is nil or when ringSize is not positive.
func NewMsgRing(inner io.Writer, ringSize int) (*MsgRing, error) {
	switch {
	case inner == nil:
		return nil, fmt.Errorf("msg_ring: nil inner io.Writer")
	case ringSize <= 0:
		return nil, fmt.Errorf("msg_ring: non-positive ring size")
	}

	// Next starts at its zero value, i.e. the first slot of the ring.
	ring := &MsgRing{inner: inner}
	ring.Msgs = make([][]byte, ringSize)
	return ring, nil
}
// Write implements the io.Writer interface. It stores a private copy of
// p in the ring slot at m.Next (overwriting whatever that slot held),
// advances m.Next with wrap-around, and then forwards p to the wrapped
// io.Writer, returning that writer's result. p itself is never retained.
//
// The buffer that held the overwritten message is recycled through the
// SmallBufs or LargeBufs pool, each limited to MsgRingMaxBufPoolSize
// entries, so that steady-state logging does not allocate per message.
func (m *MsgRing) Write(p []byte) (n int, err error) {
	m.m.Lock()

	// Recycle the buffer being overwritten into the small-vs-large pools,
	// as long as there's enough pool space. Capacity, not length, decides
	// the class because capacity is what limits reuse.
	if oldMsg := m.Msgs[m.Next]; oldMsg != nil {
		if cap(oldMsg) <= MsgRingMaxSmallBufSize {
			if len(m.SmallBufs) < MsgRingMaxBufPoolSize {
				m.SmallBufs = append(m.SmallBufs, oldMsg)
			}
		} else if len(m.LargeBufs) < MsgRingMaxBufPoolSize {
			m.LargeBufs = append(m.LargeBufs, oldMsg)
		}
	}

	// Take a recycled buf from the matching pool, or allocate a new one.
	var buf []byte
	if len(p) <= MsgRingMaxSmallBufSize {
		buf = msgRingTakeBuf(&m.SmallBufs, len(p))
	} else {
		buf = msgRingTakeBuf(&m.LargeBufs, len(p))
	}
	if buf == nil {
		buf = make([]byte, len(p))
	}

	// Trim a recycled buf to the message length so that no stale bytes
	// from its previous use are exposed as part of this message.
	buf = buf[:len(p)]
	copy(buf, p)

	m.Msgs[m.Next] = buf
	m.Next++
	if m.Next >= len(m.Msgs) {
		m.Next = 0
	}

	// Release the lock before calling the inner writer so that a slow
	// destination does not block readers of the ring.
	m.m.Unlock()

	return m.inner.Write(p)
}

// msgRingTakeBuf pops bufs off the end of *pool until it finds one whose
// capacity is at least need, and returns it. It returns nil when the pool
// runs out.
//
// Although we wastefully throw away any pooled bufs that aren't big
// enough, this simple approach doesn't "learn" the wrong buf size.
func msgRingTakeBuf(pool *[][]byte, need int) []byte {
	bufs := *pool
	for len(bufs) > 0 {
		last := len(bufs) - 1
		buf := bufs[last]
		bufs[last] = nil // Drop the pool's reference to the popped buf.
		bufs = bufs[:last]
		if cap(buf) >= need {
			*pool = bufs
			return buf
		}
	}
	*pool = bufs
	return nil
}
// Messages returns copies of the writes currently remembered by the
// MsgRing, ordered from oldest to newest. Slots that have never been
// written are skipped.
//
// The returned messages are packed into one backing array that is
// private to the call, so callers never alias the ring's own buffers.
// Each message is capacity-limited to its own length, so appending to
// one returned message cannot overwrite the one that follows it.
func (m *MsgRing) Messages() [][]byte {
	m.m.Lock()
	defer m.m.Unlock()

	// Pre-alloc a buf to hold a copy of all msgs.
	bufSize := 0
	for _, msg := range m.Msgs {
		bufSize += len(msg)
	}
	buf := make([]byte, 0, bufSize)

	n := len(m.Msgs)
	rv := make([][]byte, 0, n)

	// m.Next is the slot that will be overwritten next, i.e. the oldest
	// message, so start there and wrap around once.
	idx := m.Next
	for i := 0; i < n; i++ {
		if msg := m.Msgs[idx]; msg != nil {
			start := len(buf)
			buf = append(buf, msg...)
			// Full slice expression: cap == len, so a caller's append
			// reallocates instead of stomping the neighbouring message.
			rv = append(rv, buf[start:len(buf):len(buf)])
		}

		idx++
		if idx >= n {
			idx = 0
		}
	}

	return rv
}