Skip to content

Commit

Permalink
cache: refactor, and fix of race condition between expiration gorouti…
Browse files Browse the repository at this point in the history
…ne and external calls.
  • Loading branch information
colindickson committed Sep 18, 2023
1 parent 25b6d9c commit c8e3dea
Showing 1 changed file with 47 additions and 23 deletions.
70 changes: 47 additions & 23 deletions pkg/cache/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type sortableItem struct {

type TTLMap struct {
m map[string]*item
l sync.Mutex
l sync.RWMutex
maxItems int

metrics Metrics
Expand All @@ -39,15 +39,17 @@ func NewTTLMap(maxItems int, name, namespace string) (m *TTLMap) {

go func() {
for now := range time.Tick(time.Second * 1) {
m.l.Lock()
for k, v := range m.m {
if v.invincible {
continue
}

if v.expiresAt.Before(now) {
m.Delete(k)
m.delete(k, v.value, v.expiresAt)
}
}
m.l.Unlock()
}
}()

Expand Down Expand Up @@ -75,63 +77,78 @@ func (m *TTLMap) OnItemAdded(f func(string, interface{}, time.Time)) {
}

func (m *TTLMap) Delete(k string) {
val, expiresAt, err := m.Get(k)
m.l.Lock()
defer m.l.Unlock()

val, expiresAt, err := m.get(k)
if err != nil {
return
}

m.l.Lock()
m.delete(k, val, expiresAt)
}

func (m *TTLMap) delete(k string, val interface{}, expiresAt time.Time) {
delete(m.m, k)

m.metrics.ObserveOperations(OperationDEL, 1)

for _, f := range m.deletedCallbacks {
go f(k, val, expiresAt)
}

m.l.Unlock()
}

func (m *TTLMap) evictItemToClosestToExpiry() {
// This is a very naive implementation.
items := []sortableItem{}
evictableItems := make([]sortableItem, 0, len(m.m))

// Get all non-invincible items.
for k, v := range m.m {
if v.invincible {
continue
}

items = append(items, sortableItem{
evictableItems = append(evictableItems, sortableItem{
key: k,
expiresAt: v.expiresAt,
})
}

sort.Slice(items, func(i, j int) bool {
return items[i].expiresAt.Before(items[j].expiresAt)
if len(evictableItems) == 0 {
return
}

sort.Slice(evictableItems, func(i, j int) bool {
return evictableItems[i].expiresAt.Before(evictableItems[j].expiresAt)
})

if len(items) > 0 {
m.Delete(items[0].key)
m.metrics.ObserveOperations(OperationEVICT, 1)
}
m.delete(evictableItems[0].key, evictableItems[0].expiresAt, evictableItems[0].expiresAt)
m.metrics.ObserveOperations(OperationEVICT, 1)
}

func (m *TTLMap) Len() int {
m.l.RLock()
defer m.l.RUnlock()

return m.len()
}

func (m *TTLMap) len() int {
return len(m.m)
}

func (m *TTLMap) Add(k string, v interface{}, expiresAt time.Time, invincible bool) {
if m.Len() >= m.maxItems {
m.evictItemToClosestToExpiry()
}

m.l.Lock()

defer m.l.Unlock()

m.add(k, v, expiresAt, invincible)
}

func (m *TTLMap) add(k string, v interface{}, expiresAt time.Time, invincible bool) {
if m.len() >= m.maxItems {
m.evictItemToClosestToExpiry()
}

it, ok := m.m[k]
if !ok {
it = &item{
Expand All @@ -150,16 +167,23 @@ func (m *TTLMap) Add(k string, v interface{}, expiresAt time.Time, invincible bo
}

func (m *TTLMap) Get(k string) (interface{}, time.Time, error) {
m.metrics.ObserveOperations(OperationGET, 1)
m.l.RLock()
itv, expires, err := m.get(k)
m.l.RUnlock()

m.l.Lock()
if err != nil {
return nil, time.Now(), err
}

defer m.l.Unlock()
return itv, expires, err
}

func (m *TTLMap) get(k string) (interface{}, time.Time, error) {
m.metrics.ObserveOperations(OperationGET, 1)

it, ok := m.m[k]
if !ok {
m.metrics.ObserveMiss()

return nil, time.Now(), errors.New("not found")
}

Expand Down

0 comments on commit c8e3dea

Please sign in to comment.