From 4d85a84e789e4131396f1965bfd745c291fb7065 Mon Sep 17 00:00:00 2001 From: Sean McIntyre Date: Thu, 13 Jun 2024 14:11:56 -0400 Subject: [PATCH] platform/6914: Replacing shared read/write lock with an upgradeable, distinct read and write locking mechanism (#10) * platform/6914: Replacing shared read/write lock with an upgradeable, distinct read and write locking mechanism * Bumping go version for atomic package --- .github/workflows/ci.yml | 2 +- .go-version | 2 +- go.mod | 2 +- pkg/gocache/partitioned.go | 44 +++--- pkg/gocache/upgradeable_rw_mutex.go | 199 ++++++++++++++++++++++++++++ 5 files changed, 231 insertions(+), 18 deletions(-) create mode 100644 pkg/gocache/upgradeable_rw_mutex.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f7462f8..c6d6234 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,7 +14,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.21 - name: Build run: go build -v ./... diff --git a/.go-version b/.go-version index b9fb27a..d2ab029 100644 --- a/.go-version +++ b/.go-version @@ -1 +1 @@ -1.18.3 +1.21 diff --git a/go.mod b/go.mod index d465d5e..bc86237 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/thinkdata-works/gocache -go 1.18 +go 1.21 require github.com/stretchr/testify v1.8.3 diff --git a/pkg/gocache/partitioned.go b/pkg/gocache/partitioned.go index fa4c5d2..62d3ffa 100644 --- a/pkg/gocache/partitioned.go +++ b/pkg/gocache/partitioned.go @@ -3,7 +3,6 @@ package gocache import ( "fmt" "hash/fnv" - "sync" "time" ) @@ -22,7 +21,7 @@ type partitionedCache[K comparable, V any] struct { sizePerPartition int lru []*lruQueue[K, V] maps []map[K]*entry[K, V] - mu []*sync.Mutex + mu []*UpgradableRWMutex ttl time.Duration } @@ -45,14 +44,14 @@ func newPartitionedCached[K comparable, V any](sizePerPartition int, numPartitio sizePerPartition: sizePerPartition, lru: make([]*lruQueue[K, V], numPartitions), maps: make([]map[K]*entry[K, V], numPartitions), - mu: make([]*sync.Mutex, numPartitions), + mu: make([]*UpgradableRWMutex, numPartitions), ttl: ttl, } for i := 0; i < numPartitions; i++ { cache.lru[i] = newLRUQueue[K, V]() cache.maps[i] = map[K]*entry[K, V]{} - cache.mu[i] = &sync.Mutex{} + cache.mu[i] = &UpgradableRWMutex{} } return cache @@ -62,12 +61,24 @@ func (c *partitionedCache[K, V]) partition(key K) uint32 { return hash(fmt.Sprintf("%v", key)) % uint32(c.numPartitions) } -func (c *partitionedCache[K, V]) lock(key K) { - c.mu[c.partition(key)].Lock() +func (c *partitionedCache[K, V]) rLock(key K) { + c.mu[c.partition(key)].RLock() } -func (c *partitionedCache[K, V]) unlock(key K) { - c.mu[c.partition(key)].Unlock() +func (c *partitionedCache[K, V]) rUnlock(key K) { + c.mu[c.partition(key)].RUnlock() +} + +func (c *partitionedCache[K, V]) rUpgradeableLock(key K) { + c.mu[c.partition(key)].UpgradableRLock() +} + +func (c *partitionedCache[K, V]) rUpgradeableUnlock(key K) { + c.mu[c.partition(key)].UpgradableRUnlock() +} + +func (c *partitionedCache[K, V]) wUpgradeLock(key K) { + c.mu[c.partition(key)].UpgradeWLock() } func (c *partitionedCache[K, V]) getUnsafe(key K) (*V, bool) { @@ -124,28 +135,31 @@ func (c *partitionedCache[K, V]) putUnsafe(key K, value *V) { } func (c *partitionedCache[K, V]) GetOrCreate(key K, value *V) (*V, bool) { - c.lock(key) + // first, try to read the value by acquiring the read lock only + c.rUpgradeableLock(key) + defer c.rUpgradeableUnlock(key) v, ok := c.getUnsafe(key) if ok { - c.unlock(key) return v, true } + // if the value was not present, we need to store it instead, + // so upgrade to a write lock + c.wUpgradeLock(key) c.putUnsafe(key, value) - c.unlock(key) return value, false } func (c *partitionedCache[K, V]) HasKey(key K) bool { - c.lock(key) - defer c.unlock(key) + c.rLock(key) + defer c.rUnlock(key) _, ok := c.getUnsafe(key) return ok } func (c *partitionedCache[K, V]) PartitionLen(key K) int { - c.lock(key) - defer c.unlock(key) + c.rLock(key) + defer c.rUnlock(key) return len(c.maps[c.partition(key)]) } diff --git a/pkg/gocache/upgradeable_rw_mutex.go b/pkg/gocache/upgradeable_rw_mutex.go new file mode 100644 index 0000000..3167569 --- /dev/null +++ b/pkg/gocache/upgradeable_rw_mutex.go @@ -0,0 +1,199 @@ +// Based on source code copyright by The Go Authors. +// Copied from https://gist.github.com/sancar/d1663e90892cd12c839ae21841b79295 +// +// Copyright (c) 2009 The Go Authors. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package gocache + +import ( + "sync" + "sync/atomic" + _ "unsafe" +) + +// UpgradableRWMutex is an enhanced version of the standard sync.RWMutex. +// It has the all methods sync.RWMutex with exact same semantics. +// It gives more methods to give upgradable-read feature. +// +// The new semantics for upgradable-read are as follows: +// Multiple goroutines can get read-lock together with a single upgradable-read-lock. +// Only one goroutine can have a write-lock and no read-lock/upgradable-read-lock can be acquired in this state. +// There can be only a single goroutine keeping the upgrade-read-lock. +// UpgradableRWMutex is not reentrant. +// +// Usage of the UpgradableRWMutex: +// +// mutex.UpgradableRLock() +// defer mutex.UpgradableRUnlock() +// // read-lock acquired section. We can return here safely if an error occurs +// mutex.UpgradeWLock() +// // critical section with exclusive right access +type UpgradableRWMutex struct { + w sync.Mutex // held if there are pending writers + writerSem uint32 // semaphore for writers to wait for completing readers + readerSem uint32 // semaphore for readers to wait for completing writers + readerCount atomic.Int32 // number of pending readers + // number of departing readers. A negative number. + // Number of readers left while under write lock(while write lock waiting for readers to leave) + readerWait atomic.Int32 + // Keep track if an upgradeable read-lock is upgraded to write-lock or not. Always accessed under w locked + upgraded bool + upgradableReadMode bool +} + +//go:linkname semaphoreAcquire sync.runtime_Semacquire +func semaphoreAcquire(s *uint32) + +//go:linkname semaphoreRelease sync.runtime_Semrelease +func semaphoreRelease(s *uint32, handoff bool, skipframes int) + +const rwmutexMaxReaders = 1 << 30 + +// RLock is same as sync.RWMutex.RLock +func (rw *UpgradableRWMutex) RLock() { + if rw.readerCount.Add(1) < 0 { + // A writer is pending, wait for it. + semaphoreAcquire(&rw.readerSem) + } +} + +// TryRLock is same as sync.RWMutex +func (rw *UpgradableRWMutex) TryRLock() bool { + for { + c := rw.readerCount.Load() + if c < 0 { + return false + } + if rw.readerCount.CompareAndSwap(c, c+1) { + return true + } + } +} + +// RUnlock is same as sync.RWMutex +func (rw *UpgradableRWMutex) RUnlock() { + if r := rw.readerCount.Add(-1); r < 0 { + // Outlined slow-path to allow the fast-path to be inlined + rw.rUnlockSlow(r) + } +} + +func (rw *UpgradableRWMutex) rUnlockSlow(r int32) { + if r+1 == 0 || r+1 == -rwmutexMaxReaders { + panic("sync: RUnlock of unlocked UpgradableRWMutex") + } + // A writer is pending. + if rw.readerWait.Add(-1) == 0 { + // The last reader unblocks the writer. + semaphoreRelease(&rw.writerSem, false, 1) + } +} + +// Lock is same as sync.RWMutex +func (rw *UpgradableRWMutex) Lock() { + // First, resolve competition with other writers. + rw.w.Lock() + // Announce to readers there is a pending writer. + r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders + // Wait for active readers. + if r != 0 && rw.readerWait.Add(r) != 0 { + semaphoreAcquire(&rw.writerSem) + } +} + +// TryLock is same as sync.RWMutex +func (rw *UpgradableRWMutex) TryLock() bool { + if !rw.w.TryLock() { + return false + } + if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) { + rw.w.Unlock() + return false + } + return true +} + +// Unlock is same as sync.RWMutex +func (rw *UpgradableRWMutex) Unlock() { + // Announce to readers there is no active writer. + r := rw.readerCount.Add(rwmutexMaxReaders) + if r >= rwmutexMaxReaders { + panic("sync: Unlock of unlocked UpgradableRWMutex") + } + // Unblock blocked readers, if any. + for i := 0; i < int(r); i++ { + semaphoreRelease(&rw.readerSem, false, 0) + } + // Allow other writers to proceed. + rw.w.Unlock() +} + +// UpgradeWLock upgrade the read lock to the write lock +func (rw *UpgradableRWMutex) UpgradeWLock() { + if !rw.upgradableReadMode { + panic("sync: Upgrade outside of upgradableReadLock not allowed") + } + rw.upgraded = true + // Announce to readers there is a pending writer. + r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders + // Wait for active readers. + if r != 0 && rw.readerWait.Add(r) != 0 { + semaphoreAcquire(&rw.writerSem) + } +} + +// UpgradableRUnlock unlocks either the write-lock if it is upgraded +// or unlock just the upgradeableRead-lock if not upgraded +func (rw *UpgradableRWMutex) UpgradableRUnlock() { + rw.upgradableReadMode = false + if rw.upgraded { + rw.upgraded = false + rw.Unlock() + } else { + rw.w.Unlock() + } +} + +// UpgradableRLock acquires an upgradable-read-lock which can be later upgraded to write-lock, +// Example usage: +// +// mutex.UpgradableRLock() +// defer mutex.UpgradableRUnlock() +// // read-lock acquired section. We can return here safely if an error occurs +// mutex.UpgradeWLock() +// // critical section with exclusive right access +func (rw *UpgradableRWMutex) UpgradableRLock() { + // First, resolve competition with other writers. + // Disallow writers to acquire the lock + rw.w.Lock() + if rw.readerCount.Load() < 0 { + panic("reader count can not be negative. We have the write lock") + } + rw.upgradableReadMode = true +}