diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 802dc427c93..d6f507eac20 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -117,6 +117,8 @@ func Test_SecretsWorker(t *testing.T) { contractReader, fetcherFn, wfRegistryAddr.Hex(), + nil, + nil, syncer.WithTicker(giveTicker.C), ) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 624044f092a..f5e2f3aea9b 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) var ErrNotImplemented = errors.New("not implemented") @@ -88,21 +90,30 @@ type WorkflowRegistryWorkflowDeletedV1 struct { // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. type eventHandler struct { - lggr logger.Logger - orm ORM - fetcher FetcherFunc + lggr logger.Logger + orm WorkflowSecretsDS + fetcher FetcherFunc + workflowStore store.Store + capRegistry core.CapabilitiesRegistry + engineRegistry *engineRegistry } // newEventHandler returns a new eventHandler instance. func newEventHandler( lggr logger.Logger, - orm ORM, + orm WorkflowSecretsDS, gateway FetcherFunc, + workflowStore store.Store, + capRegistry core.CapabilitiesRegistry, + engineRegistry *engineRegistry, ) *eventHandler { return &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: gateway, + lggr: lggr, + orm: orm, + fetcher: gateway, + workflowStore: workflowStore, + capRegistry: capRegistry, + engineRegistry: engineRegistry, } } diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index dcdea28eda4..17c980e4f56 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -38,7 +38,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -52,7 +52,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err := h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -61,7 +61,7 @@ func Test_Handler(t *testing.T) { t.Run("fails to get secrets url", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil) + h := newEventHandler(lggr, mockORM, nil, nil, nil, nil) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -101,7 +101,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -128,7 +128,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index a10eb708ddf..6c767127908 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -2,13 +2,15 @@ package syncer import ( "context" + "errors" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" ) -type ORM interface { +type WorkflowSecretsDS interface { // GetSecretsURLByID returns the secrets URL for the given ID. GetSecretsURLByID(ctx context.Context, id int64) (string, error) @@ -30,14 +32,18 @@ type ORM interface { Create(ctx context.Context, secretsURL, hash, contents string) (int64, error) } -type WorkflowRegistryDS = ORM +type WorkflowSpecsDS interface { + CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) +} + +type WorkflowRegistryDS = WorkflowSecretsDS type orm struct { ds sqlutil.DataSource lggr logger.Logger } -var _ ORM = (*orm)(nil) +var _ WorkflowSecretsDS = (*orm)(nil) func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm { return &orm{ @@ -137,3 +143,7 @@ func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64, func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { return crypto.Keccak256(append(owner, secretsURL...)) } + +func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + return 0, errors.New("not implemented") +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 3ee816c6424..d8ad37646d6 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -12,12 +12,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) const name = "WorkflowRegistrySyncer" @@ -115,6 +117,10 @@ type workflowRegistry struct { // heap is a min heap that merges batches of events from the contract query goroutines. The // default min heap is sorted by block height. heap Heap + + workflowStore store.Store + capRegistry core.CapabilitiesRegistry + engineRegistry *engineRegistry } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -139,14 +145,19 @@ func NewWorkflowRegistry[T ContractReader]( reader T, gateway FetcherFunc, addr string, + workflowStore store.Store, + capRegistry core.CapabilitiesRegistry, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ - lggr: lggr.Named(name), - orm: orm, - reader: reader, - gateway: gateway, + lggr: lggr.Named(name), + orm: orm, + reader: reader, + gateway: gateway, + workflowStore: workflowStore, + capRegistry: capRegistry, + engineRegistry: newEngineRegistry(), cfg: ContractEventPollerConfig{ ContractName: ContractName, ContractAddress: addr, @@ -160,7 +171,9 @@ func NewWorkflowRegistry[T ContractReader]( eventsCh: make(chan WorkflowRegistryEventResponse), batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway) + wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, + wr.engineRegistry, + ) for _, opt := range opts { opt(wr) } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index d979437d54d..652b20deea1 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -55,7 +55,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker)) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, WithTicker(ticker)) ) // Cleanup the worker