From 694dfe9d46ab1f8314fa232d2483fb2781dc6a1b Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Wed, 20 Nov 2024 14:45:15 +0200 Subject: [PATCH] chore(workflows): stubs out engine registry --- .../workflows/syncer/workflow_syncer_test.go | 2 + .../workflows/syncer/engine_registry.go | 64 +++++++++++++++++++ core/services/workflows/syncer/handler.go | 25 ++++++-- .../services/workflows/syncer/handler_test.go | 10 +-- core/services/workflows/syncer/orm.go | 16 ++++- .../workflows/syncer/workflow_registry.go | 23 +++++-- .../syncer/workflow_registry_test.go | 2 +- 7 files changed, 121 insertions(+), 21 deletions(-) create mode 100644 core/services/workflows/syncer/engine_registry.go diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 802dc427c93..d6f507eac20 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -117,6 +117,8 @@ func Test_SecretsWorker(t *testing.T) { contractReader, fetcherFn, wfRegistryAddr.Hex(), + nil, + nil, syncer.WithTicker(giveTicker.C), ) diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go new file mode 100644 index 00000000000..6dd54794e8b --- /dev/null +++ b/core/services/workflows/syncer/engine_registry.go @@ -0,0 +1,64 @@ +package syncer + +import ( + "errors" + "sync" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" +) + +type engineRegistry struct { + engines map[string]*workflows.Engine + mu sync.RWMutex +} + +func newEngineRegistry() *engineRegistry { + return &engineRegistry{ + engines: make(map[string]*workflows.Engine), + } +} + +// Add adds an engine to the registry. +func (r *engineRegistry) Add(id string, engine *workflows.Engine) { + r.mu.Lock() + defer r.mu.Unlock() + r.engines[id] = engine +} + +// Get retrieves an engine from the registry. +func (r *engineRegistry) Get(id string) (*workflows.Engine, error) { + r.mu.RLock() + defer r.mu.RUnlock() + engine, found := r.engines[id] + if !found { + return nil, errors.New("engine not found") + } + return engine, nil +} + +// Pop removes an engine from the registry and returns the engine if found. +func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) { + r.mu.Lock() + defer r.mu.Unlock() + engine, ok := r.engines[id] + if !ok { + return nil, errors.New("remove failed: engine not found") + } + delete(r.engines, id) + return engine, nil +} + +// Close closes all engines in the registry. +func (r *engineRegistry) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + var err error + for id, engine := range r.engines { + closeErr := engine.Close() + if closeErr != nil { + err = errors.Join(err, closeErr) + } + delete(r.engines, id) + } + return err +} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 624044f092a..f5e2f3aea9b 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) var ErrNotImplemented = errors.New("not implemented") @@ -88,21 +90,30 @@ type WorkflowRegistryWorkflowDeletedV1 struct { // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. type eventHandler struct { - lggr logger.Logger - orm ORM - fetcher FetcherFunc + lggr logger.Logger + orm WorkflowSecretsDS + fetcher FetcherFunc + workflowStore store.Store + capRegistry core.CapabilitiesRegistry + engineRegistry *engineRegistry } // newEventHandler returns a new eventHandler instance. func newEventHandler( lggr logger.Logger, - orm ORM, + orm WorkflowSecretsDS, gateway FetcherFunc, + workflowStore store.Store, + capRegistry core.CapabilitiesRegistry, + engineRegistry *engineRegistry, ) *eventHandler { return &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: gateway, + lggr: lggr, + orm: orm, + fetcher: gateway, + workflowStore: workflowStore, + capRegistry: capRegistry, + engineRegistry: engineRegistry, } } diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index dcdea28eda4..17c980e4f56 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err := h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) { t.Run("fails to get secrets url", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil) + h := newEventHandler(lggr, mockORM, nil, nil, nil, nil) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index a10eb708ddf..6c767127908 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -2,13 +2,15 @@ package syncer import ( "context" + "errors" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" ) -type ORM interface { +type WorkflowSecretsDS interface { // GetSecretsURLByID returns the secrets URL for the given ID. GetSecretsURLByID(ctx context.Context, id int64) (string, error) @@ -30,14 +32,18 @@ type ORM interface { Create(ctx context.Context, secretsURL, hash, contents string) (int64, error) } -type WorkflowRegistryDS = ORM +type WorkflowSpecsDS interface { + CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) +} + +type WorkflowRegistryDS = WorkflowSecretsDS type orm struct { ds sqlutil.DataSource lggr logger.Logger } -var _ ORM = (*orm)(nil) +var _ WorkflowSecretsDS = (*orm)(nil) func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm { return &orm{ @@ -137,3 +143,7 @@ func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64, func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { return crypto.Keccak256(append(owner, secretsURL...)) } + +func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + return 0, errors.New("not implemented") +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 3ee816c6424..d8ad37646d6 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -12,12 +12,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) const name = "WorkflowRegistrySyncer" @@ -115,6 +117,10 @@ type workflowRegistry struct { // heap is a min heap that merges batches of events from the contract query goroutines. The // default min heap is sorted by block height. heap Heap + + workflowStore store.Store + capRegistry core.CapabilitiesRegistry + engineRegistry *engineRegistry } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -139,14 +145,19 @@ func NewWorkflowRegistry[T ContractReader]( reader T, gateway FetcherFunc, addr string, + workflowStore store.Store, + capRegistry core.CapabilitiesRegistry, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ - lggr: lggr.Named(name), - orm: orm, - reader: reader, - gateway: gateway, + lggr: lggr.Named(name), + orm: orm, + reader: reader, + gateway: gateway, + workflowStore: workflowStore, + capRegistry: capRegistry, + engineRegistry: newEngineRegistry(), cfg: ContractEventPollerConfig{ ContractName: ContractName, ContractAddress: addr, @@ -160,7 +171,9 @@ func NewWorkflowRegistry[T ContractReader]( eventsCh: make(chan WorkflowRegistryEventResponse), batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway) + wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, + wr.engineRegistry, + ) for _, opt := range opts { opt(wr) } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index d979437d54d..652b20deea1 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -55,7 +55,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker)) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker)) ) // Cleanup the worker