Skip to content

Commit

Permalink
[KS-421] Improve logging from remote capabilities (#14058)
Browse files Browse the repository at this point in the history
Add validations and sanitizing
  • Loading branch information
bolekk authored Aug 9, 2024
1 parent 349778b commit 95cb692
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 34 deletions.
2 changes: 1 addition & 1 deletion core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (d *dispatcher) receive() {
receiver, ok := d.receivers[k]
d.mu.RUnlock()
if !ok {
d.lggr.Debugw("received message for unregistered capability", "capabilityId", k.capId, "donId", k.donId)
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capId), "donId", k.donId)
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}
Expand Down
11 changes: 8 additions & 3 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -151,7 +152,11 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()

messageID := GetMessageID(msg)
messageID, err := GetMessageID(msg)
if err != nil {
c.lggr.Errorw("invalid message ID", "err", err, "id", remote.SanitizeLogString(string(msg.MessageId)))
return
}

c.lggr.Debugw("Remote client target receiving message", "messageID", messageID)

Expand All @@ -167,8 +172,8 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" {
return "", errors.New("workflow ID and workflow execution ID must be set in request metadata")
if !remote.IsValidWorkflowOrExecutionID(req.Metadata.WorkflowID) || !remote.IsValidWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID) {
return "", errors.New("workflow ID and workflow execution ID in request metadata are invalid")
}

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
Expand Down
14 changes: 11 additions & 3 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

const (
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
)

func Test_Client_DonTopologies(t *testing.T) {
ctx := testutils.Context(t)

Expand Down Expand Up @@ -192,8 +197,8 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
responseCh, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Config: transmissionSchedule,
Inputs: executeInputs,
Expand Down Expand Up @@ -234,7 +239,10 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo
defer t.mux.Unlock()

sender := toPeerID(msg.Sender)
messageID := target.GetMessageID(msg)
messageID, err := target.GetMessageID(msg)
if err != nil {
panic(err)
}

if t.messageIDToSenders[messageID] == nil {
t.messageIDToSenders[messageID] = make(map[p2ptypes.PeerID]bool)
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
responseCh, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Config: transmissionSchedule,
Inputs: executeInputs,
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/target/request/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
}
}
} else {
c.lggr.Warnw("received error response", "error", msg.ErrorMsg)
c.lggr.Warnw("received error response", "error", remote.SanitizeLogString(msg.ErrorMsg))
c.errorCount[msg.ErrorMsg]++
if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses {
c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(msg.ErrorMsg)})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

const (
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
)

func Test_ClientRequest_MessageValidation(t *testing.T) {
lggr := logger.TestLogger(t)

Expand Down Expand Up @@ -68,8 +73,8 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

capabilityRequest := commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Inputs: executeInputs,
Config: transmissionSchedule,
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/target/request/server_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (e *ServerRequest) executeRequest(ctx context.Context, payload []byte) erro
return fmt.Errorf("failed to marshal capability response: %w", err)
}

e.lggr.Debugw("received execution results", "metadata", capabilityRequest.Metadata, "error", capResponse.Err)
e.lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", capResponse.Err)
e.setResult(responsePayload)
return nil
}
Expand Down
25 changes: 18 additions & 7 deletions core/capabilities/remote/target/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
Expand Down Expand Up @@ -129,9 +130,14 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
r.receiveLock.Lock()
defer r.receiveLock.Unlock()

r.lggr.Debugw("received request for msg", "msgId", msg.MessageId)
if msg.Method != types.MethodExecute {
r.lggr.Errorw("received request for unsupported method type", "method", msg.Method)
r.lggr.Errorw("received request for unsupported method type", "method", remote.SanitizeLogString(msg.Method))
return
}

messageId, err := GetMessageID(msg)
if err != nil {
r.lggr.Errorw("invalid message id", "err", err, "id", remote.SanitizeLogString(string(msg.MessageId)))
return
}

Expand All @@ -143,9 +149,10 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {

// A request is uniquely identified by the message id and the hash of the payload to prevent a malicious
// actor from sending a different payload with the same message id
messageId := GetMessageID(msg)
requestID := messageId + hex.EncodeToString(msgHash[:])

r.lggr.Debugw("received request", "msgId", msg.MessageId, "requestID", requestID)

if requestIDs, ok := r.messageIDToRequestIDsCount[messageId]; ok {
requestIDs[requestID] = requestIDs[requestID] + 1
} else {
Expand All @@ -156,7 +163,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
if len(requestIDs) > 1 {
// This is a potential attack vector as well as a situation that will occur if the client is sending non-deterministic payloads
// so a warning is logged
r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "requestIDToCount", requestIDs)
r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "lenRequestIDs", len(requestIDs))
}

if _, ok := r.requestIDToRequest[requestID]; !ok {
Expand All @@ -177,7 +184,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {

err = reqAndMsgID.request.OnMessage(ctx, msg)
if err != nil {
r.lggr.Errorw("request failed to OnMessage new message", "request", reqAndMsgID, "err", err)
r.lggr.Errorw("request failed to OnMessage new message", "messageID", reqAndMsgID.messageID, "err", err)
}
}

Expand All @@ -201,8 +208,12 @@ func (r *server) getMessageHash(msg *types.MessageBody) ([32]byte, error) {
return hash, nil
}

func GetMessageID(msg *types.MessageBody) string {
return string(msg.MessageId)
func GetMessageID(msg *types.MessageBody) (string, error) {
idStr := string(msg.MessageId)
if !remote.IsValidID(idStr) {
return "", fmt.Errorf("invalid message id")
}
return idStr, nil
}

func (r *server) Ready() error {
Expand Down
16 changes: 8 additions & 8 deletions core/capabilities/remote/target/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func Test_Server_ExcludesNonDeterministicInputAttributes(t *testing.T) {
_, err = caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Inputs: inputs,
})
Expand All @@ -67,8 +67,8 @@ func Test_Server_RespondsAfterSufficientRequests(t *testing.T) {
_, err := caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
})
require.NoError(t, err)
Expand All @@ -94,8 +94,8 @@ func Test_Server_InsufficientCallers(t *testing.T) {
_, err := caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
})
require.NoError(t, err)
Expand All @@ -121,8 +121,8 @@ func Test_Server_CapabilityError(t *testing.T) {
_, err := caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
})
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Errorw("sender not a member of its workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId, "sender", sender)
return
}
if !IsValidWorkflowOrExecutionID(req.Metadata.WorkflowID) {
p.lggr.Errorw("received trigger request with invalid workflow ID", "capabilityId", p.capInfo.ID, "workflowId", SanitizeLogString(req.Metadata.WorkflowID))
return
}
p.lggr.Debugw("received trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "sender", sender)
key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID}
nowMs := time.Now().UnixMilli()
Expand Down Expand Up @@ -145,7 +149,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
}
} else {
p.lggr.Errorw("received trigger request with unknown method", "method", msg.Method, "sender", sender)
p.lggr.Errorw("received trigger request with unknown method", "method", SanitizeLogString(msg.Method), "sender", sender)
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
registration, found := s.registeredWorkflows[workflowId]
s.mu.RUnlock()
if !found {
s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", workflowId, "sender", sender)
s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", SanitizeLogString(workflowId), "sender", sender)
continue
}
key := triggerEventKey{
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
}
}
} else {
s.lggr.Errorw("received trigger event with unknown method", "method", msg.Method, "sender", sender)
s.lggr.Errorw("received trigger event with unknown method", "method", SanitizeLogString(msg.Method), "sender", sender)
}
}

Expand Down
5 changes: 2 additions & 3 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/values"
Expand All @@ -22,7 +21,7 @@ import (
const (
peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC"
peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8"
workflowID1 = "workflowID1"
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
)

var (
Expand Down Expand Up @@ -63,7 +62,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
})

// register trigger
config := &capabilities.RemoteTriggerConfig{
config := &commoncap.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
Expand Down
43 changes: 43 additions & 0 deletions core/capabilities/remote/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"unicode"

"google.golang.org/protobuf/proto"

Expand All @@ -16,6 +17,12 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

const (
maxLoggedStringLen = 256
validWorkflowIDLen = 64
maxIDLen = 128
)

func ValidateMessage(msg p2ptypes.Message, expectedReceiver p2ptypes.PeerID) (*remotetypes.MessageBody, error) {
var topLevelMessage remotetypes.Message
err := proto.Unmarshal(msg.Payload, &topLevelMessage)
Expand Down Expand Up @@ -93,3 +100,39 @@ func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte,
}
return found, nil
}

func SanitizeLogString(s string) string {
tooLongSuffix := ""
if len(s) > maxLoggedStringLen {
s = s[:maxLoggedStringLen]
tooLongSuffix = " [TRUNCATED]"
}
for i := 0; i < len(s); i++ {
if !unicode.IsPrint(rune(s[i])) {
return "[UNPRINTABLE] " + hex.EncodeToString([]byte(s)) + tooLongSuffix
}
}
return s + tooLongSuffix
}

// Workflow IDs and Execution IDs are 32-byte hex-encoded strings
func IsValidWorkflowOrExecutionID(id string) bool {
if len(id) != validWorkflowIDLen {
return false
}
_, err := hex.DecodeString(id)
return err == nil
}

// Trigger event IDs and message IDs can only contain printable characters and must be non-empty
func IsValidID(id string) bool {
if len(id) == 0 || len(id) > maxIDLen {
return false
}
for i := 0; i < len(id); i++ {
if !unicode.IsPrint(rune(id[i])) {
return false
}
}
return true
}
23 changes: 23 additions & 0 deletions core/capabilities/remote/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,26 @@ func TestDefaultModeAggregator_Aggregate(t *testing.T) {
require.NoError(t, err)
require.Equal(t, res, capResponse1)
}

func TestSanitizeLogString(t *testing.T) {
require.Equal(t, "hello", remote.SanitizeLogString("hello"))
require.Equal(t, "[UNPRINTABLE] 0a", remote.SanitizeLogString("\n"))

longString := ""
for i := 0; i < 100; i++ {
longString += "aa-aa-aa-"
}
require.Equal(t, longString[:256]+" [TRUNCATED]", remote.SanitizeLogString(longString))
}

func TestIsValidWorkflowID(t *testing.T) {
require.False(t, remote.IsValidWorkflowOrExecutionID("too_short"))
require.False(t, remote.IsValidWorkflowOrExecutionID("nothex--95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"))
require.True(t, remote.IsValidWorkflowOrExecutionID("15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"))
}

func TestIsValidTriggerEventID(t *testing.T) {
require.False(t, remote.IsValidID(""))
require.False(t, remote.IsValidID("\n\n"))
require.True(t, remote.IsValidID("id_id_2"))
}

0 comments on commit 95cb692

Please sign in to comment.