Skip to content

Commit

Permalink
platform/6914: Replacing shared read/write lock with an upgradeable, …
Browse files Browse the repository at this point in the history
…distinct read and write locking mechanism
  • Loading branch information
Ghnuberath committed Jun 13, 2024
1 parent 1804b69 commit 4e304c6
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 15 deletions.
44 changes: 29 additions & 15 deletions pkg/gocache/partitioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gocache
import (
"fmt"
"hash/fnv"
"sync"
"time"
)

Expand All @@ -22,7 +21,7 @@ type partitionedCache[K comparable, V any] struct {
sizePerPartition int
lru []*lruQueue[K, V]
maps []map[K]*entry[K, V]
mu []*sync.Mutex
mu []*UpgradableRWMutex
ttl time.Duration
}

Expand All @@ -45,14 +44,14 @@ func newPartitionedCached[K comparable, V any](sizePerPartition int, numPartitio
sizePerPartition: sizePerPartition,
lru: make([]*lruQueue[K, V], numPartitions),
maps: make([]map[K]*entry[K, V], numPartitions),
mu: make([]*sync.Mutex, numPartitions),
mu: make([]*UpgradableRWMutex, numPartitions),
ttl: ttl,
}

for i := 0; i < numPartitions; i++ {
cache.lru[i] = newLRUQueue[K, V]()
cache.maps[i] = map[K]*entry[K, V]{}
cache.mu[i] = &sync.Mutex{}
cache.mu[i] = &UpgradableRWMutex{}
}

return cache
Expand All @@ -62,12 +61,24 @@ func (c *partitionedCache[K, V]) partition(key K) uint32 {
return hash(fmt.Sprintf("%v", key)) % uint32(c.numPartitions)
}

func (c *partitionedCache[K, V]) lock(key K) {
c.mu[c.partition(key)].Lock()
func (c *partitionedCache[K, V]) rLock(key K) {
c.mu[c.partition(key)].RLock()
}

func (c *partitionedCache[K, V]) unlock(key K) {
c.mu[c.partition(key)].Unlock()
func (c *partitionedCache[K, V]) rUnlock(key K) {
c.mu[c.partition(key)].RUnlock()
}

func (c *partitionedCache[K, V]) rUpgradeableLock(key K) {
c.mu[c.partition(key)].UpgradableRLock()
}

func (c *partitionedCache[K, V]) rUpgradeableUnlock(key K) {
c.mu[c.partition(key)].UpgradableRUnlock()
}

func (c *partitionedCache[K, V]) wUpgradeLock(key K) {
c.mu[c.partition(key)].UpgradeWLock()
}

func (c *partitionedCache[K, V]) getUnsafe(key K) (*V, bool) {
Expand Down Expand Up @@ -124,28 +135,31 @@ func (c *partitionedCache[K, V]) putUnsafe(key K, value *V) {
}

func (c *partitionedCache[K, V]) GetOrCreate(key K, value *V) (*V, bool) {
c.lock(key)
// first, try to read the value by acquiring the read lock only
c.rUpgradeableLock(key)
defer c.rUpgradeableUnlock(key)
v, ok := c.getUnsafe(key)
if ok {
c.unlock(key)
return v, true
}

// if the value was not present, we need to store it instead,
// so upgrade to a write lock
c.wUpgradeLock(key)
c.putUnsafe(key, value)
c.unlock(key)
return value, false
}

func (c *partitionedCache[K, V]) HasKey(key K) bool {
c.lock(key)
defer c.unlock(key)
c.rLock(key)
defer c.rUnlock(key)
_, ok := c.getUnsafe(key)
return ok
}

func (c *partitionedCache[K, V]) PartitionLen(key K) int {
c.lock(key)
defer c.unlock(key)
c.rLock(key)
defer c.rUnlock(key)

return len(c.maps[c.partition(key)])
}
199 changes: 199 additions & 0 deletions pkg/gocache/upgradeable_rw_mutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Based on source code copyright by The Go Authors.
// Copied from https://gist.github.com/sancar/d1663e90892cd12c839ae21841b79295
//
// Copyright (c) 2009 The Go Authors. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

package gocache

import (
"sync"
"sync/atomic"
_ "unsafe"
)

// UpgradableRWMutex is an enhanced version of the standard sync.RWMutex.
// It has the all methods sync.RWMutex with exact same semantics.
// It gives more methods to give upgradable-read feature.
//
// The new semantics for upgradable-read are as follows:
// Multiple goroutines can get read-lock together with a single upgradable-read-lock.
// Only one goroutine can have a write-lock and no read-lock/upgradable-read-lock can be acquired in this state.
// There can be only a single goroutine keeping the upgrade-read-lock.
// UpgradableRWMutex is not reentrant.
//
// Usage of the UpgradableRWMutex:
//
// mutex.UpgradableRLock()
// defer mutex.UpgradableRUnlock()
// // read-lock acquired section. We can return here safely if an error occurs
// mutex.UpgradeWLock()
// // critical section with exclusive right access
type UpgradableRWMutex struct {
w sync.Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers

Check failure on line 61 in pkg/gocache/upgradeable_rw_mutex.go

View workflow job for this annotation

GitHub Actions / build

undefined: atomic.Int32
// number of departing readers. A negative number.
// Number of readers left while under write lock(while write lock waiting for readers to leave)
readerWait atomic.Int32

Check failure on line 64 in pkg/gocache/upgradeable_rw_mutex.go

View workflow job for this annotation

GitHub Actions / build

undefined: atomic.Int32
// Keep track if an upgradeable read-lock is upgraded to write-lock or not. Always accessed under w locked
upgraded bool
upgradableReadMode bool
}

//go:linkname semaphoreAcquire sync.runtime_Semacquire
func semaphoreAcquire(s *uint32)

//go:linkname semaphoreRelease sync.runtime_Semrelease
func semaphoreRelease(s *uint32, handoff bool, skipframes int)

const rwmutexMaxReaders = 1 << 30

// RLock is same as sync.RWMutex.RLock
func (rw *UpgradableRWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
semaphoreAcquire(&rw.readerSem)
}
}

// TryRLock is same as sync.RWMutex
func (rw *UpgradableRWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}

// RUnlock is same as sync.RWMutex
func (rw *UpgradableRWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}

func (rw *UpgradableRWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
panic("sync: RUnlock of unlocked UpgradableRWMutex")
}
// A writer is pending.
if rw.readerWait.Add(-1) == 0 {
// The last reader unblocks the writer.
semaphoreRelease(&rw.writerSem, false, 1)
}
}

// Lock is same as sync.RWMutex
func (rw *UpgradableRWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && rw.readerWait.Add(r) != 0 {
semaphoreAcquire(&rw.writerSem)
}
}

// TryLock is same as sync.RWMutex
func (rw *UpgradableRWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}

// Unlock is same as sync.RWMutex
func (rw *UpgradableRWMutex) Unlock() {
// Announce to readers there is no active writer.
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
panic("sync: Unlock of unlocked UpgradableRWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
semaphoreRelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}

// UpgradeWLock upgrade the read lock to the write lock
func (rw *UpgradableRWMutex) UpgradeWLock() {
if !rw.upgradableReadMode {
panic("sync: Upgrade outside of upgradableReadLock not allowed")
}
rw.upgraded = true
// Announce to readers there is a pending writer.
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && rw.readerWait.Add(r) != 0 {
semaphoreAcquire(&rw.writerSem)
}
}

// UpgradableRUnlock unlocks either the write-lock if it is upgraded
// or unlock just the upgradeableRead-lock if not upgraded
func (rw *UpgradableRWMutex) UpgradableRUnlock() {
rw.upgradableReadMode = false
if rw.upgraded {
rw.upgraded = false
rw.Unlock()
} else {
rw.w.Unlock()
}
}

// UpgradableRLock acquires an upgradable-read-lock which can be later upgraded to write-lock,
// Example usage:
//
// mutex.UpgradableRLock()
// defer mutex.UpgradableRUnlock()
// // read-lock acquired section. We can return here safely if an error occurs
// mutex.UpgradeWLock()
// // critical section with exclusive right access
func (rw *UpgradableRWMutex) UpgradableRLock() {
// First, resolve competition with other writers.
// Disallow writers to acquire the lock
rw.w.Lock()
if rw.readerCount.Load() < 0 {
panic("reader count can not be negative. We have the write lock")
}
rw.upgradableReadMode = true
}

0 comments on commit 4e304c6

Please sign in to comment.