From ca48813e4f79ff67210dc55c72906496b9fb44a4 Mon Sep 17 00:00:00 2001 From: Sean McIntyre Date: Thu, 13 Jun 2024 15:55:58 -0400 Subject: [PATCH 1/2] platform/6915: Async cache writing --- pkg/gocache/cache.go | 2 +- pkg/gocache/partitioned.go | 24 +++++++++++++++++------- pkg/gocache/partitioned_test.go | 17 +++++++++++------ 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/pkg/gocache/cache.go b/pkg/gocache/cache.go index 419830c..ebd891b 100644 --- a/pkg/gocache/cache.go +++ b/pkg/gocache/cache.go @@ -21,7 +21,7 @@ func (c *Cache[K, V]) HasKey(k K) (bool, error) { } func (c *Cache[K, V]) Get(k K, getter func() (*V, error)) (*V, error) { - valpromise, alreadyExists := c.cache.GetOrCreate(k, NewPromise[V]()) + valpromise, alreadyExists, _ := c.cache.GetOrCreate(k, NewPromise[V]()) if alreadyExists { val, err := valpromise.Wait() if err != nil { diff --git a/pkg/gocache/partitioned.go b/pkg/gocache/partitioned.go index 62d3ffa..09f360c 100644 --- a/pkg/gocache/partitioned.go +++ b/pkg/gocache/partitioned.go @@ -134,20 +134,30 @@ func (c *partitionedCache[K, V]) putUnsafe(key K, value *V) { c.lru[partition].push(e) } -func (c *partitionedCache[K, V]) GetOrCreate(key K, value *V) (*V, bool) { +func (c *partitionedCache[K, V]) GetOrCreate(key K, value *V) (*V, bool, *Promise[bool]) { + p := NewPromise[bool]() // first, try to read the value by acquiring the read lock only c.rUpgradeableLock(key) - defer c.rUpgradeableUnlock(key) v, ok := c.getUnsafe(key) if ok { - return v, true + res := true + c.rUpgradeableUnlock(key) + p.Resolve(&res) + return v, true, p } // if the value was not present, we need to store it instead, - // so upgrade to a write lock - c.wUpgradeLock(key) - c.putUnsafe(key, value) - return value, false + // so upgrade to a write lock and write the value asynchronously + defer func() { + go func() { + res := false + c.wUpgradeLock(key) + c.putUnsafe(key, value) + c.rUpgradeableUnlock(key) + p.Resolve(&res) + }() + }() + return value, false, p } func (c *partitionedCache[K, V]) HasKey(key K) bool { diff --git a/pkg/gocache/partitioned_test.go b/pkg/gocache/partitioned_test.go index d6dacf0..97a9574 100644 --- a/pkg/gocache/partitioned_test.go +++ b/pkg/gocache/partitioned_test.go @@ -29,13 +29,15 @@ func TestPartitionedCache(t *testing.T) { num := 6 cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, 1*time.Hour) - _, exists := cache.GetOrCreate("A", &num) + _, exists, p := cache.GetOrCreate("A", &num) assert.False(t, exists) + p.Wait() + exists = cache.HasKey("A") assert.True(t, exists) - val, exists := cache.GetOrCreate("A", &num) + val, exists, _ := cache.GetOrCreate("A", &num) assert.True(t, exists) assert.Equal(t, 6, *val) } @@ -47,8 +49,9 @@ func TestPartitionedCacheEviction(t *testing.T) { cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, time.Hour) for i := 0; i < 100000; i++ { key := randString(16) - _, existsAlready := cache.GetOrCreate(key, nil) + _, existsAlready, p := cache.GetOrCreate(key, nil) assert.False(t, existsAlready) + p.Wait() assert.LessOrEqual(t, cache.PartitionLen(key), sizeOfPartition) } } @@ -58,12 +61,13 @@ func TestPartitionedCacheTTL(t *testing.T) { numPartitions := 4 cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, time.Microsecond) - _, existsAlready := cache.GetOrCreate("A", nil) + _, existsAlready, p := cache.GetOrCreate("A", nil) assert.False(t, existsAlready) + p.Wait() time.Sleep(time.Millisecond) - _, existsAlready = cache.GetOrCreate("A", nil) + _, existsAlready, _ = cache.GetOrCreate("A", nil) assert.False(t, existsAlready) } @@ -81,7 +85,8 @@ func TestPartitionedCacheAsyncTest(t *testing.T) { for j := 0; j < numActions; j++ { v := j k := randLetter() - cache.GetOrCreate(k, &v) + _, _, p := cache.GetOrCreate(k, &v) + p.Wait() assert.GreaterOrEqual(t, cache.PartitionLen(k), 1) assert.LessOrEqual(t, cache.PartitionLen(k), sizeOfPartition) time.Sleep(time.Millisecond) From 6bc1d7eedb65092e38a330edb42b9a1815dc7e11 Mon Sep 17 00:00:00 2001 From: Chris Sandison Date: Thu, 13 Jun 2024 16:02:21 -0400 Subject: [PATCH 2/2] Not exposing getOrCreate --- pkg/gocache/cache.go | 2 +- pkg/gocache/partitioned.go | 2 +- pkg/gocache/partitioned_test.go | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/gocache/cache.go b/pkg/gocache/cache.go index ebd891b..23005f7 100644 --- a/pkg/gocache/cache.go +++ b/pkg/gocache/cache.go @@ -21,7 +21,7 @@ func (c *Cache[K, V]) HasKey(k K) (bool, error) { } func (c *Cache[K, V]) Get(k K, getter func() (*V, error)) (*V, error) { - valpromise, alreadyExists, _ := c.cache.GetOrCreate(k, NewPromise[V]()) + valpromise, alreadyExists, _ := c.cache.getOrCreate(k, NewPromise[V]()) if alreadyExists { val, err := valpromise.Wait() if err != nil { diff --git a/pkg/gocache/partitioned.go b/pkg/gocache/partitioned.go index 09f360c..b4bd768 100644 --- a/pkg/gocache/partitioned.go +++ b/pkg/gocache/partitioned.go @@ -134,7 +134,7 @@ func (c *partitionedCache[K, V]) putUnsafe(key K, value *V) { c.lru[partition].push(e) } -func (c *partitionedCache[K, V]) GetOrCreate(key K, value *V) (*V, bool, *Promise[bool]) { +func (c *partitionedCache[K, V]) getOrCreate(key K, value *V) (*V, bool, *Promise[bool]) { p := NewPromise[bool]() // first, try to read the value by acquiring the read lock only c.rUpgradeableLock(key) diff --git a/pkg/gocache/partitioned_test.go b/pkg/gocache/partitioned_test.go index 97a9574..8dc4ee1 100644 --- a/pkg/gocache/partitioned_test.go +++ b/pkg/gocache/partitioned_test.go @@ -29,7 +29,7 @@ func TestPartitionedCache(t *testing.T) { num := 6 cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, 1*time.Hour) - _, exists, p := cache.GetOrCreate("A", &num) + _, exists, p := cache.getOrCreate("A", &num) assert.False(t, exists) p.Wait() @@ -37,7 +37,7 @@ func TestPartitionedCache(t *testing.T) { exists = cache.HasKey("A") assert.True(t, exists) - val, exists, _ := cache.GetOrCreate("A", &num) + val, exists, _ := cache.getOrCreate("A", &num) assert.True(t, exists) assert.Equal(t, 6, *val) } @@ -49,7 +49,7 @@ func TestPartitionedCacheEviction(t *testing.T) { cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, time.Hour) for i := 0; i < 100000; i++ { key := randString(16) - _, existsAlready, p := cache.GetOrCreate(key, nil) + _, existsAlready, p := cache.getOrCreate(key, nil) assert.False(t, existsAlready) p.Wait() assert.LessOrEqual(t, cache.PartitionLen(key), sizeOfPartition) @@ -61,13 +61,13 @@ func TestPartitionedCacheTTL(t *testing.T) { numPartitions := 4 cache := newPartitionedCached[string, int](sizeOfPartition, numPartitions, time.Microsecond) - _, existsAlready, p := cache.GetOrCreate("A", nil) + _, existsAlready, p := cache.getOrCreate("A", nil) assert.False(t, existsAlready) p.Wait() time.Sleep(time.Millisecond) - _, existsAlready, _ = cache.GetOrCreate("A", nil) + _, existsAlready, _ = cache.getOrCreate("A", nil) assert.False(t, existsAlready) } @@ -85,7 +85,7 @@ func TestPartitionedCacheAsyncTest(t *testing.T) { for j := 0; j < numActions; j++ { v := j k := randLetter() - _, _, p := cache.GetOrCreate(k, &v) + _, _, p := cache.getOrCreate(k, &v) p.Wait() assert.GreaterOrEqual(t, cache.PartitionLen(k), 1) assert.LessOrEqual(t, cache.PartitionLen(k), sizeOfPartition)