Skip to content

Commit

Permalink
fix: remain native codec
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Oct 9, 2024
1 parent 2d36910 commit f5e3672
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 37 deletions.
33 changes: 25 additions & 8 deletions pkg/chart/linker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package chart

import (
"slices"
"sync"

"github.com/siyul-park/uniflow/pkg/hook"
"github.com/siyul-park/uniflow/pkg/node"
Expand All @@ -20,6 +20,8 @@ type LinkerConfig struct {
type Linker struct {
hook *hook.Hook
scheme *scheme.Scheme
codecs map[string]scheme.Codec
mu sync.RWMutex
}

var _ LoadHook = (*Linker)(nil)
Expand All @@ -30,16 +32,22 @@ func NewLinker(config LinkerConfig) *Linker {
return &Linker{
hook: config.Hook,
scheme: config.Scheme,
codecs: make(map[string]scheme.Codec),
}
}

// Load loads the chart, creating nodes and symbols.
func (l *Linker) Load(chrt *Chart) error {
if slices.Contains(l.scheme.Kinds(), chrt.GetName()) {
l.mu.Lock()
defer l.mu.Unlock()

kind := chrt.GetName()
codec := l.codecs[kind]
if l.scheme.Codec(kind) != codec {
return nil

Check warning on line 47 in pkg/chart/linker.go

View check run for this annotation

Codecov / codecov/patch

pkg/chart/linker.go#L47

Added line #L47 was not covered by tests
}

codec := scheme.CodecFunc(func(sp spec.Spec) (node.Node, error) {
codec = scheme.CodecFunc(func(sp spec.Spec) (node.Node, error) {
specs, err := chrt.Build(sp)
if err != nil {
return nil, err

Check warning on line 53 in pkg/chart/linker.go

View check run for this annotation

Codecov / codecov/patch

pkg/chart/linker.go#L53

Added line #L53 was not covered by tests
Expand Down Expand Up @@ -103,16 +111,25 @@ func (l *Linker) Load(chrt *Chart) error {
return n, nil
})

l.scheme.AddKnownType(chrt.GetName(), &spec.Unstructured{})
l.scheme.AddCodec(chrt.GetName(), codec)

l.scheme.AddKnownType(kind, &spec.Unstructured{})
l.scheme.AddCodec(kind, codec)
l.codecs[kind] = codec
return nil
}

// Unload removes the chart from the scheme.
func (l *Linker) Unload(chrt *Chart) error {
l.scheme.RemoveKnownType(chrt.GetName())
l.scheme.RemoveCodec(chrt.GetName())
l.mu.Lock()
defer l.mu.Unlock()

kind := chrt.GetName()
codec := l.codecs[kind]
if l.scheme.Codec(kind) != codec {
return nil

Check warning on line 128 in pkg/chart/linker.go

View check run for this annotation

Codecov / codecov/patch

pkg/chart/linker.go#L128

Added line #L128 was not covered by tests
}

l.scheme.RemoveKnownType(kind)
l.scheme.RemoveCodec(kind)
delete(l.codecs, kind)
return nil
}
5 changes: 1 addition & 4 deletions pkg/chart/linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ func TestLinker_Unload(t *testing.T) {
Specs: []spec.Spec{},
}

s.AddKnownType(chrt.GetName(), &spec.Meta{})
s.AddCodec(chrt.GetName(), scheme.CodecFunc(func(spec spec.Spec) (node.Node, error) {
return node.NewOneToOneNode(nil), nil
}))
l.Load(chrt)

err := l.Unload(chrt)
assert.NoError(t, err)
Expand Down
12 changes: 12 additions & 0 deletions pkg/chart/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ func (t *Table) Lookup(id uuid.UUID) *Chart {
return t.charts[id]
}

// Links returns the charts linked to the chart specified by its UUID.
func (t *Table) Links(id uuid.UUID) []*Chart {
t.mu.RLock()
defer t.mu.RUnlock()

chrt, ok := t.charts[id]
if !ok {
return nil

Check warning on line 82 in pkg/chart/table.go

View check run for this annotation

Codecov / codecov/patch

pkg/chart/table.go#L82

Added line #L82 was not covered by tests
}
return t.linked(chrt)
}

// Keys returns all IDs of charts in the table.
func (t *Table) Keys() []uuid.UUID {
t.mu.RLock()
Expand Down
33 changes: 33 additions & 0 deletions pkg/chart/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,39 @@ func TestTable_Lookup(t *testing.T) {
assert.Equal(t, chrt, tb.Lookup(chrt.GetID()))
}

func TestTable_Links(t *testing.T) {
tb := NewTable()
defer tb.Close()

chrt1 := &Chart{
ID: uuid.Must(uuid.NewV7()),
Namespace: resource.DefaultNamespace,
Name: faker.UUIDHyphenated(),
Specs: []spec.Spec{},
}
chrt2 := &Chart{
ID: uuid.Must(uuid.NewV7()),
Namespace: resource.DefaultNamespace,
Name: faker.UUIDHyphenated(),
Specs: []spec.Spec{
&spec.Meta{
Kind: chrt1.GetName(),
Namespace: resource.DefaultNamespace,
Name: faker.UUIDHyphenated(),
},
},
}

tb.Insert(chrt1)
tb.Insert(chrt2)

links := tb.Links(chrt1.GetID())
assert.Equal(t, []*Chart{chrt1, chrt2}, links)

links = tb.Links(chrt2.GetID())
assert.Equal(t, []*Chart{chrt2}, links)
}

func TestTable_Keys(t *testing.T) {
tb := NewTable()
defer tb.Close()
Expand Down
34 changes: 18 additions & 16 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type Config struct {
Namespace string // Namespace defines the isolated execution environment for workflows.
Hook *hook.Hook // Hook is a collection of hook functions for managing symbols.
Scheme *scheme.Scheme // Scheme defines the scheme and behaviors for symbols.
ChartStore chart.Store
SpecStore spec.Store // SpecStore is responsible for persisting specifications.
SecretStore secret.Store // SecretStore is responsible for persisting secrets.
ChartStore chart.Store // ChartStore is responsible for persisting charts.
SpecStore spec.Store // SpecStore is responsible for persisting specifications.
SecretStore secret.Store // SecretStore is responsible for persisting secrets.
}

// Runtime represents an environment for executing Workflows.
Expand Down Expand Up @@ -183,35 +183,37 @@ func (r *Runtime) Reconcile(ctx context.Context) error {
return nil

Check warning on line 183 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L183

Added line #L183 was not covered by tests
}

charts, err := r.chartStore.Load(ctx, &chart.Chart{ID: event.ID})
if err != nil {
return err
}
charts := r.chartTable.Links(event.ID)
if len(charts) == 0 {
if chrt := r.chartTable.Lookup(event.ID); chrt != nil {
charts = append(charts, chrt)
} else {
charts = append(charts, &chart.Chart{ID: event.ID})
var err error
charts, err = r.chartStore.Load(ctx, &chart.Chart{ID: event.ID})
if err != nil {
return err

Check warning on line 191 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L191

Added line #L191 was not covered by tests
}
}

for _, chrt := range charts {
r.chartLoader.Load(ctx, chrt)
}

kinds := make([]string, 0, len(charts))
for _, chrt := range charts {
kinds = append(kinds, chrt.GetName())
}

bounded := make(map[uuid.UUID]spec.Spec)
for _, id := range r.symbolTable.Keys() {
sb := r.symbolTable.Lookup(id)
if sb != nil && slices.Contains(kinds, sb.Kind()) {
bounded[sb.ID()] = sb.Spec
r.symbolTable.Free(sb.ID())
unloaded[sb.ID()] = sb.Spec
}
}
for _, sp := range unloaded {
if slices.Contains(kinds, sp.GetKind()) {
bounded[sp.GetID()] = sp
}
}

r.chartLoader.Load(ctx, &chart.Chart{ID: event.ID})

for _, sp := range bounded {
if slices.Contains(kinds, sp.GetKind()) {
if err := r.symbolLoader.Load(ctx, sp); err != nil {
unloaded[sp.GetID()] = sp
Expand Down
1 change: 0 additions & 1 deletion pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestRuntime_Load(t *testing.T) {
SpecStore: specStore,
SecretStore: secretStore,
})

defer r.Close()

meta := &spec.Meta{
Expand Down
23 changes: 15 additions & 8 deletions pkg/scheme/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,24 @@ import (
"github.com/siyul-park/uniflow/pkg/spec"
)

// Codec defines the interface for decoding spec.Spec into a node.Node.
// Codec defines the interface for converting a spec.Spec into a node.Node.
type Codec interface {
// Compile compiles the given spec.Spec into a node.Node.
// Compile converts the given spec.Spec into a node.Node.
Compile(sp spec.Spec) (node.Node, error)
}

// CodecFunc represents a function type that implements the Codec interface.
type CodecFunc func(sp spec.Spec) (node.Node, error)
type codec struct {
compile func(sp spec.Spec) (node.Node, error)
}

var _ Codec = (*codec)(nil)

// CodecFunc takes a compile function and returns a struct that implements the Codec interface.
func CodecFunc(compile func(sp spec.Spec) (node.Node, error)) Codec {
return &codec{compile: compile}
}

// CodecWithType creates a new CodecFunc for the specified type T.
// CodecWithType creates a Codec that works with a specific type T.
func CodecWithType[T spec.Spec](compile func(spec T) (node.Node, error)) Codec {
return CodecFunc(func(spec spec.Spec) (node.Node, error) {
if converted, ok := spec.(T); ok {
Expand All @@ -26,7 +34,6 @@ func CodecWithType[T spec.Spec](compile func(spec T) (node.Node, error)) Codec {
})
}

// Compile implements the Compile method for CodecFunc.
func (f CodecFunc) Compile(sp spec.Spec) (node.Node, error) {
return f(sp)
func (c *codec) Compile(sp spec.Spec) (node.Node, error) {
return c.compile(sp)
}

0 comments on commit f5e3672

Please sign in to comment.