Skip to content

Commit

Permalink
platform/6915: Async cache writing
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghnuberath committed Jun 13, 2024
1 parent 4d85a84 commit ca48813
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/gocache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (c *Cache[K, V]) HasKey(k K) (bool, error) {
}

func (c *Cache[K, V]) Get(k K, getter func() (*V, error)) (*V, error) {
valpromise, alreadyExists := c.cache.GetOrCreate(k, NewPromise[V]())
valpromise, alreadyExists, _ := c.cache.GetOrCreate(k, NewPromise[V]())
if alreadyExists {
val, err := valpromise.Wait()
if err != nil {
Expand Down
24 changes: 17 additions & 7 deletions pkg/gocache/partitioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,30 @@ func (c *partitionedCache[K, V]) putUnsafe(key K, value *V) {
c.lru[partition].push(e)
}

func (c *partitionedCache[K, V]) GetOrCreate(key K, value *V) (*V, bool) {
func (c *partitionedCache[K, V]) GetOrCreate(key K, value *V) (*V, bool, *Promise[bool]) {
p := NewPromise[bool]()
// first, try to read the value by acquiring the read lock only
c.rUpgradeableLock(key)
defer c.rUpgradeableUnlock(key)
v, ok := c.getUnsafe(key)
if ok {
return v, true
res := true
c.rUpgradeableUnlock(key)
p.Resolve(&res)
return v, true, p
}

// if the value was not present, we need to store it instead,
// so upgrade to a write lock
c.wUpgradeLock(key)
c.putUnsafe(key, value)
return value, false
// so upgrade to a write lock and write the value asynchronously
defer func() {
go func() {
res := false
c.wUpgradeLock(key)
c.putUnsafe(key, value)
c.rUpgradeableUnlock(key)
p.Resolve(&res)
}()
}()
return value, false, p
}

func (c *partitionedCache[K, V]) HasKey(key K) bool {
Expand Down
17 changes: 11 additions & 6 deletions pkg/gocache/partitioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ func TestPartitionedCache(t *testing.T) {
num := 6

cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, 1*time.Hour)
_, exists := cache.GetOrCreate("A", &num)
_, exists, p := cache.GetOrCreate("A", &num)
assert.False(t, exists)

p.Wait()

exists = cache.HasKey("A")
assert.True(t, exists)

val, exists := cache.GetOrCreate("A", &num)
val, exists, _ := cache.GetOrCreate("A", &num)
assert.True(t, exists)
assert.Equal(t, 6, *val)
}
Expand All @@ -47,8 +49,9 @@ func TestPartitionedCacheEviction(t *testing.T) {
cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, time.Hour)
for i := 0; i < 100000; i++ {
key := randString(16)
_, existsAlready := cache.GetOrCreate(key, nil)
_, existsAlready, p := cache.GetOrCreate(key, nil)
assert.False(t, existsAlready)
p.Wait()
assert.LessOrEqual(t, cache.PartitionLen(key), sizeOfPartition)
}
}
Expand All @@ -58,12 +61,13 @@ func TestPartitionedCacheTTL(t *testing.T) {
numPartitions := 4
cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, time.Microsecond)

_, existsAlready := cache.GetOrCreate("A", nil)
_, existsAlready, p := cache.GetOrCreate("A", nil)
assert.False(t, existsAlready)
p.Wait()

time.Sleep(time.Millisecond)

_, existsAlready = cache.GetOrCreate("A", nil)
_, existsAlready, _ = cache.GetOrCreate("A", nil)
assert.False(t, existsAlready)
}

Expand All @@ -81,7 +85,8 @@ func TestPartitionedCacheAsyncTest(t *testing.T) {
for j := 0; j < numActions; j++ {
v := j
k := randLetter()
cache.GetOrCreate(k, &v)
_, _, p := cache.GetOrCreate(k, &v)
p.Wait()
assert.GreaterOrEqual(t, cache.PartitionLen(k), 1)
assert.LessOrEqual(t, cache.PartitionLen(k), sizeOfPartition)
time.Sleep(time.Millisecond)
Expand Down

0 comments on commit ca48813

Please sign in to comment.