Skip to content

Commit

Permalink
Action Remote Shims Register/Unregister (#15232)
Browse files Browse the repository at this point in the history
* execute capability shims

* shims

* common bump

* review comment

* common bump

* step ref

* build
  • Loading branch information
ettec authored Nov 14, 2024
1 parent b805b82 commit c015da8
Show file tree
Hide file tree
Showing 27 changed files with 1,438 additions and 601 deletions.
58 changes: 52 additions & 6 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -294,12 +294,26 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
return fmt.Errorf("failed to add trigger shim: %w", err)
}
case capabilities.CapabilityTypeAction:
w.lggr.Warn("no remote client configured for capability type action, skipping configuration")
newActionFn := func(info capabilities.CapabilityInfo) (capabilityService, error) {
client := executable.NewClient(
info,
myDON.DON,
w.dispatcher,
defaultTargetRequestTimeout,
w.lggr,
)
return client, nil
}

err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newActionFn)
if err != nil {
return fmt.Errorf("failed to add action shim: %w", err)
}
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
newTargetFn := func(info capabilities.CapabilityInfo) (capabilityService, error) {
client := target.NewClient(
client := executable.NewClient(
info,
myDON.DON,
w.dispatcher,
Expand Down Expand Up @@ -419,7 +433,34 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
// continue attempting other capabilities
}
case capabilities.CapabilityTypeAction:
w.lggr.Warn("no remote client configured for capability type action, skipping configuration")
newActionServer := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
actionCapability, ok := (cap).(capabilities.ActionCapability)
if !ok {
return nil, errors.New("capability does not implement ActionCapability")
}

remoteConfig := &capabilities.RemoteExecutableConfig{}
if capabilityConfig.RemoteTargetConfig != nil {
remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes
}

return executable.NewServer(
capabilityConfig.RemoteExecutableConfig,
myPeerID,
actionCapability,
info,
don.DON,
idsToDONs,
w.dispatcher,
defaultTargetRequestTimeout,
w.lggr,
), nil
}

err = w.addReceiver(ctx, capability, don, newActionServer)
if err != nil {
return fmt.Errorf("failed to add action server-side receiver: %w", err)
}
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
Expand All @@ -429,8 +470,13 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
return nil, errors.New("capability does not implement TargetCapability")
}

return target.NewServer(
capabilityConfig.RemoteTargetConfig,
remoteConfig := &capabilities.RemoteExecutableConfig{}
if capabilityConfig.RemoteTargetConfig != nil {
remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes
}

return executable.NewServer(
remoteConfig,
myPeerID,
targetCapability,
info,
Expand Down
7 changes: 4 additions & 3 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestLauncher(t *testing.T) {
)

dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*target.server")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*executable.server")).Return(nil)

err = launcher.Launch(ctx, state)
require.NoError(t, err)
Expand Down Expand Up @@ -603,7 +603,8 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*executable.client")).Return(nil)
dispatcher.On("Ready").Return(nil).Maybe()
awaitRegistrationMessageCh := make(chan struct{})
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
select {
Expand Down Expand Up @@ -919,7 +920,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
)

dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*executable.client")).Return(nil)

err = launcher.Launch(ctx, state)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package target
package executable

import (
"context"
Expand All @@ -8,15 +8,15 @@ import (
"time"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// client is a shim for remote target capabilities.
// client is a shim for remote executable capabilities.
// It translates between capability API calls and network messages.
// Its responsibilities are:
// 1. Transmit capability requests to remote nodes according to a transmission schedule
Expand All @@ -31,25 +31,25 @@ type client struct {
dispatcher types.Dispatcher
requestTimeout time.Duration

messageIDToCallerRequest map[string]*request.ClientRequest
requestIDToCallerRequest map[string]*request.ClientRequest
mutex sync.Mutex
stopCh services.StopChan
wg sync.WaitGroup
}

var _ commoncap.TargetCapability = &client{}
var _ commoncap.ExecutableCapability = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}

func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, lggr logger.Logger) *client {
return &client{
lggr: lggr.Named("TargetClient"),
lggr: lggr.Named("ExecutableCapabilityClient"),
remoteCapabilityInfo: remoteCapabilityInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
requestTimeout: requestTimeout,
messageIDToCallerRequest: make(map[string]*request.ClientRequest),
requestIDToCallerRequest: make(map[string]*request.ClientRequest),
stopCh: make(services.StopChan),
}
}
Expand All @@ -61,7 +61,13 @@ func (c *client) Start(ctx context.Context) error {
defer c.wg.Done()
c.checkForExpiredRequests()
}()
c.lggr.Info("TargetClient started")
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.checkDispatcherReady()
}()

c.lggr.Info("ExecutableCapability Client started")
return nil
})
}
Expand All @@ -71,11 +77,26 @@ func (c *client) Close() error {
close(c.stopCh)
c.cancelAllRequests(errors.New("client closed"))
c.wg.Wait()
c.lggr.Info("TargetClient closed")
c.lggr.Info("ExecutableCapability closed")
return nil
})
}

func (c *client) checkDispatcherReady() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
if err := c.dispatcher.Ready(); err != nil {
c.cancelAllRequests(fmt.Errorf("dispatcher not ready: %w", err))
}
}
}
}

func (c *client) checkForExpiredRequests() {
ticker := time.NewTicker(c.requestTimeout)
defer ticker.Stop()
Expand All @@ -93,18 +114,23 @@ func (c *client) expireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.messageIDToCallerRequest {
for messageID, req := range c.requestIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
delete(c.messageIDToCallerRequest, messageID)
delete(c.requestIDToCallerRequest, messageID)
}

if c.dispatcher.Ready() != nil {
c.cancelAllRequests(errors.New("dispatcher not ready"))
return
}
}
}

func (c *client) cancelAllRequests(err error) {
c.mutex.Lock()
defer c.mutex.Unlock()
for _, req := range c.messageIDToCallerRequest {
for _, req := range c.requestIDToCallerRequest {
req.Cancel(err)
}
}
Expand All @@ -113,49 +139,80 @@ func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}

func (c *client) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
// do nothing
return nil
}
func (c *client) RegisterToWorkflow(ctx context.Context, registerRequest commoncap.RegisterToWorkflowRequest) error {
req, err := request.NewClientRegisterToWorkflowRequest(ctx, c.lggr, registerRequest, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)

if err != nil {
return fmt.Errorf("failed to create client request: %w", err)
}

if err = c.sendRequest(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}

func (c *client) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error {
// do nothing
resp := <-req.ResponseChan()
if resp.Err != nil {
return fmt.Errorf("error executing request: %w", resp.Err)
}
return nil
}

func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
req, err := c.executeRequest(ctx, capReq)
func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest commoncap.UnregisterFromWorkflowRequest) error {
req, err := request.NewClientUnregisterFromWorkflowRequest(ctx, c.lggr, unregisterRequest, c.remoteCapabilityInfo,
c.localDONInfo, c.dispatcher, c.requestTimeout)

if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to execute request: %w", err)
return fmt.Errorf("failed to create client request: %w", err)
}

if err = c.sendRequest(req); err != nil {
return fmt.Errorf("failed to send request: %w", err)
}

resp := <-req.ResponseChan()
return resp.CapabilityResponse, resp.Err
if resp.Err != nil {
return fmt.Errorf("error executing request: %w", resp.Err)
}
return nil
}

func (c *client) executeRequest(ctx context.Context, capReq commoncap.CapabilityRequest) (*request.ClientRequest, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

messageID, err := GetMessageIDForRequest(capReq)
func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
if err != nil {
return nil, fmt.Errorf("failed to get message ID for request: %w", err)
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to create client request: %w", err)
}

c.lggr.Debugw("executing remote target", "messageID", messageID)
if err = c.sendRequest(req); err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to send request: %w", err)
}

if _, ok := c.messageIDToCallerRequest[messageID]; ok {
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
resp := <-req.ResponseChan()
if resp.Err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("error executing request: %w", resp.Err)
}

req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
capabilityResponse, err := pb.UnmarshalCapabilityResponse(resp.Result)
if err != nil {
return nil, fmt.Errorf("failed to create client request: %w", err)
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal capability response: %w", err)
}

return capabilityResponse, nil
}

func (c *client) sendRequest(req *request.ClientRequest) error {
c.mutex.Lock()
defer c.mutex.Unlock()

c.lggr.Debugw("executing remote execute capability", "requestID", req.ID())

if _, ok := c.requestIDToCallerRequest[req.ID()]; ok {
return fmt.Errorf("request for ID %s already exists", req.ID())
}

c.messageIDToCallerRequest[messageID] = req
return req, nil
c.requestIDToCallerRequest[req.ID()] = req
return nil
}

func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
Expand All @@ -168,9 +225,9 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
return
}

c.lggr.Debugw("Remote client target receiving message", "messageID", messageID)
c.lggr.Debugw("Remote client executable receiving message", "messageID", messageID)

req := c.messageIDToCallerRequest[messageID]
req := c.requestIDToCallerRequest[messageID]
if req == nil {
c.lggr.Warnw("received response for unknown message ID ", "messageID", messageID)
return
Expand All @@ -181,18 +238,6 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
}
}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil {
return "", fmt.Errorf("workflow ID is invalid: %w", err)
}

if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID); err != nil {
return "", fmt.Errorf("workflow execution ID is invalid: %w", err)
}

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
}

func (c *client) Ready() error {
return nil
}
Expand Down
Loading

0 comments on commit c015da8

Please sign in to comment.