diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index d6f507eac20..802dc427c93 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -117,8 +117,6 @@ func Test_SecretsWorker(t *testing.T) { contractReader, fetcherFn, wfRegistryAddr.Hex(), - nil, - nil, syncer.WithTicker(giveTicker.C), ) diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go deleted file mode 100644 index 6dd54794e8b..00000000000 --- a/core/services/workflows/syncer/engine_registry.go +++ /dev/null @@ -1,64 +0,0 @@ -package syncer - -import ( - "errors" - "sync" - - "github.com/smartcontractkit/chainlink/v2/core/services/workflows" -) - -type engineRegistry struct { - engines map[string]*workflows.Engine - mu sync.RWMutex -} - -func newEngineRegistry() *engineRegistry { - return &engineRegistry{ - engines: make(map[string]*workflows.Engine), - } -} - -// Add adds an engine to the registry. -func (r *engineRegistry) Add(id string, engine *workflows.Engine) { - r.mu.Lock() - defer r.mu.Unlock() - r.engines[id] = engine -} - -// Get retrieves an engine from the registry. -func (r *engineRegistry) Get(id string) (*workflows.Engine, error) { - r.mu.RLock() - defer r.mu.RUnlock() - engine, found := r.engines[id] - if !found { - return nil, errors.New("engine not found") - } - return engine, nil -} - -// Pop removes an engine from the registry and returns the engine if found. -func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) { - r.mu.Lock() - defer r.mu.Unlock() - engine, ok := r.engines[id] - if !ok { - return nil, errors.New("remove failed: engine not found") - } - delete(r.engines, id) - return engine, nil -} - -// Close closes all engines in the registry. -func (r *engineRegistry) Close() error { - r.mu.Lock() - defer r.mu.Unlock() - var err error - for id, engine := range r.engines { - closeErr := engine.Close() - if closeErr != nil { - err = errors.Join(err, closeErr) - } - delete(r.engines, id) - } - return err -} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 01e13d106f9..0ba789b3bd3 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -3,99 +3,17 @@ package syncer import ( "context" "encoding/hex" - "errors" "fmt" - "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) -var ErrNotImplemented = errors.New("not implemented") - -// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry -type WorkflowRegistryEventType string - -var ( - // ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made - ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1" - - // WorkflowRegisteredEvent is emitted when a workflow is registered - WorkflowRegisteredEvent WorkflowRegistryEventType = "WorkflowRegisteredV1" - - // WorkflowUpdatedEvent is emitted when a workflow is updated - WorkflowUpdatedEvent WorkflowRegistryEventType = "WorkflowUpdatedV1" - - // WorkflowPausedEvent is emitted when a workflow is paused - WorkflowPausedEvent WorkflowRegistryEventType = "WorkflowPausedV1" - - // WorkflowActivatedEvent is emitted when a workflow is activated - WorkflowActivatedEvent WorkflowRegistryEventType = "WorkflowActivatedV1" - - // WorkflowDeletedEvent is emitted when a workflow is deleted - WorkflowDeletedEvent WorkflowRegistryEventType = "WorkflowDeletedV1" -) - -// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry -// ForceUpdateSecretsRequested event. -type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { - SecretsURLHash []byte - Owner []byte - WorkflowName string -} - -type WorkflowRegistryWorkflowRegisteredV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - Status uint8 - WorkflowName string - BinaryURL string - ConfigURL string - SecretsURL string -} - -type WorkflowRegistryWorkflowUpdatedV1 struct { - OldWorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - NewWorkflowID [32]byte - WorkflowName string - BinaryURL string - ConfigURL string - SecretsURL string -} - -type WorkflowRegistryWorkflowPausedV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - WorkflowName string -} - -type WorkflowRegistryWorkflowActivatedV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - WorkflowName string -} - -type WorkflowRegistryWorkflowDeletedV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - WorkflowName string -} - // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. type eventHandler struct { - lggr logger.Logger - orm WorkflowSecretsDS - fetcher FetcherFunc - workflowStore store.Store - capRegistry core.CapabilitiesRegistry - engineRegistry *engineRegistry + lggr logger.Logger + orm ORM + fetcher FetcherFunc } // newEventHandler returns a new eventHandler instance. @@ -103,17 +21,11 @@ func newEventHandler( lggr logger.Logger, orm ORM, gateway FetcherFunc, - workflowStore store.Store, - capRegistry core.CapabilitiesRegistry, - engineRegistry *engineRegistry, ) *eventHandler { return &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: gateway, - workflowStore: workflowStore, - capRegistry: capRegistry, - engineRegistry: engineRegistry, + lggr: lggr, + orm: orm, + fetcher: gateway, } } @@ -121,52 +33,11 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) switch event.EventType { case ForceUpdateSecretsEvent: return h.forceUpdateSecretsEvent(ctx, event) - case WorkflowRegisteredEvent: - return h.workflowRegisteredEvent(ctx, event) - case WorkflowUpdatedEvent: - return h.workflowUpdatedEvent(ctx, event) - case WorkflowPausedEvent: - return h.workflowPausedEvent(ctx, event) - case WorkflowActivatedEvent: - return h.workflowActivatedEvent(ctx, event) default: return fmt.Errorf("event type unsupported: %v", event.EventType) } } -// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type. -// TODO: Implement this method -func (h *eventHandler) workflowRegisteredEvent( - _ context.Context, - _ WorkflowRegistryEvent, -) error { - return ErrNotImplemented -} - -// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type. -func (h *eventHandler) workflowUpdatedEvent( - _ context.Context, - _ WorkflowRegistryEvent, -) error { - return ErrNotImplemented -} - -// workflowPausedEvent handles the WorkflowPausedEvent event type. -func (h *eventHandler) workflowPausedEvent( - _ context.Context, - _ WorkflowRegistryEvent, -) error { - return ErrNotImplemented -} - -// workflowActivatedEvent handles the WorkflowActivatedEvent event type. -func (h *eventHandler) workflowActivatedEvent( - _ context.Context, - _ WorkflowRegistryEvent, -) error { - return ErrNotImplemented -} - // forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type. func (h *eventHandler) forceUpdateSecretsEvent( ctx context.Context, diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 17c980e4f56..dcdea28eda4 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher) err := h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) { t.Run("fails to get secrets url", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil, nil, nil, nil) + h := newEventHandler(lggr, mockORM, nil) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) + h := newEventHandler(lggr, mockORM, fetcher) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index 19c459fa0ee..b3d82c2067d 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -5,7 +5,6 @@ package mocks import ( context "context" - job "github.com/smartcontractkit/chainlink/v2/core/services/job" mock "github.com/stretchr/testify/mock" ) @@ -81,63 +80,6 @@ func (_c *ORM_Create_Call) RunAndReturn(run func(context.Context, string, string return _c } -// CreateWorkflowSpec provides a mock function with given fields: ctx, spec -func (_m *ORM) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - ret := _m.Called(ctx, spec) - - if len(ret) == 0 { - panic("no return value specified for CreateWorkflowSpec") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) (int64, error)); ok { - return rf(ctx, spec) - } - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) int64); ok { - r0 = rf(ctx, spec) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec) error); ok { - r1 = rf(ctx, spec) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_CreateWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateWorkflowSpec' -type ORM_CreateWorkflowSpec_Call struct { - *mock.Call -} - -// CreateWorkflowSpec is a helper method to define mock.On call -// - ctx context.Context -// - spec *job.WorkflowSpec -func (_e *ORM_Expecter) CreateWorkflowSpec(ctx interface{}, spec interface{}) *ORM_CreateWorkflowSpec_Call { - return &ORM_CreateWorkflowSpec_Call{Call: _e.mock.On("CreateWorkflowSpec", ctx, spec)} -} - -func (_c *ORM_CreateWorkflowSpec_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec)) *ORM_CreateWorkflowSpec_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*job.WorkflowSpec)) - }) - return _c -} - -func (_c *ORM_CreateWorkflowSpec_Call) Return(_a0 int64, _a1 error) *ORM_CreateWorkflowSpec_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_CreateWorkflowSpec_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec) (int64, error)) *ORM_CreateWorkflowSpec_Call { - _c.Call.Return(run) - return _c -} - // GetContents provides a mock function with given fields: ctx, url func (_m *ORM) GetContents(ctx context.Context, url string) (string, error) { ret := _m.Called(ctx, url) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index d43dbe09b78..a10eb708ddf 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -2,15 +2,13 @@ package syncer import ( "context" - "errors" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" ) -type WorkflowSecretsDS interface { +type ORM interface { // GetSecretsURLByID returns the secrets URL for the given ID. GetSecretsURLByID(ctx context.Context, id int64) (string, error) @@ -32,15 +30,6 @@ type WorkflowSecretsDS interface { Create(ctx context.Context, secretsURL, hash, contents string) (int64, error) } -type WorkflowSpecsDS interface { - CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) -} - -type ORM interface { - WorkflowSecretsDS - WorkflowSpecsDS -} - type WorkflowRegistryDS = ORM type orm struct { @@ -48,7 +37,7 @@ type orm struct { lggr logger.Logger } -var _ WorkflowRegistryDS = (*orm)(nil) +var _ ORM = (*orm)(nil) func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm { return &orm{ @@ -148,7 +137,3 @@ func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64, func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { return crypto.Keccak256(append(owner, secretsURL...)) } - -func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - return 0, errors.New("not implemented") -} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index d8ad37646d6..ff77da9ea6f 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -12,14 +12,12 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" - "github.com/smartcontractkit/chainlink-common/pkg/types/core" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) const name = "WorkflowRegistrySyncer" @@ -29,6 +27,22 @@ var ( ContractName = "WorkflowRegistry" ) +// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry +type WorkflowRegistryEventType string + +var ( + // ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made + ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1" +) + +// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry +// ForceUpdateSecretsRequested event. +type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { + SecretsURLHash []byte + Owner []byte + WorkflowName string +} + type Head struct { Hash string Height string @@ -117,10 +131,6 @@ type workflowRegistry struct { // heap is a min heap that merges batches of events from the contract query goroutines. The // default min heap is sorted by block height. heap Heap - - workflowStore store.Store - capRegistry core.CapabilitiesRegistry - engineRegistry *engineRegistry } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -145,19 +155,14 @@ func NewWorkflowRegistry[T ContractReader]( reader T, gateway FetcherFunc, addr string, - workflowStore store.Store, - capRegistry core.CapabilitiesRegistry, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ - lggr: lggr.Named(name), - orm: orm, - reader: reader, - gateway: gateway, - workflowStore: workflowStore, - capRegistry: capRegistry, - engineRegistry: newEngineRegistry(), + lggr: lggr.Named(name), + orm: orm, + reader: reader, + gateway: gateway, cfg: ContractEventPollerConfig{ ContractName: ContractName, ContractAddress: addr, @@ -171,9 +176,7 @@ func NewWorkflowRegistry[T ContractReader]( eventsCh: make(chan WorkflowRegistryEventResponse), batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, - wr.engineRegistry, - ) + wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway) for _, opt := range opts { opt(wr) } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 652b20deea1..d979437d54d 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -55,7 +55,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker)) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker)) ) // Cleanup the worker