-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cappl 391 wire in workflow registry #15460
Merged
Merged
Changes from 8 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
22ddff9
don notifier added
ettec 4b793dd
wip
ettec 48955c6
wip
ettec d40f112
wire up of wf syncer
ettec 0237781
test udpate
ettec 13578ce
srvcs fix
ettec a53ae7b
review comments
ettec 5363bb1
lint
ettec 7b74258
attempt to fix ci tests
ettec File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package capabilities | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
) | ||
|
||
type DonNotifier struct { | ||
mu sync.Mutex | ||
don capabilities.DON | ||
notified bool | ||
ch chan struct{} | ||
} | ||
|
||
func NewDonNotifier() *DonNotifier { | ||
return &DonNotifier{ | ||
ch: make(chan struct{}), | ||
} | ||
} | ||
|
||
func (n *DonNotifier) NotifyDonSet(don capabilities.DON) { | ||
n.mu.Lock() | ||
defer n.mu.Unlock() | ||
if !n.notified { | ||
n.don = don | ||
n.notified = true | ||
close(n.ch) | ||
} | ||
} | ||
|
||
func (n *DonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { | ||
select { | ||
case <-ctx.Done(): | ||
return capabilities.DON{}, ctx.Err() | ||
case <-n.ch: | ||
} | ||
<-n.ch | ||
n.mu.Lock() | ||
defer n.mu.Unlock() | ||
return n.don, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package capabilities_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/capabilities" | ||
) | ||
|
||
func TestDonNotifier_WaitForDon(t *testing.T) { | ||
notifier := capabilities.NewDonNotifier() | ||
don := commoncap.DON{ | ||
ID: 1, | ||
} | ||
|
||
go func() { | ||
time.Sleep(100 * time.Millisecond) | ||
notifier.NotifyDonSet(don) | ||
}() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | ||
defer cancel() | ||
|
||
result, err := notifier.WaitForDon(ctx) | ||
require.NoError(t, err) | ||
assert.Equal(t, don, result) | ||
|
||
result, err = notifier.WaitForDon(ctx) | ||
require.NoError(t, err) | ||
assert.Equal(t, don, result) | ||
} | ||
|
||
func TestDonNotifier_WaitForDon_ContextTimeout(t *testing.T) { | ||
notifier := capabilities.NewDonNotifier() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) | ||
defer cancel() | ||
|
||
_, err := notifier.WaitForDon(ctx) | ||
require.Error(t, err) | ||
assert.Equal(t, context.DeadlineExceeded, err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"go.uber.org/multierr" | ||
"go.uber.org/zap/zapcore" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/custmsg" | ||
"github.com/smartcontractkit/chainlink-common/pkg/loop" | ||
commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" | ||
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil" | ||
|
@@ -33,6 +34,7 @@ import ( | |
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" | ||
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" | ||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" | ||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" | ||
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" | ||
|
@@ -48,6 +50,8 @@ import ( | |
"github.com/smartcontractkit/chainlink/v2/core/services/feeds" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/gateway" | ||
capabilities2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" | ||
common2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/job" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/keeper" | ||
|
@@ -71,6 +75,7 @@ import ( | |
"github.com/smartcontractkit/chainlink/v2/core/services/webhook" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/workflows" | ||
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" | ||
"github.com/smartcontractkit/chainlink/v2/core/sessions" | ||
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth" | ||
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth" | ||
|
@@ -212,6 +217,17 @@ func NewApplication(opts ApplicationOpts) (Application, error) { | |
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) | ||
} | ||
|
||
var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper | ||
if cfg.Capabilities().GatewayConnector().DonID() != "" { | ||
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID()) | ||
gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper( | ||
cfg.Capabilities().GatewayConnector(), | ||
keyStore.Eth(), | ||
clockwork.NewRealClock(), | ||
globalLogger) | ||
srvcs = append(srvcs, gatewayConnectorWrapper) | ||
} | ||
|
||
var externalPeerWrapper p2ptypes.PeerWrapper | ||
if cfg.Capabilities().Peering().Enabled() { | ||
var dispatcher remotetypes.Dispatcher | ||
|
@@ -256,32 +272,79 @@ func NewApplication(opts ApplicationOpts) (Application, error) { | |
return nil, fmt.Errorf("could not configure syncer: %w", err) | ||
} | ||
|
||
workflowDonNotifier := capabilities.NewDonNotifier() | ||
|
||
wfLauncher := capabilities.NewLauncher( | ||
globalLogger, | ||
externalPeerWrapper, | ||
dispatcher, | ||
opts.CapabilitiesRegistry, | ||
workflowDonNotifier, | ||
) | ||
registrySyncer.AddLauncher(wfLauncher) | ||
|
||
srvcs = append(srvcs, wfLauncher, registrySyncer) | ||
|
||
if cfg.Capabilities().WorkflowRegistry().Address() != "" { | ||
if gatewayConnectorWrapper == nil { | ||
return nil, errors.New("unable to create workflow registry syncer without gateway connector") | ||
} | ||
|
||
err = keyStore.Workflow().EnsureKey(context.Background()) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to ensure workflow key: %w", err) | ||
} | ||
|
||
keys, err := keyStore.Workflow().GetAll() | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to get all workflow keys: %w", err) | ||
} | ||
if len(keys) != 1 { | ||
return nil, fmt.Errorf("expected 1 key, got %d", len(keys)) | ||
} | ||
|
||
connector := gatewayConnectorWrapper.GetGatewayConnector() | ||
webAPILggr := globalLogger.Named("WebAPITarget") | ||
|
||
webAPIConfig := webapi.ServiceConfig{ | ||
RateLimiter: common2.RateLimiterConfig{ | ||
GlobalRPS: 100.0, | ||
GlobalBurst: 100, | ||
PerSenderRPS: 100.0, | ||
PerSenderBurst: 100, | ||
}, | ||
} | ||
|
||
outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, | ||
webAPIConfig, | ||
capabilities2.MethodWebAPITarget, webAPILggr) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not create outgoing connector handler: %w", err) | ||
} | ||
|
||
eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger), | ||
syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry, | ||
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0]) | ||
|
||
loader := syncer.NewWorkflowRegistryContractLoader(cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { | ||
return relayer.NewContractReader(ctx, bytes) | ||
}, eventHandler) | ||
|
||
wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { | ||
return relayer.NewContractReader(ctx, bytes) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same with this |
||
}, cfg.Capabilities().WorkflowRegistry().Address(), | ||
syncer.WorkflowEventPollerConfig{ | ||
QueryCount: 100, | ||
}, eventHandler, loader, workflowDonNotifier) | ||
|
||
srvcs = append(srvcs, wfSyncer) | ||
} | ||
} | ||
} else { | ||
globalLogger.Debug("External registry not configured, skipping registry syncer and starting with an empty registry") | ||
opts.CapabilitiesRegistry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) | ||
} | ||
|
||
var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper | ||
if cfg.Capabilities().GatewayConnector().DonID() != "" { | ||
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID()) | ||
gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper( | ||
cfg.Capabilities().GatewayConnector(), | ||
keyStore.Eth(), | ||
clockwork.NewRealClock(), | ||
globalLogger) | ||
srvcs = append(srvcs, gatewayConnectorWrapper) | ||
} | ||
|
||
// LOOPs can be created as options, in the case of LOOP relayers, or | ||
// as OCR2 job implementations, in the case of Median today. | ||
// We will have a non-nil registry here in LOOP relayers are being used, otherwise | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can simplify this to just relayer.NewContractReader I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it needs to be the factory method, internally the workflow registry create this on demand am assuming for good reason