Skip to content

Commit

Permalink
fix(workflow/fetcher): validates response before use (#15921)
Browse files Browse the repository at this point in the history
* fix(workflow/fetcher): validates response before use

* feat(capabilities): add Validate method on Response

* feat(workflows/syncer): use Validate in Fetch and test

* refactor(workflows/syncer): simplifies getWorkflowArtifacts

* refactor(workflow/fetcher): lint fixes and clean up
  • Loading branch information
MStreet3 authored Jan 16, 2025
1 parent ac3a55f commit 415343f
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/cyan-ladybugs-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

validates response from gateway in workflow/fetcher
21 changes: 21 additions & 0 deletions core/services/gateway/handlers/capabilities/webapi.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package capabilities

import "errors"

type Request struct {
URL string `json:"url"` // URL to query, only http and https protocols are supported.
Method string `json:"method,omitempty"` // HTTP verb, defaults to GET.
Expand All @@ -16,6 +18,25 @@ type Response struct {
Body []byte `json:"body,omitempty"` // HTTP response body
}

// Validate ensures the Response struct is consistent.
func (r Response) Validate() error {
if r.ExecutionError {
if r.ErrorMessage == "" {
return errors.New("executionError is true but errorMessage is empty")
}
if r.StatusCode != 0 || len(r.Headers) > 0 || len(r.Body) > 0 {
return errors.New("executionError is true but response details (statusCode, headers, body) are populated")
}
return nil
}

if r.StatusCode < 100 || r.StatusCode > 599 {
return errors.New("statusCode must be a valid HTTP status code (100-599)")
}

return nil
}

type TriggerResponsePayload struct {
ErrorMessage string `json:"error_message,omitempty"`
// ERROR, ACCEPTED, PENDING, COMPLETED
Expand Down
54 changes: 54 additions & 0 deletions core/services/gateway/handlers/capabilities/webapi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package capabilities

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestResponseValidate(t *testing.T) {
tt := []struct {
name string
response Response
expectError string
}{
{
name: "valid Response with ExecutionError",
response: Response{ExecutionError: true, ErrorMessage: "Some error"},
},
{
name: "invalid Response with ExecutionError but no ErrorMessage",
response: Response{ExecutionError: true},
expectError: "executionError is true but errorMessage is empty",
},
{
name: "valid HTTP Response",
response: Response{StatusCode: 200},
},
{
name: "invalid status code",
response: Response{
Body: []byte("body"),
},
expectError: "statusCode must be set when executionError is false",
},
{
name: "invalid HTTP Response with bad StatusCode",
response: Response{StatusCode: 700},
expectError: "statusCode must be a valid HTTP status code (100-599)",
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
err := tc.response.Validate()

if tc.expectError != "" {
require.Error(t, err)
return
}

require.NoError(t, err)
})
}
}
14 changes: 11 additions & 3 deletions core/services/workflows/syncer/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,21 @@ func (s *FetcherService) Fetch(ctx context.Context, url string) ([]byte, error)
return nil, err
}

s.lggr.Debugw("received gateway response")
if err = resp.Validate(); err != nil {
return nil, fmt.Errorf("invalid response from gateway: %w", err)
}

s.lggr.Debugw("received gateway response", "donID", resp.Body.DonId, "msgID", resp.Body.MessageId)

var payload ghcapabilities.Response
err = json.Unmarshal(resp.Body.Payload, &payload)
if err != nil {
if err = json.Unmarshal(resp.Body.Payload, &payload); err != nil {
return nil, err
}

if err = payload.Validate(); err != nil {
return nil, fmt.Errorf("invalid payload received from gateway message: %w", err)
}

if payload.ExecutionError {
return nil, fmt.Errorf("execution error from gateway: %s", payload.ErrorMessage)
}
Expand Down
115 changes: 100 additions & 15 deletions core/services/workflows/syncer/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package syncer

import (
"context"
"crypto/ecdsa"
"encoding/json"
"strings"
"testing"
Expand All @@ -11,10 +12,12 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/utils"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"
)

Expand All @@ -33,9 +36,11 @@ func TestNewFetcherService(t *testing.T) {
connector := gcmocks.NewGatewayConnector(t)
wrapper := &wrapper{c: connector}

url := "http://example.com"

msgID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, hash(url)}, "/")
var (
url = "http://example.com"
msgID = strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, hash(url)}, "/")
donID = "don-id"
)

t.Run("OK-valid_request", func(t *testing.T) {
connector.EXPECT().AddHandler([]string{capabilities.MethodWorkflowSyncer}, mock.Anything).Return(nil)
Expand All @@ -44,11 +49,11 @@ func TestNewFetcherService(t *testing.T) {
require.NoError(t, fetcher.Start(ctx))
defer fetcher.Close()

gatewayResp := gatewayResponse(t, msgID)
gatewayResp := signGatewayResponse(t, gatewayResponse(t, msgID, donID))
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return("don-id")
connector.EXPECT().DonID().Return(donID)
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

Expand All @@ -59,13 +64,51 @@ func TestNewFetcherService(t *testing.T) {
require.Equal(t, expectedPayload, payload)
})

t.Run("fails with invalid payload response", func(t *testing.T) {
connector.EXPECT().AddHandler([]string{capabilities.MethodWorkflowSyncer}, mock.Anything).Return(nil)

fetcher := NewFetcherService(lggr, wrapper)
require.NoError(t, fetcher.Start(ctx))
defer fetcher.Close()

gatewayResp := signGatewayResponse(t, inconsistentPayload(t, msgID, donID))
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return(donID)
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

_, err := fetcher.Fetch(ctx, url)
require.Error(t, err)
})

t.Run("fails due to invalid gateway response", func(t *testing.T) {
connector.EXPECT().AddHandler([]string{capabilities.MethodWorkflowSyncer}, mock.Anything).Return(nil)

fetcher := NewFetcherService(lggr, wrapper)
require.NoError(t, fetcher.Start(ctx))
defer fetcher.Close()

gatewayResp := gatewayResponse(t, msgID, donID) // gateway response that is not signed
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return(donID)
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

_, err := fetcher.Fetch(ctx, url)
require.Error(t, err)
require.ErrorContains(t, err, "invalid response from gateway")
})

t.Run("NOK-response_payload_too_large", func(t *testing.T) {
headers := map[string]string{"Content-Type": "application/json"}
responsePayload, err := json.Marshal(ghcapabilities.Response{
StatusCode: 400,
Headers: headers,
ErrorMessage: "http: request body too large",
ExecutionError: true,
StatusCode: 400,
Headers: headers,
ErrorMessage: "http: request body too large",
})
require.NoError(t, err)
gatewayResponse := &api.Message{
Expand All @@ -85,7 +128,7 @@ func TestNewFetcherService(t *testing.T) {
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResponse)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return("don-id")
connector.EXPECT().DonID().Return(donID)
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

Expand All @@ -94,21 +137,63 @@ func TestNewFetcherService(t *testing.T) {
})
}

func gatewayResponse(t *testing.T, msgID string) *api.Message {
// gatewayResponse creates an unsigned gateway response with a status code of 200 and a response body.
func gatewayResponse(t *testing.T, msgID string, donID string) *api.Message {
headers := map[string]string{"Content-Type": "application/json"}
body := []byte("response body")
responsePayload, err := json.Marshal(ghcapabilities.Response{
StatusCode: 200,
Headers: headers,
Body: body,
ExecutionError: false,
StatusCode: 200,
Headers: headers,
Body: body,
})
require.NoError(t, err)
return &api.Message{
Body: api.MessageBody{
MessageId: msgID,
DonId: donID,
Method: ghcapabilities.MethodWebAPITarget,
Payload: responsePayload,
},
}
}

// inconsistentPayload creates an unsigned gateway response with an inconsistent payload. The
// ExecutionError is true, but there is no ErrorMessage, so it is invalid.
func inconsistentPayload(t *testing.T, msgID string, donID string) *api.Message {
responsePayload, err := json.Marshal(ghcapabilities.Response{
ExecutionError: true,
})
require.NoError(t, err)
return &api.Message{
Body: api.MessageBody{
MessageId: msgID,
DonId: donID,
Method: ghcapabilities.MethodWebAPITarget,
Payload: responsePayload,
},
}
}

// signGatewayResponse signs the gateway response with a private key and arbitrarily sets the receiver
// to the signer's address. A signature and receiver are required for a valid gateway response.
func signGatewayResponse(t *testing.T, msg *api.Message) *api.Message {
nodeKeys := common.NewTestNodes(t, 1)
s := &signer{pk: nodeKeys[0].PrivateKey}
signature, err := s.Sign(api.GetRawMessageBody(&msg.Body)...)
require.NoError(t, err)
msg.Signature = utils.StringToHex(string(signature))

signerBytes, err := msg.ExtractSigner()
require.NoError(t, err)

msg.Body.Receiver = utils.StringToHex(string(signerBytes))
return msg
}

type signer struct {
pk *ecdsa.PrivateKey
}

func (s *signer) Sign(data ...[]byte) ([]byte, error) {
return common.SignData(s.pk, data...)
}
45 changes: 23 additions & 22 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,34 +495,35 @@ func (h *eventHandler) getWorkflowArtifacts(
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) ([]byte, []byte, error) {
spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:]))
if err != nil {
binary, err2 := h.fetcher(ctx, payload.BinaryURL)
if err2 != nil {
return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err2)
// Check if the workflow spec is already stored in the database
if spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:])); err == nil {
// there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts
decodedBinary, err := hex.DecodeString(spec.Workflow)
if err != nil {
return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err)
}
return decodedBinary, []byte(spec.Config), nil
}

decodedBinary, err2 := base64.StdEncoding.DecodeString(string(binary))
if err2 != nil {
return nil, nil, fmt.Errorf("failed to decode binary: %w", err2)
}
// Fetch the binary and config files from the specified URLs.
var (
binary, decodedBinary, config []byte
err error
)
if binary, err = h.fetcher(ctx, payload.BinaryURL); err != nil {
return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

var config []byte
if payload.ConfigURL != "" {
config, err2 = h.fetcher(ctx, payload.ConfigURL)
if err2 != nil {
return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err2)
}
}
return decodedBinary, config, nil
if decodedBinary, err = base64.StdEncoding.DecodeString(string(binary)); err != nil {
return nil, nil, fmt.Errorf("failed to decode binary: %w", err)
}

// there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts
decodedBinary, err := hex.DecodeString(spec.Workflow)
if err != nil {
return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err)
if payload.ConfigURL != "" {
if config, err = h.fetcher(ctx, payload.ConfigURL); err != nil {
return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
}
}
return decodedBinary, []byte(spec.Config), nil
return decodedBinary, config, nil
}

func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
Expand Down

0 comments on commit 415343f

Please sign in to comment.