From da544990846e9290d13937123b6d9e01f5058de1 Mon Sep 17 00:00:00 2001 From: siyul-park Date: Fri, 27 Sep 2024 10:55:36 -0400 Subject: [PATCH] refactor: move load and store logic to proc --- ext/pkg/network/gateway.go | 2 +- ext/pkg/network/listener.go | 6 +-- pkg/process/data.go | 88 --------------------------------- pkg/process/data_test.go | 98 ------------------------------------- pkg/process/local.go | 57 ++++++++++++--------- pkg/process/local_test.go | 6 +-- pkg/process/process.go | 93 ++++++++++++++++++++++++----------- pkg/process/process_test.go | 65 +++++++++++++++++++++++- pkg/runtime/runtime.go | 2 +- pkg/symbol/loader_test.go | 10 ++-- pkg/symbol/table.go | 4 +- pkg/symbol/table_test.go | 20 ++++---- 12 files changed, 186 insertions(+), 265 deletions(-) delete mode 100644 pkg/process/data.go delete mode 100644 pkg/process/data_test.go diff --git a/ext/pkg/network/gateway.go b/ext/pkg/network/gateway.go index 4fa5da9d..cac01b7b 100644 --- a/ext/pkg/network/gateway.go +++ b/ext/pkg/network/gateway.go @@ -115,7 +115,7 @@ func (n *WebSocketUpgradeNode) upgrade(proc *process.Process, inPck *packet.Pack return nil, err } - w, ok := proc.Data().LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter) + w, ok := proc.LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter) if !ok { return nil, encoding.ErrUnsupportedValue } diff --git a/ext/pkg/network/listener.go b/ext/pkg/network/listener.go index 4a27b87f..9df66dd4 100644 --- a/ext/pkg/network/listener.go +++ b/ext/pkg/network/listener.go @@ -195,8 +195,8 @@ func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { proc.Exit(ctx.Err()) }() - proc.Data().Store(KeyHTTPResponseWriter, w) - proc.Data().Store(KeyHTTPRequest, r) + proc.Store(KeyHTTPResponseWriter, w) + proc.Store(KeyHTTPRequest, r) outWriter := n.outPort.Open(proc) errWriter := n.errPort.Open(proc) @@ -233,7 +233,7 @@ func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { err = errors.New(http.StatusText(res.Status)) } - if w, ok := proc.Data().LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter); ok { + if w, ok := proc.LoadAndDelete(KeyHTTPResponseWriter).(http.ResponseWriter); ok { n.negotiate(req, res) _ = n.write(w, res) } diff --git a/pkg/process/data.go b/pkg/process/data.go deleted file mode 100644 index 574aacda..00000000 --- a/pkg/process/data.go +++ /dev/null @@ -1,88 +0,0 @@ -package process - -import "sync" - -// Data represents a hierarchical data structure that supports concurrent access. -type Data struct { - outer *Data - data map[string]any - mu sync.RWMutex -} - -// newData creates a new Data instance. -func newData() *Data { - return &Data{ - data: make(map[string]any), - } -} - -// LoadAndDelete retrieves and deletes the value for the given key. -// If not found, it checks the outer Data instance. -func (d *Data) LoadAndDelete(key string) any { - d.mu.Lock() - defer d.mu.Unlock() - - if val, ok := d.data[key]; ok { - delete(d.data, key) - return val - } - - if d.outer == nil { - return nil - } - return d.outer.LoadAndDelete(key) -} - -// Load retrieves the value for the given key. -// If not found, it checks the outer Data instance. -func (d *Data) Load(key string) any { - d.mu.RLock() - defer d.mu.RUnlock() - - if val, ok := d.data[key]; ok { - return val - } - - if d.outer == nil { - return nil - } - return d.outer.Load(key) -} - -// Store stores the value under the given key. -func (d *Data) Store(key string, val any) { - d.mu.Lock() - defer d.mu.Unlock() - - d.data[key] = val -} - -// Delete removes the value for the given key. -// Returns true if the key existed and was deleted. -func (d *Data) Delete(key string) bool { - d.mu.Lock() - defer d.mu.Unlock() - - if _, ok := d.data[key]; ok { - delete(d.data, key) - return true - } - return false -} - -// Fork creates a new Data instance that inherits from the current instance. -func (d *Data) Fork() *Data { - return &Data{ - data: make(map[string]any), - outer: d, - } -} - -// Close clears the data and removes the reference to the outer instance. -func (d *Data) Close() { - d.mu.Lock() - defer d.mu.Unlock() - - d.data = make(map[string]any) - d.outer = nil -} diff --git a/pkg/process/data_test.go b/pkg/process/data_test.go deleted file mode 100644 index d426e3fd..00000000 --- a/pkg/process/data_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package process - -import ( - "testing" - - "github.com/go-faker/faker/v4" - "github.com/stretchr/testify/assert" -) - -func TestData_LoadAndDelete(t *testing.T) { - d := newData() - defer d.Close() - - k := faker.UUIDHyphenated() - v := faker.UUIDHyphenated() - - d.Store(k, v) - - r := d.LoadAndDelete(k) - assert.Equal(t, v, r) - - r = d.Load(k) - assert.Equal(t, nil, r) -} - -func TestData_Load(t *testing.T) { - d := newData() - defer d.Close() - - k := faker.UUIDHyphenated() - v := faker.UUIDHyphenated() - - r := d.Load(k) - assert.Equal(t, nil, r) - - d.Store(k, v) - - r = d.Load(k) - assert.Equal(t, v, r) -} - -func TestData_Store(t *testing.T) { - d := newData() - defer d.Close() - - k := faker.UUIDHyphenated() - v1 := faker.UUIDHyphenated() - v2 := faker.UUIDHyphenated() - - d.Store(k, v1) - d.Store(k, v2) - - r := d.Load(k) - assert.Equal(t, v2, r) -} - -func TestData_Delete(t *testing.T) { - d := newData() - defer d.Close() - - k := faker.UUIDHyphenated() - v := faker.UUIDHyphenated() - - ok := d.Delete(k) - assert.False(t, ok) - - d.Store(k, v) - - ok = d.Delete(k) - assert.True(t, ok) -} - -func TestData_Fork(t *testing.T) { - d := newData() - defer d.Close() - - c := d.Fork() - - k := faker.UUIDHyphenated() - v1 := faker.UUIDHyphenated() - v2 := faker.UUIDHyphenated() - - d.Store(k, v1) - - r := c.Load(k) - assert.Equal(t, v1, r) - - c.Store(k, v2) - - r = c.Load(k) - assert.Equal(t, v2, r) - - ok := c.Delete(k) - assert.True(t, ok) - - ok = d.Delete(k) - assert.True(t, ok) -} diff --git a/pkg/process/local.go b/pkg/process/local.go index 5238089d..284ddd23 100644 --- a/pkg/process/local.go +++ b/pkg/process/local.go @@ -2,14 +2,14 @@ package process import "sync" -// Local is a concurrent cache for storing data associated with processes. +// Local provides a concurrent cache for storing data associated with processes. type Local[T any] struct { data map[*Process]T watches map[*Process][]func(T) mu sync.RWMutex } -// NewLocal creates a new Local cache instance. +// NewLocal creates and returns a new Local cache instance. func NewLocal[T any]() *Local[T] { return &Local[T]{ data: make(map[*Process]T), @@ -17,25 +17,23 @@ func NewLocal[T any]() *Local[T] { } } -// Watch adds a watcher function for a process. -// If the process already has a value, the watcher is called immediately. -func (l *Local[T]) Watch(proc *Process, watch func(T)) bool { +// Watch adds a watcher for the specified process. +func (l *Local[T]) Watch(proc *Process, watch func(T)) { l.mu.Lock() - val, ok := l.data[proc] - if !ok { + v, exists := l.data[proc] + if !exists { l.watches[proc] = append(l.watches[proc], watch) } l.mu.Unlock() - if ok { - watch(val) + if exists { + watch(v) } - return ok } -// Keys returns all processes in the Local cache. +// Keys returns a slice of all processes in the cache. func (l *Local[T]) Keys() []*Process { l.mu.RLock() defer l.mu.RUnlock() @@ -47,23 +45,23 @@ func (l *Local[T]) Keys() []*Process { return keys } -// Load retrieves the value for a given process. +// Load retrieves the value associated with the specified process. func (l *Local[T]) Load(proc *Process) (T, bool) { l.mu.RLock() defer l.mu.RUnlock() - val, ok := l.data[proc] - return val, ok + val, exists := l.data[proc] + return val, exists } -// Store stores a value for a process and sets an exit hook if new. +// Store associates a value with the specified process. func (l *Local[T]) Store(proc *Process, val T) { l.mu.Lock() - _, ok := l.data[proc] - l.data[proc] = val + _, exists := l.data[proc] - if !ok { + l.data[proc] = val + if !exists { proc.AddExitHook(ExitFunc(func(err error) { l.Delete(proc) })) @@ -79,25 +77,25 @@ func (l *Local[T]) Store(proc *Process, val T) { } } -// Delete removes a process and its value from the Local cache. +// Delete removes the specified process and its associated value from the cache. func (l *Local[T]) Delete(proc *Process) bool { l.mu.Lock() defer l.mu.Unlock() - _, ok := l.data[proc] + _, exists := l.data[proc] delete(l.data, proc) delete(l.watches, proc) - return ok + return exists } -// LoadOrStore retrieves the value for a process or stores a new value if not present. +// LoadOrStore retrieves the value for the specified process or stores a new value if absent. func (l *Local[T]) LoadOrStore(proc *Process, val func() (T, error)) (T, error) { l.mu.Lock() defer l.mu.Unlock() - if v, ok := l.data[proc]; ok { + if v, exists := l.data[proc]; exists { return v, nil } @@ -111,10 +109,21 @@ func (l *Local[T]) LoadOrStore(proc *Process, val func() (T, error)) (T, error) l.Delete(proc) })) + watches := l.watches[proc] + delete(l.watches, proc) + + l.mu.Unlock() + + for _, watch := range watches { + watch(v) + } + + l.mu.Lock() + return v, nil } -// Close clears the Local cache. +// Close clears the entire cache, removing all processes and their values. func (l *Local[T]) Close() { l.mu.Lock() defer l.mu.Unlock() diff --git a/pkg/process/local_test.go b/pkg/process/local_test.go index 612ab7bc..5b349664 100644 --- a/pkg/process/local_test.go +++ b/pkg/process/local_test.go @@ -15,20 +15,18 @@ func TestLocal_Watch(t *testing.T) { defer proc.Exit(nil) count := 0 - ok := l.Watch(proc, func(_ string) { + l.Watch(proc, func(_ string) { count++ }) - assert.False(t, ok) v := faker.UUIDHyphenated() l.Store(proc, v) assert.Equal(t, 1, count) - ok = l.Watch(proc, func(_ string) { + l.Watch(proc, func(_ string) { count++ }) - assert.True(t, ok) assert.Equal(t, 2, count) } diff --git a/pkg/process/process.go b/pkg/process/process.go index 1a7b8010..decc9f64 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -11,7 +11,7 @@ import ( type Process struct { parent *Process id uuid.UUID - data *Data + data map[string]any status Status err error ctx context.Context @@ -35,40 +35,59 @@ func New() *Process { ctx, cancel := context.WithCancelCause(context.Background()) return &Process{ id: uuid.Must(uuid.NewV7()), - data: newData(), + data: make(map[string]any), ctx: ctx, exitHooks: []ExitHook{ExitFunc(cancel)}, } } -// AddExitHook adds an exit hook to run when the process terminates. -func (p *Process) AddExitHook(hook ExitHook) bool { +// ID returns the process's id. +func (p *Process) ID() uuid.UUID { + return p.id +} + +// Load retrieves the value for the given key. +func (p *Process) Load(key string) any { + p.mu.RLock() + defer p.mu.RUnlock() + + return p.data[key] +} + +// Store stores the value under the given key. +func (p *Process) Store(key string, val any) { p.mu.Lock() defer p.mu.Unlock() + + p.data[key] = val +} - if p.status == StatusTerminated { - go hook.Exit(p.err) - return false - } - - for _, h := range p.exitHooks { - if h == hook { - return false - } +// Delete removes the value for the given key. +func (p *Process) Delete(key string) bool { + p.mu.Lock() + defer p.mu.Unlock() + + if _, ok := p.data[key]; ok { + delete(p.data, key) + return true } - - p.exitHooks = append(p.exitHooks, hook) - return true + return false } -// ID returns the process's id. -func (p *Process) ID() uuid.UUID { - return p.id -} +// LoadAndDelete retrieves and deletes the value for the given key. +func (p *Process) LoadAndDelete(key string) any { + p.mu.Lock() + defer p.mu.Unlock() -// Data returns the process's data. -func (p *Process) Data() *Data { - return p.data + if val, ok := p.data[key]; ok { + delete(p.data, key) + return val + } + + if p.parent == nil { + return nil + } + return p.parent.LoadAndDelete(key) } // Status returns the current status of the process. @@ -109,7 +128,7 @@ func (p *Process) Fork() *Process { ctx, cancel := context.WithCancelCause(p.ctx) child := &Process{ id: uuid.Must(uuid.NewV7()), - data: p.data.Fork(), + data: make(map[string]any), // Initialize child data map ctx: ctx, parent: p, exitHooks: []ExitHook{ @@ -122,7 +141,7 @@ func (p *Process) Fork() *Process { return child } -// Exit terminates the process with the provided error, closes data resources, and runs exit hooks. +// Exit terminates the process with the provided error and runs exit hooks. func (p *Process) Exit(err error) { p.mu.Lock() @@ -132,9 +151,7 @@ func (p *Process) Exit(err error) { } exitHooks := p.exitHooks[:] - - p.data.Close() - + p.data = make(map[string]any) p.status = StatusTerminated p.err = err p.exitHooks = nil @@ -146,3 +163,23 @@ func (p *Process) Exit(err error) { h.Exit(err) } } + +// AddExitHook adds an exit hook to run when the process terminates. +func (p *Process) AddExitHook(hook ExitHook) bool { + p.mu.Lock() + defer p.mu.Unlock() + + if p.status == StatusTerminated { + go hook.Exit(p.err) + return false + } + + for _, h := range p.exitHooks { + if h == hook { + return false + } + } + + p.exitHooks = append(p.exitHooks, hook) + return true +} diff --git a/pkg/process/process_test.go b/pkg/process/process_test.go index cf569de2..3bc54142 100644 --- a/pkg/process/process_test.go +++ b/pkg/process/process_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/go-faker/faker/v4" "github.com/stretchr/testify/assert" ) @@ -13,12 +14,74 @@ func TestNewProcess(t *testing.T) { defer proc.Exit(nil) assert.NotZero(t, proc.ID()) - assert.NotNil(t, proc.Data()) assert.NotNil(t, proc.Context()) assert.Equal(t, nil, proc.Err()) assert.Equal(t, StatusRunning, proc.Status()) } +func TestProcess_Load(t *testing.T) { + proc := New() + defer proc.Exit(nil) + + k := faker.UUIDHyphenated() + v := faker.UUIDHyphenated() + + r := proc.Load(k) + assert.Nil(t, r) + + proc.Store(k, v) + + r = proc.Load(k) + assert.Equal(t, v, r) +} + +func TestProcess_Store(t *testing.T) { + proc := New() + defer proc.Exit(nil) + + k := faker.UUIDHyphenated() + v1 := faker.UUIDHyphenated() + v2 := faker.UUIDHyphenated() + + proc.Store(k, v1) + proc.Store(k, v2) + + r := proc.Load(k) + assert.Equal(t, v2, r) +} + +func TestProcess_Delete(t *testing.T) { + proc := New() + defer proc.Exit(nil) + + k := faker.UUIDHyphenated() + v := faker.UUIDHyphenated() + + ok := proc.Delete(k) + assert.False(t, ok) + + proc.Store(k, v) + + ok = proc.Delete(k) + assert.True(t, ok) +} + +func TestProcess_LoadAndDelete(t *testing.T) { + proc := New() + defer proc.Exit(nil) + + k := faker.UUIDHyphenated() + v := faker.UUIDHyphenated() + + proc.Store(k, v) + + r := proc.LoadAndDelete(k) + assert.Equal(t, v, r) + + r = proc.Load(k) + assert.Nil(t, r) +} + func TestProcess_Exit(t *testing.T) { proc := New() diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index e283fa99..3e331071 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -203,5 +203,5 @@ func (r *Runtime) Close() error { r.secretStream = nil } - return r.symbolTable.Clear() + return r.symbolTable.Close() } diff --git a/pkg/symbol/loader_test.go b/pkg/symbol/loader_test.go index adb17237..7ce9b46b 100644 --- a/pkg/symbol/loader_test.go +++ b/pkg/symbol/loader_test.go @@ -31,7 +31,7 @@ func TestLoader_Load(t *testing.T) { secretStore := secret.NewStore() table := NewTable() - defer table.Clear() + defer table.Close() loader := NewLoader(LoaderConfig{ Table: table, @@ -93,7 +93,7 @@ func TestLoader_Load(t *testing.T) { secretStore := secret.NewStore() table := NewTable() - defer table.Clear() + defer table.Close() loader := NewLoader(LoaderConfig{ Table: table, @@ -132,7 +132,7 @@ func TestLoader_Load(t *testing.T) { secretStore := secret.NewStore() table := NewTable() - defer table.Clear() + defer table.Close() loader := NewLoader(LoaderConfig{ Table: table, @@ -176,7 +176,7 @@ func TestLoader_Load(t *testing.T) { secretStore := secret.NewStore() table := NewTable() - defer table.Clear() + defer table.Close() loader := NewLoader(LoaderConfig{ Table: table, @@ -224,7 +224,7 @@ func TestLoader_Load(t *testing.T) { secretStore := secret.NewStore() table := NewTable() - defer table.Clear() + defer table.Close() loader := NewLoader(LoaderConfig{ Table: table, diff --git a/pkg/symbol/table.go b/pkg/symbol/table.go index a01dfb8b..6698eef8 100644 --- a/pkg/symbol/table.go +++ b/pkg/symbol/table.go @@ -94,8 +94,8 @@ func (t *Table) Keys() []uuid.UUID { return ids } -// Clear frees all symbols associated with the table. -func (t *Table) Clear() error { +// Close frees all symbols associated with the table. +func (t *Table) Close() error { t.mu.Lock() defer t.mu.Unlock() diff --git a/pkg/symbol/table_test.go b/pkg/symbol/table_test.go index fa9751a0..bd54f291 100644 --- a/pkg/symbol/table_test.go +++ b/pkg/symbol/table_test.go @@ -17,7 +17,7 @@ func TestTable_Insert(t *testing.T) { t.Run("Link By ID", func(t *testing.T) { t.Run("Unlinked", func(t *testing.T) { tb := NewTable() - defer tb.Clear() + defer tb.Close() meta1 := &spec.Meta{ ID: uuid.Must(uuid.NewV7()), @@ -83,7 +83,7 @@ func TestTable_Insert(t *testing.T) { t.Run("Linked", func(t *testing.T) { tb := NewTable() - defer tb.Clear() + defer tb.Close() id := uuid.Must(uuid.NewV7()) @@ -154,7 +154,7 @@ func TestTable_Insert(t *testing.T) { t.Run("Link By Name", func(t *testing.T) { t.Run("Unlinked", func(t *testing.T) { tb := NewTable() - defer tb.Clear() + defer tb.Close() meta1 := &spec.Meta{ ID: uuid.Must(uuid.NewV7()), @@ -223,7 +223,7 @@ func TestTable_Insert(t *testing.T) { t.Run("Linked", func(t *testing.T) { tb := NewTable() - defer tb.Clear() + defer tb.Close() id := uuid.Must(uuid.NewV7()) @@ -298,7 +298,7 @@ func TestTable_Free(t *testing.T) { kind := faker.UUIDHyphenated() tb := NewTable() - defer tb.Clear() + defer tb.Close() meta1 := &spec.Meta{ ID: uuid.Must(uuid.NewV7()), @@ -387,7 +387,7 @@ func TestTable_LookupByID(t *testing.T) { kind := faker.UUIDHyphenated() tb := NewTable() - defer tb.Clear() + defer tb.Close() meta := &spec.Meta{ ID: uuid.Must(uuid.NewV7()), @@ -408,7 +408,7 @@ func TestTable_Keys(t *testing.T) { kind := faker.UUIDHyphenated() tb := NewTable() - defer tb.Clear() + defer tb.Close() meta := &spec.Meta{ ID: uuid.Must(uuid.NewV7()), @@ -445,7 +445,7 @@ func TestTable_Hook(t *testing.T) { }), }, }) - defer tb.Clear() + defer tb.Close() meta1 := &spec.Meta{ ID: uuid.Must(uuid.NewV7()), @@ -525,7 +525,7 @@ func BenchmarkTable_Insert(b *testing.B) { kind := faker.UUIDHyphenated() tb := NewTable() - defer tb.Clear() + defer tb.Close() for i := 0; i < b.N; i++ { meta := &spec.Meta{ @@ -543,7 +543,7 @@ func BenchmarkTable_Free(b *testing.B) { kind := faker.UUIDHyphenated() tb := NewTable() - defer tb.Clear() + defer tb.Close() for i := 0; i < b.N; i++ { b.StopTimer()