Skip to content

Commit

Permalink
Core changes for changing Execute Capability API to sync (#14317)
Browse files Browse the repository at this point in the history
* changes required to move capability execute api from async channel to sync

* lint
  • Loading branch information
ettec authored Sep 6, 2024
1 parent 792d85d commit 72f4cc8
Showing 26 changed files with 116 additions and 166 deletions.
5 changes: 5 additions & 0 deletions .changeset/cold-suns-hope.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal changes required for capability api chance to sync
4 changes: 2 additions & 2 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
@@ -46,8 +46,8 @@ type mockCapability struct {
capabilities.CapabilityInfo
}

func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
return nil, nil
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
return capabilities.CapabilityResponse{}, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
4 changes: 2 additions & 2 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
@@ -21,8 +21,8 @@ type mockCapability struct {
capabilities.CapabilityInfo
}

func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
return nil, nil
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
return capabilities.CapabilityResponse{}, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
15 changes: 12 additions & 3 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
@@ -123,7 +123,17 @@ func (c *client) UnregisterFromWorkflow(ctx context.Context, request commoncap.U
return nil
}

func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
req, err := c.executeRequest(ctx, capReq)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to execute request: %w", err)
}

resp := <-req.ResponseChan()
return resp.CapabilityResponse, resp.Err
}

func (c *client) executeRequest(ctx context.Context, capReq commoncap.CapabilityRequest) (*request.ClientRequest, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

@@ -145,8 +155,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
}

c.messageIDToCallerRequest[messageID] = req

return req.ResponseChan(), nil
return req, nil
}

func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
17 changes: 6 additions & 11 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
@@ -35,9 +35,8 @@ func Test_Client_DonTopologies(t *testing.T) {
})
require.NoError(t, err)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
@@ -66,9 +65,8 @@ func Test_Client_DonTopologies(t *testing.T) {
func Test_Client_TransmissionSchedules(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
@@ -104,10 +102,8 @@ func Test_Client_TransmissionSchedules(t *testing.T) {
func Test_Client_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
assert.NotNil(t, response.Err)
responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
assert.NotNil(t, responseError)
}

capability := &TestCapability{}
@@ -126,7 +122,7 @@ func Test_Client_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) {

func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout time.Duration,
numCapabilityPeers int, capabilityDonF uint8, underlying commoncap.TargetCapability, transmissionSchedule *values.Map,
responseTest func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error)) {
responseTest func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error)) {
lggr := logger.TestLogger(t)

capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers)
@@ -261,8 +257,7 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo
panic(err)
}

respCh, responseErr := t.targetCapability.Execute(context.Background(), capabilityRequest)
resp := <-respCh
resp, responseErr := t.targetCapability.Execute(context.Background(), capabilityRequest)

for receiver := range t.messageIDToSenders[messageID] {
var responseMsg = &remotetypes.MessageBody{
57 changes: 21 additions & 36 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
@@ -29,10 +29,8 @@ import (
func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
assert.NotNil(t, response.Err)
responseTest := func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.NotNil(t, responseError)
}

capability := &TestCapability{}
@@ -49,10 +47,8 @@ func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) {
func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
assert.NotNil(t, response.Err)
responseTest := func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.NotNil(t, responseError)
}

timeOut := 10 * time.Minute
@@ -71,9 +67,8 @@ func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) {
func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
@@ -103,9 +98,8 @@ func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) {
func Test_RemoteTargetCapability_DonTopologies(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
mp, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", mp.(map[string]any)["response"].(string))
@@ -138,10 +132,8 @@ func Test_RemoteTargetCapability_DonTopologies(t *testing.T) {
func Test_RemoteTargetCapability_CapabilityError(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
assert.Equal(t, "failed to execute capability: an error", response.Err.Error())
responseTest := func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "failed to execute capability: an error", responseError.Error())
}

capability := &TestErrorCapability{}
@@ -158,10 +150,8 @@ func Test_RemoteTargetCapability_CapabilityError(t *testing.T) {
func Test_RemoteTargetCapability_RandomCapabilityError(t *testing.T) {
ctx := testutils.Context(t)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
assert.Equal(t, "request expired", response.Err.Error())
responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "request expired", responseError.Error())
}

capability := &TestRandomErrorCapability{}
@@ -177,7 +167,7 @@ func Test_RemoteTargetCapability_RandomCapabilityError(t *testing.T) {

func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.TargetCapability, numWorkflowPeers int, workflowDonF uint8, workflowNodeTimeout time.Duration,
numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration, transmissionSchedule *values.Map,
responseTest func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error)) {
responseTest func(t *testing.T, response commoncap.CapabilityResponse, responseError error)) {
lggr := logger.TestLogger(t)

capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers)
@@ -258,7 +248,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
for _, caller := range workflowNodes {
go func(caller commoncap.TargetCapability) {
defer wg.Done()
responseCh, err := caller.Execute(ctx,
response, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
@@ -268,7 +258,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
Inputs: executeInputs,
})

responseTest(t, responseCh, err)
responseTest(t, response, err)
}(caller)
}

@@ -404,36 +394,31 @@ type TestCapability struct {
abstractTestCapability
}

func (t TestCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
ch := make(chan commoncap.CapabilityResponse, 1)

func (t TestCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
value := request.Inputs.Underlying["executeValue1"]

response, err := values.NewMap(map[string]any{"response": value})
if err != nil {
return nil, err
return commoncap.CapabilityResponse{}, err
}
ch <- commoncap.CapabilityResponse{
return commoncap.CapabilityResponse{
Value: response,
}

return ch, nil
}, nil
}

type TestErrorCapability struct {
abstractTestCapability
}

func (t TestErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
return nil, errors.New("an error")
func (t TestErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
return commoncap.CapabilityResponse{}, errors.New("an error")
}

type TestRandomErrorCapability struct {
abstractTestCapability
}

func (t TestRandomErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
return nil, errors.New(uuid.New().String())
func (t TestRandomErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
return commoncap.CapabilityResponse{}, errors.New(uuid.New().String())
}

func NewP2PPeerID(t *testing.T) p2ptypes.PeerID {
22 changes: 13 additions & 9 deletions core/capabilities/remote/target/request/client_request.go
Original file line number Diff line number Diff line change
@@ -22,9 +22,14 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

type asyncCapabilityResponse struct {
capabilities.CapabilityResponse
Err error
}

type ClientRequest struct {
cancelFn context.CancelFunc
responseCh chan commoncap.CapabilityResponse
responseCh chan asyncCapabilityResponse
createdAt time.Time
responseIDCount map[[32]byte]int
errorCount map[string]int
@@ -101,13 +106,13 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
responseIDCount: make(map[[32]byte]int),
errorCount: make(map[string]int),
responseReceived: responseReceived,
responseCh: make(chan commoncap.CapabilityResponse, 1),
responseCh: make(chan asyncCapabilityResponse, 1),
wg: wg,
lggr: lggr,
}, nil
}

func (c *ClientRequest) ResponseChan() <-chan commoncap.CapabilityResponse {
func (c *ClientRequest) ResponseChan() <-chan asyncCapabilityResponse {
return c.responseCh
}

@@ -121,11 +126,10 @@ func (c *ClientRequest) Cancel(err error) {
c.mux.Lock()
defer c.mux.Unlock()
if !c.respSent {
c.sendResponse(commoncap.CapabilityResponse{Err: err})
c.sendResponse(asyncCapabilityResponse{Err: err})
}
}

// TODO OnMessage assumes that only one response is received from each peer, if streaming responses need to be supported this will need to be updated
func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) error {
c.mux.Lock()
defer c.mux.Unlock()
@@ -167,22 +171,22 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
if c.responseIDCount[responseID] == c.requiredIdenticalResponses {
capabilityResponse, err := pb.UnmarshalCapabilityResponse(msg.Payload)
if err != nil {
c.sendResponse(commoncap.CapabilityResponse{Err: fmt.Errorf("failed to unmarshal capability response: %w", err)})
c.sendResponse(asyncCapabilityResponse{Err: fmt.Errorf("failed to unmarshal capability response: %w", err)})
} else {
c.sendResponse(commoncap.CapabilityResponse{Value: capabilityResponse.Value})
c.sendResponse(asyncCapabilityResponse{CapabilityResponse: commoncap.CapabilityResponse{Value: capabilityResponse.Value}})
}
}
} else {
c.lggr.Warnw("received error response", "error", remote.SanitizeLogString(msg.ErrorMsg))
c.errorCount[msg.ErrorMsg]++
if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses {
c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(msg.ErrorMsg)})
c.sendResponse(asyncCapabilityResponse{Err: errors.New(msg.ErrorMsg)})
}
}
return nil
}

func (c *ClientRequest) sendResponse(response commoncap.CapabilityResponse) {
func (c *ClientRequest) sendResponse(response asyncCapabilityResponse) {
c.responseCh <- response
close(c.responseCh)
c.respSent = true
Original file line number Diff line number Diff line change
@@ -84,7 +84,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
require.NoError(t, err)
capabilityResponse := commoncap.CapabilityResponse{
Value: m,
Err: nil,
}

rawResponse, err := pb.MarshalCapabilityResponse(capabilityResponse)
@@ -117,7 +116,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
require.NoError(t, err)
capabilityResponse2 := commoncap.CapabilityResponse{
Value: nm,
Err: nil,
}

rawResponse2, err := pb.MarshalCapabilityResponse(capabilityResponse2)
7 changes: 3 additions & 4 deletions core/capabilities/remote/target/request/server_request.go
Original file line number Diff line number Diff line change
@@ -125,20 +125,19 @@ func (e *ServerRequest) executeRequest(ctx context.Context, payload []byte) erro
}

e.lggr.Debugw("executing capability", "metadata", capabilityRequest.Metadata)
capResponseCh, err := e.capability.Execute(ctxWithTimeout, capabilityRequest)
capResponse, err := e.capability.Execute(ctxWithTimeout, capabilityRequest)

if err != nil {
e.lggr.Debugw("received execution error", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", err)
return fmt.Errorf("failed to execute capability: %w", err)
}

// NOTE working on the assumption that the capability will only ever return one response from its channel
capResponse := <-capResponseCh
responsePayload, err := pb.MarshalCapabilityResponse(capResponse)
if err != nil {
return fmt.Errorf("failed to marshal capability response: %w", err)
}

e.lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", capResponse.Err)
e.lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID)
e.setResult(responsePayload)
return nil
}
Loading

0 comments on commit 72f4cc8

Please sign in to comment.