From 22ddff9af96aaa0d8d5b798a487d48cd9056a12c Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Thu, 28 Nov 2024 15:18:42 +0000 Subject: [PATCH 1/9] don notifier added --- core/capabilities/launcher.go | 29 +++++++++++++++++--------- core/capabilities/launcher_test.go | 14 +++++++++++++ core/services/registrysyncer/syncer.go | 4 ++++ 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index be06dcf60c1..97aea5d3c8c 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -43,11 +43,12 @@ var defaultStreamConfig = p2ptypes.StreamConfig{ type launcher struct { services.StateMachine - lggr logger.Logger - peerWrapper p2ptypes.PeerWrapper - dispatcher remotetypes.Dispatcher - registry *Registry - subServices []services.Service + lggr logger.Logger + peerWrapper p2ptypes.PeerWrapper + dispatcher remotetypes.Dispatcher + registry *Registry + subServices []services.Service + workflowDonNotifier donNotifier } func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguration, error) { @@ -86,18 +87,24 @@ func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguratio }, nil } +type donNotifier interface { + NotifyDonSet(don capabilities.DON) +} + func NewLauncher( lggr logger.Logger, peerWrapper p2ptypes.PeerWrapper, dispatcher remotetypes.Dispatcher, registry *Registry, + workflowDonNotifier donNotifier, ) *launcher { return &launcher{ - lggr: lggr.Named("CapabilitiesLauncher"), - peerWrapper: peerWrapper, - dispatcher: dispatcher, - registry: registry, - subServices: []services.Service{}, + lggr: lggr.Named("CapabilitiesLauncher"), + peerWrapper: peerWrapper, + dispatcher: dispatcher, + registry: registry, + subServices: []services.Service{}, + workflowDonNotifier: workflowDonNotifier, } } @@ -215,6 +222,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist return errors.New("invariant violation: node is part of more than one workflowDON") } + w.workflowDonNotifier.NotifyDonSet(myDON.DON) + for _, rcd := range remoteCapabilityDONs { err := w.addRemoteCapabilities(ctx, myDON, rcd, state) if err != nil { diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 013463bfdbb..c130f9833d9 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -33,6 +33,12 @@ import ( var _ capabilities.TriggerCapability = (*mockTrigger)(nil) +type mockDonNotifier struct { +} + +func (m *mockDonNotifier) NotifyDonSet(don capabilities.DON) { +} + type mockTrigger struct { capabilities.CapabilityInfo } @@ -196,6 +202,7 @@ func TestLauncher(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) @@ -305,6 +312,7 @@ func TestLauncher(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) err = launcher.Launch(ctx, state) @@ -409,6 +417,7 @@ func TestLauncher(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) err = launcher.Launch(ctx, state) @@ -600,6 +609,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) @@ -752,6 +762,7 @@ func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) // If the DON were public, this would fail with two errors: @@ -917,6 +928,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) @@ -1082,6 +1094,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, triggerCapDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) @@ -1232,6 +1245,7 @@ func TestLauncher_SucceedsEvenIfDispatcherAlreadyHasReceiver(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) err = launcher.Launch(ctx, state) diff --git a/core/services/registrysyncer/syncer.go b/core/services/registrysyncer/syncer.go index 461824b403b..2ec97299d32 100644 --- a/core/services/registrysyncer/syncer.go +++ b/core/services/registrysyncer/syncer.go @@ -68,6 +68,10 @@ var ( defaultTickInterval = 12 * time.Second ) +type donIDSetNotifier interface { + NotifyDonIDSet() +} + // New instantiates a new RegistrySyncer func New( lggr logger.Logger, From 4b793ddccc6e120723e5f0c80d779bd3161d925a Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Thu, 28 Nov 2024 17:56:47 +0000 Subject: [PATCH 2/9] wip --- core/capabilities/don_notifier.go | 43 +++++++++++++ core/capabilities/don_notifier_test.go | 45 +++++++++++++ core/services/chainlink/application.go | 16 ++++- .../workflows/syncer/workflow_syncer_test.go | 32 ++++++++-- .../workflows/syncer/workflow_registry.go | 64 +++++++++++-------- .../syncer/workflow_registry_test.go | 29 ++++++++- 6 files changed, 194 insertions(+), 35 deletions(-) create mode 100644 core/capabilities/don_notifier.go create mode 100644 core/capabilities/don_notifier_test.go diff --git a/core/capabilities/don_notifier.go b/core/capabilities/don_notifier.go new file mode 100644 index 00000000000..4edb38d3661 --- /dev/null +++ b/core/capabilities/don_notifier.go @@ -0,0 +1,43 @@ +package capabilities + +import ( + "context" + "sync" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +type DonNotifier struct { + mu sync.Mutex + don capabilities.DON + notified bool + ch chan struct{} +} + +func NewDonNotifier() *DonNotifier { + return &DonNotifier{ + ch: make(chan struct{}), + } +} + +func (n *DonNotifier) NotifyDonSet(don capabilities.DON) { + n.mu.Lock() + defer n.mu.Unlock() + if !n.notified { + n.don = don + n.notified = true + close(n.ch) + } +} + +func (n *DonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { + select { + case <-ctx.Done(): + return capabilities.DON{}, ctx.Err() + case <-n.ch: + } + <-n.ch + n.mu.Lock() + defer n.mu.Unlock() + return n.don, nil +} diff --git a/core/capabilities/don_notifier_test.go b/core/capabilities/don_notifier_test.go new file mode 100644 index 00000000000..e44fc248f6c --- /dev/null +++ b/core/capabilities/don_notifier_test.go @@ -0,0 +1,45 @@ +package capabilities_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities" +) + +func TestDonNotifier_WaitForDon(t *testing.T) { + notifier := capabilities.NewDonNotifier() + don := commoncap.DON{ + ID: 1, + } + + go func() { + time.Sleep(100 * time.Millisecond) + notifier.NotifyDonSet(don) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + result, err := notifier.WaitForDon(ctx) + require.NoError(t, err) + assert.Equal(t, don, result) +} + +func TestDonNotifier_WaitForDon_ContextTimeout(t *testing.T) { + notifier := capabilities.NewDonNotifier() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + + _, err := notifier.WaitForDon(ctx) + require.Error(t, err) + assert.Equal(t, context.DeadlineExceeded, err) +} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 01f5d8b530a..efaf59b7d44 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -71,6 +71,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/webhook" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth" "github.com/smartcontractkit/chainlink/v2/core/sessions/localauth" @@ -256,15 +257,28 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("could not configure syncer: %w", err) } + workflowDonNotifier := capabilities.NewDonNotifier() + wfLauncher := capabilities.NewLauncher( globalLogger, externalPeerWrapper, dispatcher, opts.CapabilitiesRegistry, + workflowDonNotifier, ) registrySyncer.AddLauncher(wfLauncher) - srvcs = append(srvcs, wfLauncher, registrySyncer) + handler := syncer.NewEventHandler(globalLogger, orm, fetcherFn, nil, nil, + emitter, clockwork.NewFakeClock(), workflowkey.Key{}) + + workflowRegistryAddress := cfg.Capabilities().WorkflowRegistry().Address() + wfSyncer := syncer.NewWorkflowRegistry(globalLogger, relayer, workflowRegistryAddress, + syncer.WorkflowEventPollerConfig{ + QueryCount: 100, + }, + ) + + srvcs = append(srvcs, wfLauncher, registrySyncer, wfSyncer) } } else { globalLogger.Debug("External registry not configured, skipping registry syncer and starting with an empty registry") diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 7471c7169ea..c886889f681 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -14,6 +14,7 @@ import ( "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -48,7 +49,16 @@ func newTestEvtHandler() *testEvtHandler { type testWorkflowRegistryContractLoader struct { } -func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*types.Head, error) { +type testDonNotifier struct { + don capabilities.DON + err error +} + +func (t *testDonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { + return t.don, t.err +} + +func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) { return &types.Head{ Height: "0", Hash: nil, @@ -112,7 +122,7 @@ func Test_InitialStateSync(t *testing.T) { } testEventHandler := newTestEvtHandler() - loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), donID, contractReader, testEventHandler) + loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), contractReader, testEventHandler) // Create the worker worker := syncer.NewWorkflowRegistry( @@ -124,12 +134,21 @@ func Test_InitialStateSync(t *testing.T) { }, testEventHandler, loader, + &testDonNotifier{ + don: capabilities.DON{ + ID: donID, + }, + err: nil, + }, syncer.WithTicker(make(chan time.Time)), ) servicetest.Run(t, worker) - assert.Len(t, testEventHandler.events, numberWorkflows) + require.Eventually(t, func() bool { + return len(testEventHandler.events) == numberWorkflows + }, 5*time.Second, time.Second) + for _, event := range testEventHandler.events { assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) } @@ -230,7 +249,12 @@ func Test_SecretsWorker(t *testing.T) { worker := syncer.NewWorkflowRegistry(lggr, contractReader, wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{ QueryCount: 20, - }, handler, &testWorkflowRegistryContractLoader{}, syncer.WithTicker(giveTicker.C)) + }, handler, &testWorkflowRegistryContractLoader{}, &testDonNotifier{ + don: capabilities.DON{ + ID: donID, + }, + err: nil, + }, syncer.WithTicker(giveTicker.C)) // setup contract state to allow the secrets to be updated updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 4f3bb76bd14..8a9ecc6ab04 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" @@ -111,12 +112,11 @@ type workflowRegistry struct { lggr logger.Logger workflowRegistryAddress string - reader ContractReader // initReader allows the workflowRegistry to initialize a contract reader if one is not provided // and separates the contract reader initialization from the workflowRegistry start up. - initReader func(context.Context, logger.Logger, ContractReaderFactory, types.BoundContract) (types.ContractReader, error) - relayer ContractReaderFactory + initReader func(context.Context, newContractReaderFn, types.BoundContract) (ContractReader, error) + newContractReaderFn newContractReaderFn eventPollerCfg WorkflowEventPollerConfig eventTypes []WorkflowRegistryEventType @@ -132,6 +132,10 @@ type workflowRegistry struct { // heap is a min heap that merges batches of events from the contract query goroutines. The // default min heap is sorted by block height. heap Heap + + workflowDonNotifier donNotifier + + reader ContractReader } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -142,12 +146,6 @@ func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { } } -func WithReader(reader types.ContractReader) func(*workflowRegistry) { - return func(wr *workflowRegistry) { - wr.reader = reader - } -} - type evtHandler interface { Handle(ctx context.Context, event Event) error } @@ -155,25 +153,32 @@ type evtHandler interface { type initialWorkflowsStateLoader interface { // LoadWorkflows loads all the workflows for the given donID from the contract. Returns the head of the chain as of the // point in time at which the load occurred. - LoadWorkflows(ctx context.Context) (*types.Head, error) + LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) +} + +type donNotifier interface { + WaitForDon(ctx context.Context) (capabilities.DON, error) } +type newContractReaderFn func(context.Context, []byte) (ContractReader, error) + // NewWorkflowRegistry returns a new workflowRegistry. // Only queries for WorkflowRegistryForceUpdateSecretsRequestedV1 events. func NewWorkflowRegistry( lggr logger.Logger, - reader ContractReader, + newContractReaderFn newContractReaderFn, addr string, eventPollerConfig WorkflowEventPollerConfig, handler evtHandler, initialWorkflowsStateLoader initialWorkflowsStateLoader, + workflowDonNotifier donNotifier, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ lggr: lggr.Named(name), + newContractReaderFn: newContractReaderFn, workflowRegistryAddress: addr, - reader: reader, eventPollerCfg: eventPollerConfig, initReader: newReader, heap: newBlockHeightHeap(), @@ -183,6 +188,7 @@ func NewWorkflowRegistry( batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), handler: handler, initialWorkflowsStateLoader: initialWorkflowsStateLoader, + workflowDonNotifier: workflowDonNotifier, } for _, opt := range opts { @@ -193,13 +199,8 @@ func NewWorkflowRegistry( // Start starts the workflowRegistry. It starts two goroutines, one for querying the contract // and one for handling the events. -func (w *workflowRegistry) Start(ctx context.Context) error { +func (w *workflowRegistry) Start(_ context.Context) error { return w.StartOnce(w.Name(), func() error { - loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx) - if err != nil { - return fmt.Errorf("failed to load workflows: %w", err) - } - ctx, cancel := w.stopCh.NewCtx() w.wg.Add(1) @@ -207,6 +208,18 @@ func (w *workflowRegistry) Start(ctx context.Context) error { defer w.wg.Done() defer cancel() + don, err := w.workflowDonNotifier.WaitForDon(ctx) + if err != nil { + w.lggr.Errorf("failed to wait for don: %v", err) + return + } + + loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx, don) + if err != nil { + w.lggr.Errorf("failed to load workflows: %v", err) + return + } + w.syncEventsLoop(ctx, loadWorkflowsHead.Height) }() @@ -394,7 +407,7 @@ func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReade } if w.reader == nil { - reader, err := w.initReader(ctx, w.lggr, w.relayer, c) + reader, err := w.initReader(ctx, w.newContractReaderFn, c) if err != nil { return nil, err } @@ -492,10 +505,9 @@ func queryEvent( func newReader( ctx context.Context, - lggr logger.Logger, - factory ContractReaderFactory, + newReaderFn newContractReaderFn, bc types.BoundContract, -) (types.ContractReader, error) { +) (ContractReader, error) { contractReaderCfg := evmtypes.ChainReaderConfig{ Contracts: map[string]evmtypes.ChainContractReader{ WorkflowRegistryContractName: { @@ -518,7 +530,7 @@ func newReader( return nil, err } - reader, err := factory.NewContractReader(ctx, marshalledCfg) + reader, err := newReaderFn(ctx, marshalledCfg) if err != nil { return nil, err } @@ -553,19 +565,17 @@ type workflowRegistryContractLoader struct { func NewWorkflowRegistryContractLoader( workflowRegistryAddress string, - donID uint32, reader ContractReader, handler evtHandler, ) *workflowRegistryContractLoader { return &workflowRegistryContractLoader{ workflowRegistryAddress: workflowRegistryAddress, - donID: donID, reader: reader, handler: handler, } } -func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*types.Head, error) { +func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) { contractBinding := types.BoundContract{ Address: l.workflowRegistryAddress, Name: WorkflowRegistryContractName, @@ -573,7 +583,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*ty readIdentifier := contractBinding.ReadIdentifier(GetWorkflowMetadataListByDONMethodName) params := GetWorkflowMetadataListByDONParams{ - DonID: l.donID, + DonID: don.ID, Start: 0, Limit: 0, // 0 tells the contract to return max pagination limit workflows on each call } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 17a71d73030..b448efcbb0b 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -10,6 +10,7 @@ import ( "github.com/jonboulle/clockwork" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" types "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -26,6 +27,18 @@ import ( "github.com/stretchr/testify/require" ) +type testWorkflowRegistryContractLoader struct { +} + +type testDonNotifier struct { + don capabilities.DON + err error +} + +func (t *testDonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { + return t.don, t.err +} + func Test_Workflow_Registry_Syncer(t *testing.T) { var ( giveContents = "contents" @@ -62,12 +75,21 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { handler = NewEventHandler(lggr, orm, gateway, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - loader = NewWorkflowRegistryContractLoader(contractAddress, 1, reader, handler) + loader = NewWorkflowRegistryContractLoader(contractAddress, reader, handler) - worker = NewWorkflowRegistry(lggr, reader, contractAddress, + worker = NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (ContractReader, error) { + return reader, nil + }, contractAddress, WorkflowEventPollerConfig{ QueryCount: 20, - }, handler, loader, WithTicker(ticker)) + }, handler, loader, + &testDonNotifier{ + don: capabilities.DON{ + ID: 1, + }, + err: nil, + }, + WithTicker(ticker)) ) // Cleanup the worker @@ -100,6 +122,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { reader.EXPECT().GetLatestValueWithHeadData(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&types.Head{ Height: "0", }, nil) + reader.EXPECT().Bind(mock.Anything, mock.Anything).Return(nil) // Go run the worker servicetest.Run(t, worker) From 48955c66f277a0bd488ca897dc098e260d398f85 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Thu, 28 Nov 2024 18:03:04 +0000 Subject: [PATCH 3/9] wip --- core/services/chainlink/application.go | 10 +++++----- .../workflows/syncer/workflow_syncer_test.go | 8 ++++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index efaf59b7d44..640ebeebf46 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -268,15 +268,15 @@ func NewApplication(opts ApplicationOpts) (Application, error) { ) registrySyncer.AddLauncher(wfLauncher) - handler := syncer.NewEventHandler(globalLogger, orm, fetcherFn, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}) + // TODO create the handler and initialWorkflowsStateLoader and pass in below workflowRegistryAddress := cfg.Capabilities().WorkflowRegistry().Address() - wfSyncer := syncer.NewWorkflowRegistry(globalLogger, relayer, workflowRegistryAddress, + wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return relayer.NewContractReader(ctx, bytes) + }, workflowRegistryAddress, syncer.WorkflowEventPollerConfig{ QueryCount: 100, - }, - ) + }, nil, nil, workflowDonNotifier) srvcs = append(srvcs, wfLauncher, registrySyncer, wfSyncer) } diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index c886889f681..24eafeb81cc 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -127,7 +127,9 @@ func Test_InitialStateSync(t *testing.T) { // Create the worker worker := syncer.NewWorkflowRegistry( lggr, - contractReader, + func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return contractReader, nil + }, wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{ QueryCount: 20, @@ -246,7 +248,9 @@ func Test_SecretsWorker(t *testing.T) { handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - worker := syncer.NewWorkflowRegistry(lggr, contractReader, wfRegistryAddr.Hex(), + worker := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return contractReader, nil + }, wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{ QueryCount: 20, }, handler, &testWorkflowRegistryContractLoader{}, &testDonNotifier{ From d40f112839d690b261128b33ca93944ed56b2550 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Fri, 29 Nov 2024 12:31:20 +0000 Subject: [PATCH 4/9] wire up of wf syncer --- core/services/chainlink/application.go | 92 ++++++++++++++----- .../workflows/syncer/workflow_syncer_test.go | 30 +----- core/services/workflows/syncer/fetcher.go | 1 - .../services/workflows/syncer/fetcher_test.go | 2 +- .../workflows/syncer/workflow_registry.go | 46 +++++++--- .../syncer/workflow_registry_test.go | 4 +- 6 files changed, 113 insertions(+), 62 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 640ebeebf46..b076e583508 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -20,6 +20,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap/zapcore" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/loop" commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -33,6 +34,7 @@ import ( gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -48,6 +50,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/feeds" "github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2" "github.com/smartcontractkit/chainlink/v2/core/services/gateway" + capabilities2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" + common2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keeper" @@ -213,6 +217,17 @@ func NewApplication(opts ApplicationOpts) (Application, error) { opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) } + var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper + if cfg.Capabilities().GatewayConnector().DonID() != "" { + globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID()) + gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper( + cfg.Capabilities().GatewayConnector(), + keyStore.Eth(), + clockwork.NewRealClock(), + globalLogger) + srvcs = append(srvcs, gatewayConnectorWrapper) + } + var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { var dispatcher remotetypes.Dispatcher @@ -268,34 +283,67 @@ func NewApplication(opts ApplicationOpts) (Application, error) { ) registrySyncer.AddLauncher(wfLauncher) - // TODO create the handler and initialWorkflowsStateLoader and pass in below - - workflowRegistryAddress := cfg.Capabilities().WorkflowRegistry().Address() - wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { - return relayer.NewContractReader(ctx, bytes) - }, workflowRegistryAddress, - syncer.WorkflowEventPollerConfig{ - QueryCount: 100, - }, nil, nil, workflowDonNotifier) - - srvcs = append(srvcs, wfLauncher, registrySyncer, wfSyncer) + if cfg.Capabilities().WorkflowRegistry().Address() != "" { + if gatewayConnectorWrapper == nil { + return nil, fmt.Errorf("unable to create workflow registry syncer without gateway connector") + } + + // TODO create the handler and initialWorkflowsStateLoader and pass in below + + err = keyStore.Workflow().EnsureKey(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to ensure workflow key: %w", err) + } + + keys, err := keyStore.Workflow().GetAll() + if err != nil { + return nil, fmt.Errorf("failed to get all workflow keys: %w", err) + } + if len(keys) != 1 { + return nil, fmt.Errorf("expected 1 key, got %d", len(keys)) + } + + connector := gatewayConnectorWrapper.GetGatewayConnector() + webApiLggr := globalLogger.Named("WebAPITarget") + + webApiConfig := webapi.ServiceConfig{ + RateLimiter: common2.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + + outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, + webApiConfig, + capabilities2.MethodWebAPITarget, webApiLggr) + if err != nil { + return nil, fmt.Errorf("could not create outgoing connector handler: %w", err) + } + + eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger), syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewFakeClock()), opts.CapabilitiesRegistry, + custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0]) + + loader := syncer.NewWorkflowRegistryContractLoader(cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return relayer.NewContractReader(ctx, bytes) + }, eventHandler) + + wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return relayer.NewContractReader(ctx, bytes) + }, cfg.Capabilities().WorkflowRegistry().Address(), + syncer.WorkflowEventPollerConfig{ + QueryCount: 100, + }, eventHandler, loader, workflowDonNotifier) + + srvcs = append(srvcs, wfLauncher, registrySyncer, wfSyncer) + } } } else { globalLogger.Debug("External registry not configured, skipping registry syncer and starting with an empty registry") opts.CapabilitiesRegistry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) } - var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper - if cfg.Capabilities().GatewayConnector().DonID() != "" { - globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID()) - gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper( - cfg.Capabilities().GatewayConnector(), - keyStore.Eth(), - clockwork.NewRealClock(), - globalLogger) - srvcs = append(srvcs, gatewayConnectorWrapper) - } - // LOOPs can be created as options, in the case of LOOP relayers, or // as OCR2 job implementations, in the case of Median today. // We will have a non-nil registry here in LOOP relayers are being used, otherwise diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 24eafeb81cc..cf2fb59a93b 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -67,7 +67,6 @@ func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context, } func Test_InitialStateSync(t *testing.T) { - ctx := coretestutils.Context(t) lggr := logger.TestLogger(t) backendTH := testutils.NewEVMBackendTH(t) donID := uint32(1) @@ -77,29 +76,6 @@ func Test_InitialStateSync(t *testing.T) { backendTH.Backend.Commit() require.NoError(t, err) - // Build the ContractReader config - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - syncer.WorkflowRegistryContractName: { - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - syncer.GetWorkflowMetadataListByDONMethodName: { - ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName, - }, - }, - }, - }, - } - - contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) - require.NoError(t, err) - - contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) - require.NoError(t, err) - - err = contractReader.Bind(ctx, []types.BoundContract{{Name: syncer.WorkflowRegistryContractName, Address: wfRegistryAddr.Hex()}}) - require.NoError(t, err) - // setup contract state to allow the secrets to be updated updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) @@ -122,13 +98,15 @@ func Test_InitialStateSync(t *testing.T) { } testEventHandler := newTestEvtHandler() - loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), contractReader, testEventHandler) + loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, testEventHandler) // Create the worker worker := syncer.NewWorkflowRegistry( lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { - return contractReader, nil + return backendTH.NewContractReader(ctx, t, bytes) }, wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{ diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index ed815a240ba..bebdfb0519e 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -13,7 +13,6 @@ import ( ) func NewFetcherFunc( - ctx context.Context, lggr logger.Logger, och *webapi.OutgoingConnectorHandler) FetcherFunc { return func(ctx context.Context, url string) ([]byte, error) { diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go index 846a9186b5a..4ed228c6a51 100644 --- a/core/services/workflows/syncer/fetcher_test.go +++ b/core/services/workflows/syncer/fetcher_test.go @@ -46,7 +46,7 @@ func TestNewFetcherFunc(t *testing.T) { connector.EXPECT().DonID().Return("don-id") connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) - fetcher := NewFetcherFunc(ctx, lggr, och) + fetcher := NewFetcherFunc(lggr, och) payload, err := fetcher(ctx, url) require.NoError(t, err) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 8a9ecc6ab04..6642679b228 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -113,9 +113,6 @@ type workflowRegistry struct { lggr logger.Logger workflowRegistryAddress string - // initReader allows the workflowRegistry to initialize a contract reader if one is not provided - // and separates the contract reader initialization from the workflowRegistry start up. - initReader func(context.Context, newContractReaderFn, types.BoundContract) (ContractReader, error) newContractReaderFn newContractReaderFn eventPollerCfg WorkflowEventPollerConfig @@ -180,7 +177,6 @@ func NewWorkflowRegistry( newContractReaderFn: newContractReaderFn, workflowRegistryAddress: addr, eventPollerCfg: eventPollerConfig, - initReader: newReader, heap: newBlockHeightHeap(), stopCh: make(services.StopChan), eventTypes: ets, @@ -407,7 +403,7 @@ func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReade } if w.reader == nil { - reader, err := w.initReader(ctx, w.newContractReaderFn, c) + reader, err := getWorkflowRegistryEventReader(ctx, w.newContractReaderFn, c) if err != nil { return nil, err } @@ -503,7 +499,7 @@ func queryEvent( } } -func newReader( +func getWorkflowRegistryEventReader( ctx context.Context, newReaderFn newContractReaderFn, bc types.BoundContract, @@ -558,24 +554,52 @@ func (r workflowAsEvent) GetData() any { type workflowRegistryContractLoader struct { workflowRegistryAddress string - donID uint32 - reader ContractReader + newContractReaderFn newContractReaderFn handler evtHandler } func NewWorkflowRegistryContractLoader( workflowRegistryAddress string, - reader ContractReader, + newContractReaderFn newContractReaderFn, handler evtHandler, ) *workflowRegistryContractLoader { return &workflowRegistryContractLoader{ workflowRegistryAddress: workflowRegistryAddress, - reader: reader, + newContractReaderFn: newContractReaderFn, handler: handler, } } func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) { + // Build the ContractReader config + contractReaderCfg := evmtypes.ChainReaderConfig{ + Contracts: map[string]evmtypes.ChainContractReader{ + WorkflowRegistryContractName: { + ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, + Configs: map[string]*evmtypes.ChainReaderDefinition{ + GetWorkflowMetadataListByDONMethodName: { + ChainSpecificName: GetWorkflowMetadataListByDONMethodName, + }, + }, + }, + }, + } + + contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) + if err != nil { + return nil, fmt.Errorf("failed to marshal contract reader config: %w", err) + } + + contractReader, err := l.newContractReaderFn(ctx, contractReaderCfgBytes) + if err != nil { + return nil, fmt.Errorf("failed to create contract reader: %w", err) + } + + err = contractReader.Bind(ctx, []types.BoundContract{{Name: WorkflowRegistryContractName, Address: l.workflowRegistryAddress}}) + if err != nil { + return nil, fmt.Errorf("failed to bind contract reader: %w", err) + } + contractBinding := types.BoundContract{ Address: l.workflowRegistryAddress, Name: WorkflowRegistryContractName, @@ -592,7 +616,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don for { var err error var workflows GetWorkflowMetadataListByDONReturnVal - headAtLastRead, err = l.reader.GetLatestValueWithHeadData(ctx, readIdentifier, primitives.Finalized, params, &workflows) + headAtLastRead, err = contractReader.GetLatestValueWithHeadData(ctx, readIdentifier, primitives.Finalized, params, &workflows) if err != nil { return nil, fmt.Errorf("failed to get workflow metadata for don %w", err) } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index b448efcbb0b..456ed6ea9dc 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -75,7 +75,9 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { handler = NewEventHandler(lggr, orm, gateway, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - loader = NewWorkflowRegistryContractLoader(contractAddress, reader, handler) + loader = NewWorkflowRegistryContractLoader(contractAddress, func(ctx context.Context, bytes []byte) (ContractReader, error) { + return reader, nil + }, handler) worker = NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (ContractReader, error) { return reader, nil From 02377813b929208ebc9245e4bd27730120072992 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Fri, 29 Nov 2024 12:40:09 +0000 Subject: [PATCH 5/9] test udpate --- core/capabilities/don_notifier_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/capabilities/don_notifier_test.go b/core/capabilities/don_notifier_test.go index e44fc248f6c..f37931259ba 100644 --- a/core/capabilities/don_notifier_test.go +++ b/core/capabilities/don_notifier_test.go @@ -31,6 +31,10 @@ func TestDonNotifier_WaitForDon(t *testing.T) { result, err := notifier.WaitForDon(ctx) require.NoError(t, err) assert.Equal(t, don, result) + + result, err = notifier.WaitForDon(ctx) + require.NoError(t, err) + assert.Equal(t, don, result) } func TestDonNotifier_WaitForDon_ContextTimeout(t *testing.T) { From 13578ce656423ce81189878c5859782d0992d26f Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Fri, 29 Nov 2024 12:43:32 +0000 Subject: [PATCH 6/9] srvcs fix --- core/services/chainlink/application.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index b076e583508..01129254fa6 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -283,13 +283,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) { ) registrySyncer.AddLauncher(wfLauncher) + srvcs = append(srvcs, wfLauncher, registrySyncer) + if cfg.Capabilities().WorkflowRegistry().Address() != "" { if gatewayConnectorWrapper == nil { - return nil, fmt.Errorf("unable to create workflow registry syncer without gateway connector") + return nil, errors.New("unable to create workflow registry syncer without gateway connector") } - // TODO create the handler and initialWorkflowsStateLoader and pass in below - err = keyStore.Workflow().EnsureKey(context.Background()) if err != nil { return nil, fmt.Errorf("failed to ensure workflow key: %w", err) @@ -304,9 +304,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) { } connector := gatewayConnectorWrapper.GetGatewayConnector() - webApiLggr := globalLogger.Named("WebAPITarget") + webAPILggr := globalLogger.Named("WebAPITarget") - webApiConfig := webapi.ServiceConfig{ + webAPIConfig := webapi.ServiceConfig{ RateLimiter: common2.RateLimiterConfig{ GlobalRPS: 100.0, GlobalBurst: 100, @@ -316,8 +316,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { } outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, - webApiConfig, - capabilities2.MethodWebAPITarget, webApiLggr) + webAPIConfig, + capabilities2.MethodWebAPITarget, webAPILggr) if err != nil { return nil, fmt.Errorf("could not create outgoing connector handler: %w", err) } @@ -336,7 +336,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { QueryCount: 100, }, eventHandler, loader, workflowDonNotifier) - srvcs = append(srvcs, wfLauncher, registrySyncer, wfSyncer) + srvcs = append(srvcs, wfSyncer) } } } else { From a53ae7b85de096cb4ab8ef5a54e7a578275b6555 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Fri, 29 Nov 2024 14:01:40 +0000 Subject: [PATCH 7/9] review comments --- core/services/chainlink/application.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 01129254fa6..68b9b99a823 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -322,7 +322,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("could not create outgoing connector handler: %w", err) } - eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger), syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewFakeClock()), opts.CapabilitiesRegistry, + eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger), + syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry, custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0]) loader := syncer.NewWorkflowRegistryContractLoader(cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { From 5363bb118f425353423a84587162a2cee3ea3601 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Fri, 29 Nov 2024 14:04:55 +0000 Subject: [PATCH 8/9] lint --- core/services/registrysyncer/syncer.go | 4 ---- core/services/workflows/syncer/workflow_registry_test.go | 3 --- 2 files changed, 7 deletions(-) diff --git a/core/services/registrysyncer/syncer.go b/core/services/registrysyncer/syncer.go index 2ec97299d32..461824b403b 100644 --- a/core/services/registrysyncer/syncer.go +++ b/core/services/registrysyncer/syncer.go @@ -68,10 +68,6 @@ var ( defaultTickInterval = 12 * time.Second ) -type donIDSetNotifier interface { - NotifyDonIDSet() -} - // New instantiates a new RegistrySyncer func New( lggr logger.Logger, diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 456ed6ea9dc..0cccb405710 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -27,9 +27,6 @@ import ( "github.com/stretchr/testify/require" ) -type testWorkflowRegistryContractLoader struct { -} - type testDonNotifier struct { don capabilities.DON err error From 7b74258ecc0ef7f8a00620edbe8e9198f1f9ed21 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Fri, 29 Nov 2024 14:35:33 +0000 Subject: [PATCH 9/9] attempt to fix ci tests --- tools/bin/go_core_tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/bin/go_core_tests b/tools/bin/go_core_tests index 88ee82c9261..76c15fccd07 100755 --- a/tools/bin/go_core_tests +++ b/tools/bin/go_core_tests @@ -4,7 +4,7 @@ set +e SCRIPT_PATH=`dirname "$0"`; SCRIPT_PATH=`eval "cd \"$SCRIPT_PATH\" && pwd"` OUTPUT_FILE=${OUTPUT_FILE:-"./output.txt"} -EXTRA_FLAGS="" +EXTRA_FLAGS="-timeout 20m" echo "Test execution results: ---------------------" echo ""