Skip to content

Commit

Permalink
[KS-340] Stop hardcoding DON ID in workflow engine (#13549)
Browse files Browse the repository at this point in the history
* [KS-340] Stop hardcoding DON ID in workflow engine

* Log error when node belongs to more than one workflowDON

---------

Co-authored-by: Bolek <1416262+bolekk@users.noreply.github.com>
  • Loading branch information
cedric-cordenier and bolekk authored Jun 15, 2024
1 parent f7e0362 commit f296311
Show file tree
Hide file tree
Showing 18 changed files with 229 additions and 117 deletions.
53 changes: 50 additions & 3 deletions core/capabilities/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package capabilities
import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"
kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

type remoteRegistryReader struct {
r types.ContractReader
r types.ContractReader
peerWrapper p2ptypes.PeerWrapper
lggr logger.Logger
}

var _ reader = (*remoteRegistryReader)(nil)
Expand All @@ -25,6 +31,43 @@ type state struct {
IDsToCapabilities map[hashedCapabilityID]kcr.CapabilityRegistryCapability
}

func (r *remoteRegistryReader) LocalNode(ctx context.Context) (capabilities.Node, error) {
if r.peerWrapper.GetPeer() == nil {
return capabilities.Node{}, errors.New("unable to get peer: peerWrapper hasn't started yet")
}

pid := r.peerWrapper.GetPeer().ID()

readerState, err := r.state(ctx)
if err != nil {
return capabilities.Node{}, fmt.Errorf("failed to get state from registry to determine don ownership: %w", err)
}

var workflowDON capabilities.DON
capabilityDONs := []capabilities.DON{}
for _, d := range readerState.IDsToDONs {
for _, p := range d.NodeP2PIds {
if p == pid {
if d.AcceptsWorkflows {
if workflowDON.ID == "" {
workflowDON = *toDONInfo(d)
} else {
r.lggr.Errorf("Configuration error: node %s belongs to more than one workflowDON", pid)
}
}

capabilityDONs = append(capabilityDONs, *toDONInfo(d))
}
}
}

return capabilities.Node{
PeerID: &pid,
WorkflowDON: workflowDON,
CapabilityDONs: capabilityDONs,
}, nil
}

func (r *remoteRegistryReader) state(ctx context.Context) (state, error) {
dons := []kcr.CapabilityRegistryDONInfo{}
err := r.r.GetLatestValue(ctx, "capabilityRegistry", "getDONs", nil, &dons)
Expand Down Expand Up @@ -66,7 +109,7 @@ type contractReaderFactory interface {
NewContractReader(context.Context, []byte) (types.ContractReader, error)
}

func newRemoteRegistryReader(ctx context.Context, relayer contractReaderFactory, remoteRegistryAddress string) (*remoteRegistryReader, error) {
func newRemoteRegistryReader(ctx context.Context, lggr logger.Logger, peerWrapper p2ptypes.PeerWrapper, relayer contractReaderFactory, remoteRegistryAddress string) (*remoteRegistryReader, error) {
contractReaderConfig := evmrelaytypes.ChainReaderConfig{
Contracts: map[string]evmrelaytypes.ChainContractReader{
"capabilityRegistry": {
Expand Down Expand Up @@ -106,5 +149,9 @@ func newRemoteRegistryReader(ctx context.Context, relayer contractReaderFactory,
return nil, err
}

return &remoteRegistryReader{r: cr}, err
return &remoteRegistryReader{
r: cr,
peerWrapper: peerWrapper,
lggr: lggr,
}, err
}
32 changes: 31 additions & 1 deletion core/capabilities/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -110,6 +111,24 @@ func randomWord() [32]byte {
return [32]byte(word)
}

type mockWrapper struct {
services.Service
peer p2ptypes.Peer
}

func (m mockWrapper) GetPeer() p2ptypes.Peer {
return m.peer
}

type mockPeer struct {
p2ptypes.Peer
peerID p2ptypes.PeerID
}

func (m mockPeer) ID() p2ptypes.PeerID {
return m.peerID
}

func TestReader_Integration(t *testing.T) {
ctx := testutils.Context(t)
reg, regAddress, owner, sim := startNewChainWithRegistry(t)
Expand Down Expand Up @@ -180,7 +199,12 @@ func TestReader_Integration(t *testing.T) {
require.NoError(t, err)

factory := newContractReaderFactory(t, sim)
reader, err := newRemoteRegistryReader(ctx, factory, regAddress.Hex())
pw := mockWrapper{
peer: mockPeer{
peerID: nodeSet[0],
},
}
reader, err := newRemoteRegistryReader(ctx, logger.TestLogger(t), pw, factory, regAddress.Hex())
require.NoError(t, err)

s, err := reader.state(ctx)
Expand All @@ -207,4 +231,10 @@ func TestReader_Integration(t *testing.T) {
nodeSet[1]: nodes[1],
nodeSet[2]: nodes[2],
}, s.IDsToNodes)

node, err := reader.LocalNode(ctx)
require.NoError(t, err)

assert.Equal(t, p2ptypes.PeerID(nodeSet[0]), *node.PeerID)
assert.Equal(t, fmt.Sprint(1), node.WorkflowDON.ID)
}
9 changes: 5 additions & 4 deletions core/capabilities/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

type reader interface {
state(ctx context.Context) (state, error)
LocalNode(ctx context.Context) (capabilities.Node, error)
}

type registrySyncer struct {
Expand All @@ -41,7 +42,7 @@ type registrySyncer struct {
stopCh services.StopChan
subServices []services.Service
networkSetup HardcodedDonNetworkSetup
reader reader
reader

wg sync.WaitGroup
lggr logger.Logger
Expand Down Expand Up @@ -79,7 +80,7 @@ func NewRegistrySyncer(
) (*registrySyncer, error) {
stopCh := make(services.StopChan)
ctx, _ := stopCh.NewCtx()
reader, err := newRemoteRegistryReader(ctx, relayer, registryAddress)
reader, err := newRemoteRegistryReader(ctx, lggr, peerWrapper, relayer, registryAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,8 +117,8 @@ func newRegistrySyncer(
}

func (s *registrySyncer) Start(ctx context.Context) error {
// NOTE: Decrease wg.Add and uncomment line 124 below
// this for a hardcoded syncer
// NOTE: Decrease wg.Add and uncomment the line below
// `go s.launch()` to enable the hardcoded syncer.
s.wg.Add(1)
// go s.launch()
go s.syncLoop()
Expand Down
4 changes: 4 additions & 0 deletions core/capabilities/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (m mockReader) state(ctx context.Context) (state, error) {
return m.s, m.err
}

func (m mockReader) LocalNode(ctx context.Context) (capabilities.Node, error) {
return capabilities.Node{}, nil
}

type mockTrigger struct {
capabilities.CapabilityInfo
}
Expand Down
18 changes: 10 additions & 8 deletions core/capabilities/transmission/local_target_capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,41 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

// LocalTargetCapability handles the transmission protocol required for a target capability that exists in the same don as
// the caller.
type LocalTargetCapability struct {
lggr logger.Logger
capabilities.TargetCapability
peerID p2ptypes.PeerID
don capabilities.DON
localNode capabilities.Node
}

func NewLocalTargetCapability(lggr logger.Logger, peerID p2ptypes.PeerID, don capabilities.DON, underlying capabilities.TargetCapability) *LocalTargetCapability {
func NewLocalTargetCapability(lggr logger.Logger, localDON capabilities.Node, underlying capabilities.TargetCapability) *LocalTargetCapability {
return &LocalTargetCapability{
TargetCapability: underlying,
lggr: lggr,
peerID: peerID,
don: don,
localNode: localDON,
}
}

func (l *LocalTargetCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
if l.localNode.PeerID == nil || l.localNode.WorkflowDON.ID == "" {
l.lggr.Debugf("empty DON info, executing immediately")
return l.TargetCapability.Execute(ctx, req)
}

if req.Config == nil || req.Config.Underlying["schedule"] == nil {
l.lggr.Debug("no schedule found, executing immediately")
return l.TargetCapability.Execute(ctx, req)
}

peerIDToTransmissionDelay, err := GetPeerIDToTransmissionDelay(l.don.Members, req)
peerIDToTransmissionDelay, err := GetPeerIDToTransmissionDelay(l.localNode.WorkflowDON.Members, req)
if err != nil {
return nil, fmt.Errorf("failed to get peer ID to transmission delay map: %w", err)
}

delay, existsForPeerID := peerIDToTransmissionDelay[l.peerID]
delay, existsForPeerID := peerIDToTransmissionDelay[*l.localNode.PeerID]
if !existsForPeerID {
return nil, nil
}
Expand Down
11 changes: 7 additions & 4 deletions core/capabilities/transmission/local_target_capability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,14 @@ func TestScheduledExecutionStrategy_LocalDON(t *testing.T) {
randKey(),
randKey(),
}
don := capabilities.DON{
Members: ids,
localDON := capabilities.Node{
WorkflowDON: capabilities.DON{
ID: "1",
Members: ids,
},
PeerID: &ids[tc.position],
}
peerID := ids[tc.position]
localTargetCapability := NewLocalTargetCapability(log, peerID, don, mt)
localTargetCapability := NewLocalTargetCapability(log, localDON, mt)

_, err = localTargetCapability.Execute(tests.Context(t), req)

Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1212,8 +1212,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8=
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb h1:R4OkRLPz6mZm8k7JFfLpQ9Ib/e1n1qcxg+hVxc0pKOk=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 h1:+7LQmbMNaLXej+0ajbTxUfTt4w/ILODpmrOETQ5rTCI=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
12 changes: 4 additions & 8 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"

pkgcapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
Expand Down Expand Up @@ -205,6 +206,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
}

var externalPeerWrapper p2ptypes.PeerWrapper
var getLocalNode func(ctx context.Context) (pkgcapabilities.Node, error)
if cfg.Capabilities().Peering().Enabled() {
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger)
signer := externalPeer
Expand Down Expand Up @@ -239,6 +241,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("could not configure syncer: %w", err)
}

getLocalNode = registrySyncer.LocalNode
srvcs = append(srvcs, dispatcher, registrySyncer)
}

Expand Down Expand Up @@ -430,14 +433,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
globalLogger,
opts.CapabilitiesRegistry,
workflowORM,
func() *p2ptypes.PeerID {
if externalPeerWrapper == nil {
return nil
}

peerID := externalPeerWrapper.GetPeer().ID()
return &peerID
},
getLocalNode,
)

// Flux monitor requires ethereum just to boot, silence errors with a null delegate
Expand Down
Loading

0 comments on commit f296311

Please sign in to comment.