-
Notifications
You must be signed in to change notification settings - Fork 1
/
bucket.go
183 lines (164 loc) · 4.34 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package directcache
import (
"bytes"
"errors"
"sync"
"github.com/cespare/xxhash/v2"
)
// bucket indexes and holds entries.
// Entries live in the queue buffer q and are located through the index m,
// which maps a key hash to the entry's offset in q.
type bucket struct {
	m           vmap                   // maps key hash to offset
	q           fifo                   // the queue buffer stores entries
	shouldEvict func(entry Entry) bool // the custom eviction policy; when nil, insertEntry falls back to its default policy
	lock        sync.RWMutex           // taken by the bucket's methods before touching m, q or shouldEvict
}
// Reset resets the bucket for the given capacity.
// It drops all entries.
// The eviction policy is not touched; use SetEvictionPolicy to change it.
func (b *bucket) Reset(capacity int) {
	b.lock.Lock()
	// Deferred, like in the other methods, so that the lock is released
	// even if one of the resets below panics.
	defer b.lock.Unlock()

	// NOTE(review): the index is reset with capacity-1 while the queue buffer
	// gets the full capacity; the reason is not visible in this file.
	b.m.Reset(capacity - 1)
	b.q.Reset(capacity)
}
// SetEvictionPolicy installs shouldEvict as the eviction policy of this bucket.
// A nil shouldEvict selects the default policy used by insertEntry.
func (b *bucket) SetEvictionPolicy(shouldEvict func(entry Entry) bool) {
	b.lock.Lock()
	defer b.lock.Unlock()

	b.shouldEvict = shouldEvict
}
// Set stores a value of valLen bytes for key.
//
// fn must not be nil: it is called with the value buffer of the entry (the
// slice returned by the entry's Init) and is expected to fill it in.
// An existing entry indexed under keyHash is overwritten in place when its
// body is large enough; otherwise a new entry is inserted.
//
// false is returned and nothing is changed if the new entry size exceeds the
// capacity of this bucket.
func (b *bucket) Set(key []byte, keyHash uint64, valLen int, fn func(val []byte)) bool {
	b.lock.Lock()
	defer b.lock.Unlock()

	oldOffset, found := b.m.Get(keyHash)
	if found {
		ent := b.entryAt(oldOffset)
		if spare := ent.BodySize() - len(key) - valLen; spare >= 0 { // in-place update
			fn(ent.Init(key, valLen, spare))
			ent.AddFlag(recentlyUsedFlag) // avoid being evicted too early
			return true
		}
		// key not matched or in-place update failed:
		// flag the old entry as deleted so that insertEntry can reclaim it
		ent.AddFlag(deletedFlag)
	}

	// insert new entry
	if offset, ok := b.insertEntry(key, valLen, 0, fn); ok {
		b.m.Set(keyHash, offset)
		return true
	}

	if found {
		// insertEntry only fails before touching the queue, so the old entry
		// is still at oldOffset and still indexed. Undo the deleted flag:
		// otherwise the index would keep pointing at an entry that eviction
		// later discards without removing its mapping.
		b.entryAt(oldOffset).RemoveFlag(deletedFlag)
	}
	return false
}
// Del removes key from the bucket.
// It returns true if the key was present, false otherwise.
// The entry itself stays in the queue buffer, flagged as deleted.
func (b *bucket) Del(key []byte, keyHash uint64) bool {
	b.lock.Lock()
	defer b.lock.Unlock()

	offset, found := b.m.Get(keyHash)
	if !found {
		return false
	}

	ent := b.entryAt(offset)
	if !bytes.Equal(ent.Key(), key) {
		// the indexed entry belongs to another key with the same hash
		return false
	}

	b.m.Del(keyHash)
	ent.AddFlag(deletedFlag)
	return true
}
// Get looks up the value for key.
// It returns true if the key is found, false otherwise.
// When fn is not nil, it is called with the value while the read lock is held.
// If peek is true, the entry will not be marked as recently-used.
//
// NOTE(review): the recently-used flag is set while only the read lock is
// held; confirm that AddFlag is safe under concurrent readers.
func (b *bucket) Get(key []byte, keyHash uint64, fn func(val []byte), peek bool) bool {
	b.lock.RLock()
	defer b.lock.RUnlock()

	offset, found := b.m.Get(keyHash)
	if !found {
		return false
	}

	ent := b.entryAt(offset)
	if !bytes.Equal(ent.Key(), key) {
		// the indexed entry belongs to another key with the same hash
		return false
	}

	if !peek {
		ent.AddFlag(recentlyUsedFlag)
	}
	if fn != nil {
		fn(ent.Value())
	}
	return true
}
// Dump walks the entries of the queue buffer, starting at the front, and
// calls f for every entry that is not flagged as deleted.
// The walk stops as soon as f returns false, in which case Dump returns
// false as well; it returns true after all entries were visited.
func (b *bucket) Dump(f func(Entry) bool) bool {
	b.lock.RLock()
	defer b.lock.RUnlock()

	remaining := b.q.Size() // bytes of queued entries not yet visited
	pos := b.q.Front()
	for remaining > 0 {
		ent := b.entryAt(pos)
		if len(ent) == 0 {
			// nothing at this offset (presumably the end of the underlying
			// buffer was reached): continue from the beginning
			pos = 0
			continue
		}

		n := ent.Size()
		remaining -= n
		pos += n

		if ent.HasFlag(deletedFlag) {
			continue
		}
		if !f(ent) {
			return false
		}
	}
	return true
}
// entryAt returns the slice of the queue buffer starting at offset, viewed as an entry.
func (b *bucket) entryAt(offset int) entry {
	raw := b.q.Slice(offset)
	return raw
}
// insertEntry inserts a new entry for key and returns its offset in the queue buffer.
// fn is called with the value buffer of the freshly initialized entry.
//
// (0, false) is returned, without touching the queue, if the entry is larger
// than the capacity of the queue buffer.
//
// If there is not enough space, entries are popped from the front of the queue
// until the new entry fits. A popped entry is either dropped or pushed back:
//   - entries flagged as deleted are dropped;
//   - with the default policy (shouldEvict is nil), entries not flagged as
//     recently-used are dropped and removed from the index;
//   - with a custom policy, entries for which shouldEvict returns true are
//     dropped and removed from the index;
//   - everything else is pushed back with the recently-used flag cleared, but
//     at most 8 times per call; after that every popped live entry is dropped.
//
// NOTE(review): the panic messages below name "bucket.allocEntry", not insertEntry.
func (b *bucket) insertEntry(key []byte, valLen int, spare int, fn func(val []byte)) (int, bool) {
	need := entrySize(len(key), valLen, spare)
	if need > b.q.Cap() {
		return 0, false
	}

	// how many surviving entries may still be pushed back to the queue
	requeueBudget := 8
	for {
		// have a try
		if offset, ok := b.q.Push(nil, need); ok {
			fn(entry(b.q.Slice(offset)).Init(key, valLen, spare))
			return offset, true
		}

		// no space: pop the entry at the front of the queue buffer
		front := b.entryAt(b.q.Front())
		front = front[:front.Size()]
		if _, ok := b.q.Pop(len(front)); !ok {
			// will never go here if entry is correctly implemented
			panic(errors.New("bucket.allocEntry: pop entry failed"))
		}

		// good, a deleted entry: its space is simply reclaimed
		if front.HasFlag(deletedFlag) {
			continue
		}

		keyHash := xxhash.Sum64(front.Key())

		var evict bool
		switch {
		case requeueBudget < 1:
			// budget exhausted: stop asking the policy
			evict = true
		case b.shouldEvict == nil:
			// the default LRU policy
			evict = !front.HasFlag(recentlyUsedFlag)
		default:
			// the custom eviction policy
			evict = b.shouldEvict(front)
		}
		if evict {
			b.m.Del(keyHash)
			continue
		}

		// the entry survives: push it back to the queue and update its offset
		requeueBudget--
		front.RemoveFlag(recentlyUsedFlag)
		offset, ok := b.q.Push(front, 0)
		if !ok {
			panic("bucket.allocEntry: push entry failed")
		}
		b.m.Set(keyHash, offset)
	}
}