Skip to content

Commit

Permalink
refactor: move load and store logic to proc
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Sep 27, 2024
1 parent a044aad commit da54499
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 265 deletions.
2 changes: 1 addition & 1 deletion ext/pkg/network/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (n *WebSocketUpgradeNode) upgrade(proc *process.Process, inPck *packet.Pack
return nil, err
}

w, ok := proc.Data().LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter)
w, ok := proc.LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter)
if !ok {
return nil, encoding.ErrUnsupportedValue
}
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/network/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
proc.Exit(ctx.Err())
}()

proc.Data().Store(KeyHTTPResponseWriter, w)
proc.Data().Store(KeyHTTPRequest, r)
proc.Store(KeyHTTPResponseWriter, w)
proc.Store(KeyHTTPRequest, r)

outWriter := n.outPort.Open(proc)
errWriter := n.errPort.Open(proc)
Expand Down Expand Up @@ -233,7 +233,7 @@ func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = errors.New(http.StatusText(res.Status))
}

if w, ok := proc.Data().LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter); ok {
if w, ok := proc.LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter); ok {
n.negotiate(req, res)
_ = n.write(w, res)
}
Expand Down
88 changes: 0 additions & 88 deletions pkg/process/data.go

This file was deleted.

98 changes: 0 additions & 98 deletions pkg/process/data_test.go

This file was deleted.

57 changes: 33 additions & 24 deletions pkg/process/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,38 @@ package process

import "sync"

// Local is a concurrent cache for storing data associated with processes.
// Local provides a concurrent cache for storing data associated with processes.
type Local[T any] struct {
data map[*Process]T
watches map[*Process][]func(T)
mu sync.RWMutex
}

// NewLocal creates a new Local cache instance.
// NewLocal creates and returns a new Local cache instance.
func NewLocal[T any]() *Local[T] {
return &Local[T]{
data: make(map[*Process]T),
watches: make(map[*Process][]func(T)),
}
}

// Watch adds a watcher function for a process.
// If the process already has a value, the watcher is called immediately.
func (l *Local[T]) Watch(proc *Process, watch func(T)) bool {
// Watch adds a watcher for the specified process.
func (l *Local[T]) Watch(proc *Process, watch func(T)) {
l.mu.Lock()

val, ok := l.data[proc]
if !ok {
v, exists := l.data[proc]
if !exists {
l.watches[proc] = append(l.watches[proc], watch)
}

l.mu.Unlock()

if ok {
watch(val)
if exists {
watch(v)
}
return ok
}

// Keys returns all processes in the Local cache.
// Keys returns a slice of all processes in the cache.
func (l *Local[T]) Keys() []*Process {
l.mu.RLock()
defer l.mu.RUnlock()
Expand All @@ -47,23 +45,23 @@ func (l *Local[T]) Keys() []*Process {
return keys
}

// Load retrieves the value for a given process.
// Load retrieves the value associated with the specified process.
func (l *Local[T]) Load(proc *Process) (T, bool) {
l.mu.RLock()
defer l.mu.RUnlock()

val, ok := l.data[proc]
return val, ok
val, exists := l.data[proc]
return val, exists
}

// Store stores a value for a process and sets an exit hook if new.
// Store associates a value with the specified process.
func (l *Local[T]) Store(proc *Process, val T) {
l.mu.Lock()

_, ok := l.data[proc]
l.data[proc] = val
_, exists := l.data[proc]

if !ok {
l.data[proc] = val
if !exists {
proc.AddExitHook(ExitFunc(func(err error) {
l.Delete(proc)
}))
Expand All @@ -79,25 +77,25 @@ func (l *Local[T]) Store(proc *Process, val T) {
}
}

// Delete removes a process and its value from the Local cache.
// Delete removes the specified process and its associated value from the cache.
func (l *Local[T]) Delete(proc *Process) bool {
l.mu.Lock()
defer l.mu.Unlock()

_, ok := l.data[proc]
_, exists := l.data[proc]

delete(l.data, proc)
delete(l.watches, proc)

return ok
return exists
}

// LoadOrStore retrieves the value for a process or stores a new value if not present.
// LoadOrStore retrieves the value for the specified process or stores a new value if absent.
func (l *Local[T]) LoadOrStore(proc *Process, val func() (T, error)) (T, error) {
l.mu.Lock()
defer l.mu.Unlock()

if v, ok := l.data[proc]; ok {
if v, exists := l.data[proc]; exists {
return v, nil
}

Expand All @@ -111,10 +109,21 @@ func (l *Local[T]) LoadOrStore(proc *Process, val func() (T, error)) (T, error)
l.Delete(proc)
}))

watches := l.watches[proc]
delete(l.watches, proc)

l.mu.Unlock()

for _, watch := range watches {
watch(v)
}

l.mu.Lock()

return v, nil
}

// Close clears the Local cache.
// Close clears the entire cache, removing all processes and their values.
func (l *Local[T]) Close() {
l.mu.Lock()
defer l.mu.Unlock()
Expand Down
6 changes: 2 additions & 4 deletions pkg/process/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,18 @@ func TestLocal_Watch(t *testing.T) {
defer proc.Exit(nil)

count := 0
ok := l.Watch(proc, func(_ string) {
l.Watch(proc, func(_ string) {
count++
})
assert.False(t, ok)

v := faker.UUIDHyphenated()

l.Store(proc, v)
assert.Equal(t, 1, count)

ok = l.Watch(proc, func(_ string) {
l.Watch(proc, func(_ string) {
count++
})
assert.True(t, ok)
assert.Equal(t, 2, count)
}

Expand Down
Loading

0 comments on commit da54499

Please sign in to comment.