-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmemory_cache_adapter.go
113 lines (98 loc) · 2.67 KB
/
memory_cache_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package query
import (
"sync"
"sync/atomic"
"time"
)
// MemoryCacheAdapter is the struct used for memory caching purposes.
// It keeps results in an in-process map and relies on a background cleaner
// routine (started by NewMemoryCacheAdapter) to drop expired entries.
type MemoryCacheAdapter struct {
	// RWMutex guards cachedResults; Set, Get, Expire and the cleaner pass all
	// take it before touching the map.
	sync.RWMutex
	// cachedResults holds the stored results keyed by string(qry.CacheKey()).
	cachedResults map[string]*Result
	// cleanerSignal wakes the cleaner routine ahead of its timer. It is created
	// with a buffer of one and is only ever written to without blocking.
	cleanerSignal chan bool
	// shuttingDown is accessed atomically: the cleaner loop keeps running while
	// it is 0, and Shutdown flips it to 1.
	shuttingDown *uint32
	// sleepTimer is the timer the cleaner waits on between passes; it is nil
	// until the first pass has armed it.
	sleepTimer *time.Timer
	// sleepUntil is the earliest expiry recorded during the current cleaner
	// pass; the zero value means no expiry has been recorded.
	sleepUntil time.Time
}
// NewMemoryCacheAdapter builds a ready-to-use *MemoryCacheAdapter.
// Besides allocating the adapter's state it launches the cleaner routine in
// its own goroutine before returning.
func NewMemoryCacheAdapter() *MemoryCacheAdapter {
	var stopFlag uint32

	adapter := new(MemoryCacheAdapter)
	adapter.cachedResults = map[string]*Result{}
	// A single buffered slot lets one wake-up request be queued without blocking.
	adapter.cleanerSignal = make(chan bool, 1)
	adapter.shuttingDown = &stopFlag

	go adapter.cleaner()
	return adapter
}
// Set caches res under the cache key of qry, replacing any entry already
// stored under that key, and then nudges the cleaner routine.
// It always reports true.
func (ad *MemoryCacheAdapter) Set(qry Cacheable, res *Result) bool {
	ad.Lock()
	key := string(qry.CacheKey())
	ad.cachedResults[key] = res
	ad.Unlock()

	// Wake the cleaner so its next pass takes the new entry into account.
	ad.clean()
	return true
}
// Get looks up the result stored under the cache key of qry.
// It returns nil when the map holds nothing for that key. Expiry is not
// checked here; whatever is currently in the map is returned as-is.
func (ad *MemoryCacheAdapter) Get(qry Cacheable) *Result {
	ad.RLock()
	defer ad.RUnlock()

	return ad.cachedResults[string(qry.CacheKey())]
}
// Expire can optionally be used to forcibly expire a query cache.
// It removes the entry stored under the cache key of qry; when no such entry
// exists the call is a no-op.
func (ad *MemoryCacheAdapter) Expire(qry Cacheable) {
	ck := string(qry.CacheKey())

	ad.Lock()
	defer ad.Unlock()

	// delete is already a no-op for a missing key, so no existence check is
	// needed first.
	delete(ad.cachedResults, ck)
}
// Shutdown asks the cleaner routine to stop.
// It raises the shutdown flag and wakes the cleaner; it does not wait for the
// routine to actually exit.
func (ad *MemoryCacheAdapter) Shutdown() {
	flag := ad.shuttingDown
	// Only the 0 -> 1 transition is performed; repeated calls leave the flag at 1.
	atomic.CompareAndSwapUint32(flag, 0, 1)

	// Wake the cleaner so it re-checks the flag now rather than when its
	// timer fires.
	ad.clean()
}
//------Internal------//
// cleaner is the background routine that evicts expired entries. It loops
// until the shutdown flag is raised. Each pass:
//
//  1. scans cachedResults under the write lock, deleting every entry that has
//     a non-zero CachedAt and whose ExpiresAt lies before the pass start time;
//  2. records the earliest expiry among the surviving expirable entries;
//  3. arms the sleep timer and blocks until either the timer fires or a
//     wake-up arrives on cleanerSignal.
//
// Entries whose CachedAt is zero are never evicted, so they are also kept out
// of the wake-up computation: letting a deadline that can never be acted on
// drive the timer would make the loop wake up (or, once that deadline is in
// the past, spin) without doing any work.
func (ad *MemoryCacheAdapter) cleaner() {
	// Release the sleep timer once the routine exits instead of leaving it
	// armed after shutdown.
	defer func() {
		if ad.sleepTimer != nil {
			ad.sleepTimer.Stop()
		}
	}()

	for atomic.LoadUint32(ad.shuttingDown) == 0 {
		now := time.Now()
		// Start every pass with no deadline recorded.
		ad.sleepUntil = time.Time{}

		ad.Lock()
		for key, res := range ad.cachedResults {
			if res.CachedAt().IsZero() {
				// Not expirable: neither evicted nor allowed to set the deadline.
				continue
			}
			expiresAt := res.ExpiresAt()
			if now.After(expiresAt) {
				// Deleting during range is safe for Go maps.
				delete(ad.cachedResults, key)
				continue
			}
			ad.updateSleepUntil(expiresAt)
		}
		ad.Unlock()

		ad.updateSleepTimer(ad.determineSleepDuration())

		// allow the cleaner to be triggered either with timer or directly
		select {
		case <-ad.sleepTimer.C:
		case <-ad.cleanerSignal:
		}
	}
}
// clean asks the cleaner routine to run a pass right away.
// The send never blocks: when cleanerSignal cannot accept the value at this
// moment (for instance because a wake-up is already pending) the request is
// simply dropped.
func (ad *MemoryCacheAdapter) clean() {
	select {
	case ad.cleanerSignal <- true:
		// Wake-up queued.
	default:
		// Channel not ready to receive; nothing more to do.
	}
}
// updateSleepUntil lowers sleepUntil to expiresAt when expiresAt is earlier
// than the deadline currently recorded, or when no deadline is recorded yet.
//
// A zero expiresAt is ignored. The zero time doubles as the "nothing
// recorded" marker for sleepUntil and sorts before every real deadline, so
// accepting it would wipe the recorded deadline and let any later call
// overwrite what had been the earliest one.
func (ad *MemoryCacheAdapter) updateSleepUntil(expiresAt time.Time) {
	if expiresAt.IsZero() {
		return
	}
	if ad.sleepUntil.IsZero() || expiresAt.Before(ad.sleepUntil) {
		ad.sleepUntil = expiresAt
	}
}
// determineSleepDuration reports how long the cleaner should sleep before its
// next pass.
//
// It returns one hour when no deadline is recorded in sleepUntil or when the
// cache is empty; otherwise it returns the time remaining until sleepUntil,
// which is negative if that deadline has already passed.
//
// The map length is read under the read lock because Set and Expire may be
// modifying cachedResults from other goroutines. Callers must therefore not
// hold the adapter's lock when calling this method.
func (ad *MemoryCacheAdapter) determineSleepDuration() time.Duration {
	if ad.sleepUntil.IsZero() {
		return time.Hour
	}

	ad.RLock()
	empty := len(ad.cachedResults) == 0
	ad.RUnlock()
	if empty {
		return time.Hour
	}

	return time.Until(ad.sleepUntil)
}
// updateSleepTimer arms the cleaner's sleep timer to fire after d.
// The timer is created on first use and reused afterwards.
//
// Before re-arming, the timer is stopped and any tick it already delivered is
// drained. The cleaner may have been woken through cleanerSignal while a tick
// was sitting unread in the timer's channel; without the drain that stale
// tick would wake the next wait immediately. The drain is non-blocking, so it
// is also safe when the tick was already consumed.
func (ad *MemoryCacheAdapter) updateSleepTimer(d time.Duration) {
	if ad.sleepTimer == nil {
		ad.sleepTimer = time.NewTimer(d)
		return
	}

	if !ad.sleepTimer.Stop() {
		select {
		case <-ad.sleepTimer.C:
		default:
		}
	}
	ad.sleepTimer.Reset(d)
}