Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cappl 391 wire in workflow registry #15460

Merged
merged 9 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions core/capabilities/don_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package capabilities

import (
"context"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
)

type DonNotifier struct {
mu sync.Mutex
don capabilities.DON
notified bool
ch chan struct{}
}

func NewDonNotifier() *DonNotifier {
return &DonNotifier{
ch: make(chan struct{}),
}
}

func (n *DonNotifier) NotifyDonSet(don capabilities.DON) {
n.mu.Lock()
defer n.mu.Unlock()
if !n.notified {
n.don = don
n.notified = true
close(n.ch)
}
}

func (n *DonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) {
select {
case <-ctx.Done():
return capabilities.DON{}, ctx.Err()
case <-n.ch:
}
<-n.ch
n.mu.Lock()
defer n.mu.Unlock()
return n.don, nil
}
49 changes: 49 additions & 0 deletions core/capabilities/don_notifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package capabilities_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"

"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink/v2/core/capabilities"
)

func TestDonNotifier_WaitForDon(t *testing.T) {
notifier := capabilities.NewDonNotifier()
don := commoncap.DON{
ID: 1,
}

go func() {
time.Sleep(100 * time.Millisecond)
notifier.NotifyDonSet(don)
}()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

result, err := notifier.WaitForDon(ctx)
require.NoError(t, err)
assert.Equal(t, don, result)

result, err = notifier.WaitForDon(ctx)
require.NoError(t, err)
assert.Equal(t, don, result)
}

func TestDonNotifier_WaitForDon_ContextTimeout(t *testing.T) {
notifier := capabilities.NewDonNotifier()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()

_, err := notifier.WaitForDon(ctx)
require.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
}
29 changes: 19 additions & 10 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ var defaultStreamConfig = p2ptypes.StreamConfig{

type launcher struct {
services.StateMachine
lggr logger.Logger
peerWrapper p2ptypes.PeerWrapper
dispatcher remotetypes.Dispatcher
registry *Registry
subServices []services.Service
lggr logger.Logger
peerWrapper p2ptypes.PeerWrapper
dispatcher remotetypes.Dispatcher
registry *Registry
subServices []services.Service
workflowDonNotifier donNotifier
}

func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguration, error) {
Expand Down Expand Up @@ -86,18 +87,24 @@ func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguratio
}, nil
}

type donNotifier interface {
NotifyDonSet(don capabilities.DON)
}

func NewLauncher(
lggr logger.Logger,
peerWrapper p2ptypes.PeerWrapper,
dispatcher remotetypes.Dispatcher,
registry *Registry,
workflowDonNotifier donNotifier,
) *launcher {
return &launcher{
lggr: lggr.Named("CapabilitiesLauncher"),
peerWrapper: peerWrapper,
dispatcher: dispatcher,
registry: registry,
subServices: []services.Service{},
lggr: lggr.Named("CapabilitiesLauncher"),
peerWrapper: peerWrapper,
dispatcher: dispatcher,
registry: registry,
subServices: []services.Service{},
workflowDonNotifier: workflowDonNotifier,
}
}

Expand Down Expand Up @@ -215,6 +222,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
return errors.New("invariant violation: node is part of more than one workflowDON")
}

w.workflowDonNotifier.NotifyDonSet(myDON.DON)

for _, rcd := range remoteCapabilityDONs {
err := w.addRemoteCapabilities(ctx, myDON, rcd, state)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (

var _ capabilities.TriggerCapability = (*mockTrigger)(nil)

type mockDonNotifier struct {
}

func (m *mockDonNotifier) NotifyDonSet(don capabilities.DON) {
}

type mockTrigger struct {
capabilities.CapabilityInfo
}
Expand Down Expand Up @@ -196,6 +202,7 @@ func TestLauncher(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
Expand Down Expand Up @@ -305,6 +312,7 @@ func TestLauncher(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

err = launcher.Launch(ctx, state)
Expand Down Expand Up @@ -409,6 +417,7 @@ func TestLauncher(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

err = launcher.Launch(ctx, state)
Expand Down Expand Up @@ -600,6 +609,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
Expand Down Expand Up @@ -752,6 +762,7 @@ func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

// If the DON were public, this would fail with two errors:
Expand Down Expand Up @@ -917,6 +928,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
Expand Down Expand Up @@ -1082,6 +1094,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

dispatcher.On("SetReceiver", fullTriggerCapID, triggerCapDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
Expand Down Expand Up @@ -1232,6 +1245,7 @@ func TestLauncher_SucceedsEvenIfDispatcherAlreadyHasReceiver(t *testing.T) {
wrapper,
dispatcher,
registry,
&mockDonNotifier{},
)

err = launcher.Launch(ctx, state)
Expand Down
84 changes: 73 additions & 11 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
Expand All @@ -33,6 +34,7 @@ import (
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand All @@ -48,6 +50,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/feeds"
"github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
capabilities2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
common2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
Expand All @@ -71,6 +75,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -212,6 +217,17 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
if cfg.Capabilities().GatewayConnector().DonID() != "" {
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID())
gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper(
cfg.Capabilities().GatewayConnector(),
keyStore.Eth(),
clockwork.NewRealClock(),
globalLogger)
srvcs = append(srvcs, gatewayConnectorWrapper)
}

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
var dispatcher remotetypes.Dispatcher
Expand Down Expand Up @@ -256,32 +272,78 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("could not configure syncer: %w", err)
}

workflowDonNotifier := capabilities.NewDonNotifier()

wfLauncher := capabilities.NewLauncher(
globalLogger,
externalPeerWrapper,
dispatcher,
opts.CapabilitiesRegistry,
workflowDonNotifier,
)
registrySyncer.AddLauncher(wfLauncher)

srvcs = append(srvcs, wfLauncher, registrySyncer)

if cfg.Capabilities().WorkflowRegistry().Address() != "" {
if gatewayConnectorWrapper == nil {
return nil, errors.New("unable to create workflow registry syncer without gateway connector")
}

err = keyStore.Workflow().EnsureKey(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to ensure workflow key: %w", err)
}

keys, err := keyStore.Workflow().GetAll()
if err != nil {
return nil, fmt.Errorf("failed to get all workflow keys: %w", err)
}
if len(keys) != 1 {
return nil, fmt.Errorf("expected 1 key, got %d", len(keys))
}

connector := gatewayConnectorWrapper.GetGatewayConnector()
webAPILggr := globalLogger.Named("WebAPITarget")

webAPIConfig := webapi.ServiceConfig{
RateLimiter: common2.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector,
webAPIConfig,
capabilities2.MethodWebAPITarget, webAPILggr)
if err != nil {
return nil, fmt.Errorf("could not create outgoing connector handler: %w", err)
}

eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger), syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewFakeClock()), opts.CapabilitiesRegistry,
ettec marked this conversation as resolved.
Show resolved Hide resolved
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0])

loader := syncer.NewWorkflowRegistryContractLoader(cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simplify this to just relayer.NewContractReader I think?

Copy link
Collaborator Author

@ettec ettec Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it needs to be the factory method, internally the workflow registry create this on demand am assuming for good reason

}, eventHandler)

wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with this

}, cfg.Capabilities().WorkflowRegistry().Address(),
syncer.WorkflowEventPollerConfig{
QueryCount: 100,
}, eventHandler, loader, workflowDonNotifier)

srvcs = append(srvcs, wfSyncer)
}
}
} else {
globalLogger.Debug("External registry not configured, skipping registry syncer and starting with an empty registry")
opts.CapabilitiesRegistry.SetLocalRegistry(&capabilities.TestMetadataRegistry{})
}

var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
if cfg.Capabilities().GatewayConnector().DonID() != "" {
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID())
gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper(
cfg.Capabilities().GatewayConnector(),
keyStore.Eth(),
clockwork.NewRealClock(),
globalLogger)
srvcs = append(srvcs, gatewayConnectorWrapper)
}

// LOOPs can be created as options, in the case of LOOP relayers, or
// as OCR2 job implementations, in the case of Median today.
// We will have a non-nil registry here in LOOP relayers are being used, otherwise
Expand Down
4 changes: 4 additions & 0 deletions core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
defaultTickInterval = 12 * time.Second
)

type donIDSetNotifier interface {

Check failure on line 71 in core/services/registrysyncer/syncer.go

View workflow job for this annotation

GitHub Actions / lint

type `donIDSetNotifier` is unused (unused)
NotifyDonIDSet()
}

// New instantiates a new RegistrySyncer
func New(
lggr logger.Logger,
Expand Down
Loading
Loading