diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 802dc427c93..d6f507eac20 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -117,6 +117,8 @@ func Test_SecretsWorker(t *testing.T) { contractReader, fetcherFn, wfRegistryAddr.Hex(), + nil, + nil, syncer.WithTicker(giveTicker.C), ) diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go new file mode 100644 index 00000000000..6dd54794e8b --- /dev/null +++ b/core/services/workflows/syncer/engine_registry.go @@ -0,0 +1,64 @@ +package syncer + +import ( + "errors" + "sync" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" +) + +type engineRegistry struct { + engines map[string]*workflows.Engine + mu sync.RWMutex +} + +func newEngineRegistry() *engineRegistry { + return &engineRegistry{ + engines: make(map[string]*workflows.Engine), + } +} + +// Add adds an engine to the registry. +func (r *engineRegistry) Add(id string, engine *workflows.Engine) { + r.mu.Lock() + defer r.mu.Unlock() + r.engines[id] = engine +} + +// Get retrieves an engine from the registry. +func (r *engineRegistry) Get(id string) (*workflows.Engine, error) { + r.mu.RLock() + defer r.mu.RUnlock() + engine, found := r.engines[id] + if !found { + return nil, errors.New("engine not found") + } + return engine, nil +} + +// Pop removes an engine from the registry and returns the engine if found. +func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) { + r.mu.Lock() + defer r.mu.Unlock() + engine, ok := r.engines[id] + if !ok { + return nil, errors.New("remove failed: engine not found") + } + delete(r.engines, id) + return engine, nil +} + +// Close closes all engines in the registry. +func (r *engineRegistry) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + var err error + for id, engine := range r.engines { + closeErr := engine.Close() + if closeErr != nil { + err = errors.Join(err, closeErr) + } + delete(r.engines, id) + } + return err +} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 0ba789b3bd3..01e13d106f9 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -3,17 +3,99 @@ package syncer import ( "context" "encoding/hex" + "errors" "fmt" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) +var ErrNotImplemented = errors.New("not implemented") + +// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry +type WorkflowRegistryEventType string + +var ( + // ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made + ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1" + + // WorkflowRegisteredEvent is emitted when a workflow is registered + WorkflowRegisteredEvent WorkflowRegistryEventType = "WorkflowRegisteredV1" + + // WorkflowUpdatedEvent is emitted when a workflow is updated + WorkflowUpdatedEvent WorkflowRegistryEventType = "WorkflowUpdatedV1" + + // WorkflowPausedEvent is emitted when a workflow is paused + WorkflowPausedEvent WorkflowRegistryEventType = "WorkflowPausedV1" + + // WorkflowActivatedEvent is emitted when a workflow is activated + WorkflowActivatedEvent WorkflowRegistryEventType = "WorkflowActivatedV1" + + // WorkflowDeletedEvent is emitted when a workflow is deleted + WorkflowDeletedEvent WorkflowRegistryEventType = "WorkflowDeletedV1" +) + +// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry +// ForceUpdateSecretsRequested event. +type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { + SecretsURLHash []byte + Owner []byte + WorkflowName string +} + +type WorkflowRegistryWorkflowRegisteredV1 struct { + WorkflowID [32]byte + WorkflowOwner []byte + DonID uint32 + Status uint8 + WorkflowName string + BinaryURL string + ConfigURL string + SecretsURL string +} + +type WorkflowRegistryWorkflowUpdatedV1 struct { + OldWorkflowID [32]byte + WorkflowOwner []byte + DonID uint32 + NewWorkflowID [32]byte + WorkflowName string + BinaryURL string + ConfigURL string + SecretsURL string +} + +type WorkflowRegistryWorkflowPausedV1 struct { + WorkflowID [32]byte + WorkflowOwner []byte + DonID uint32 + WorkflowName string +} + +type WorkflowRegistryWorkflowActivatedV1 struct { + WorkflowID [32]byte + WorkflowOwner []byte + DonID uint32 + WorkflowName string +} + +type WorkflowRegistryWorkflowDeletedV1 struct { + WorkflowID [32]byte + WorkflowOwner []byte + DonID uint32 + WorkflowName string +} + // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. type eventHandler struct { - lggr logger.Logger - orm ORM - fetcher FetcherFunc + lggr logger.Logger + orm WorkflowSecretsDS + fetcher FetcherFunc + workflowStore store.Store + capRegistry core.CapabilitiesRegistry + engineRegistry *engineRegistry } // newEventHandler returns a new eventHandler instance. @@ -21,11 +103,17 @@ func newEventHandler( lggr logger.Logger, orm ORM, gateway FetcherFunc, + workflowStore store.Store, + capRegistry core.CapabilitiesRegistry, + engineRegistry *engineRegistry, ) *eventHandler { return &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: gateway, + lggr: lggr, + orm: orm, + fetcher: gateway, + workflowStore: workflowStore, + capRegistry: capRegistry, + engineRegistry: engineRegistry, } } @@ -33,11 +121,52 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) switch event.EventType { case ForceUpdateSecretsEvent: return h.forceUpdateSecretsEvent(ctx, event) + case WorkflowRegisteredEvent: + return h.workflowRegisteredEvent(ctx, event) + case WorkflowUpdatedEvent: + return h.workflowUpdatedEvent(ctx, event) + case WorkflowPausedEvent: + return h.workflowPausedEvent(ctx, event) + case WorkflowActivatedEvent: + return h.workflowActivatedEvent(ctx, event) default: return fmt.Errorf("event type unsupported: %v", event.EventType) } } +// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type. +// TODO: Implement this method +func (h *eventHandler) workflowRegisteredEvent( + _ context.Context, + _ WorkflowRegistryEvent, +) error { + return ErrNotImplemented +} + +// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type. +func (h *eventHandler) workflowUpdatedEvent( + _ context.Context, + _ WorkflowRegistryEvent, +) error { + return ErrNotImplemented +} + +// workflowPausedEvent handles the WorkflowPausedEvent event type. +func (h *eventHandler) workflowPausedEvent( + _ context.Context, + _ WorkflowRegistryEvent, +) error { + return ErrNotImplemented +} + +// workflowActivatedEvent handles the WorkflowActivatedEvent event type. +func (h *eventHandler) workflowActivatedEvent( + _ context.Context, + _ WorkflowRegistryEvent, +) error { + return ErrNotImplemented +} + // forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type. func (h *eventHandler) forceUpdateSecretsEvent( ctx context.Context, diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index dcdea28eda4..17c980e4f56 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err := h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) { t.Run("fails to get secrets url", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil) + h := newEventHandler(lggr, mockORM, nil, nil, nil, nil) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index b3d82c2067d..19c459fa0ee 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -5,6 +5,7 @@ package mocks import ( context "context" + job "github.com/smartcontractkit/chainlink/v2/core/services/job" mock "github.com/stretchr/testify/mock" ) @@ -80,6 +81,63 @@ func (_c *ORM_Create_Call) RunAndReturn(run func(context.Context, string, string return _c } +// CreateWorkflowSpec provides a mock function with given fields: ctx, spec +func (_m *ORM) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + ret := _m.Called(ctx, spec) + + if len(ret) == 0 { + panic("no return value specified for CreateWorkflowSpec") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) (int64, error)); ok { + return rf(ctx, spec) + } + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) int64); ok { + r0 = rf(ctx, spec) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec) error); ok { + r1 = rf(ctx, spec) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_CreateWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateWorkflowSpec' +type ORM_CreateWorkflowSpec_Call struct { + *mock.Call +} + +// CreateWorkflowSpec is a helper method to define mock.On call +// - ctx context.Context +// - spec *job.WorkflowSpec +func (_e *ORM_Expecter) CreateWorkflowSpec(ctx interface{}, spec interface{}) *ORM_CreateWorkflowSpec_Call { + return &ORM_CreateWorkflowSpec_Call{Call: _e.mock.On("CreateWorkflowSpec", ctx, spec)} +} + +func (_c *ORM_CreateWorkflowSpec_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec)) *ORM_CreateWorkflowSpec_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*job.WorkflowSpec)) + }) + return _c +} + +func (_c *ORM_CreateWorkflowSpec_Call) Return(_a0 int64, _a1 error) *ORM_CreateWorkflowSpec_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_CreateWorkflowSpec_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec) (int64, error)) *ORM_CreateWorkflowSpec_Call { + _c.Call.Return(run) + return _c +} + // GetContents provides a mock function with given fields: ctx, url func (_m *ORM) GetContents(ctx context.Context, url string) (string, error) { ret := _m.Called(ctx, url) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index a10eb708ddf..d43dbe09b78 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -2,13 +2,15 @@ package syncer import ( "context" + "errors" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" ) -type ORM interface { +type WorkflowSecretsDS interface { // GetSecretsURLByID returns the secrets URL for the given ID. GetSecretsURLByID(ctx context.Context, id int64) (string, error) @@ -30,6 +32,15 @@ type ORM interface { Create(ctx context.Context, secretsURL, hash, contents string) (int64, error) } +type WorkflowSpecsDS interface { + CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) +} + +type ORM interface { + WorkflowSecretsDS + WorkflowSpecsDS +} + type WorkflowRegistryDS = ORM type orm struct { @@ -37,7 +48,7 @@ type orm struct { lggr logger.Logger } -var _ ORM = (*orm)(nil) +var _ WorkflowRegistryDS = (*orm)(nil) func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm { return &orm{ @@ -137,3 +148,7 @@ func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64, func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { return crypto.Keccak256(append(owner, secretsURL...)) } + +func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + return 0, errors.New("not implemented") +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index ff77da9ea6f..d8ad37646d6 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -12,12 +12,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) const name = "WorkflowRegistrySyncer" @@ -27,22 +29,6 @@ var ( ContractName = "WorkflowRegistry" ) -// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry -type WorkflowRegistryEventType string - -var ( - // ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made - ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1" -) - -// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry -// ForceUpdateSecretsRequested event. -type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { - SecretsURLHash []byte - Owner []byte - WorkflowName string -} - type Head struct { Hash string Height string @@ -131,6 +117,10 @@ type workflowRegistry struct { // heap is a min heap that merges batches of events from the contract query goroutines. The // default min heap is sorted by block height. heap Heap + + workflowStore store.Store + capRegistry core.CapabilitiesRegistry + engineRegistry *engineRegistry } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -155,14 +145,19 @@ func NewWorkflowRegistry[T ContractReader]( reader T, gateway FetcherFunc, addr string, + workflowStore store.Store, + capRegistry core.CapabilitiesRegistry, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ - lggr: lggr.Named(name), - orm: orm, - reader: reader, - gateway: gateway, + lggr: lggr.Named(name), + orm: orm, + reader: reader, + gateway: gateway, + workflowStore: workflowStore, + capRegistry: capRegistry, + engineRegistry: newEngineRegistry(), cfg: ContractEventPollerConfig{ ContractName: ContractName, ContractAddress: addr, @@ -176,7 +171,9 @@ func NewWorkflowRegistry[T ContractReader]( eventsCh: make(chan WorkflowRegistryEventResponse), batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway) + wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, + wr.engineRegistry, + ) for _, opt := range opts { opt(wr) } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index d979437d54d..652b20deea1 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -55,7 +55,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker)) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker)) ) // Cleanup the worker