Skip to content

Commit

Permalink
[CAPPL-324] Fix panic when fetching binary (#15490)
Browse files Browse the repository at this point in the history
* Fix panic when fetching the binary

* [CAPPL-324] Fix panic when fetching the binary

* [fix] Various Gateway bugs

- Stop logging out the request/response body
- Add a timeout when fetching the request
- Add the method in various places where it was missing

* Fix test

* Linting

* Review comments
  • Loading branch information
cedric-cordenier authored Dec 4, 2024
1 parent c7cb608 commit 22c6cf7
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 73 deletions.
22 changes: 18 additions & 4 deletions core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
Expand All @@ -19,6 +20,7 @@ import (
var _ connector.GatewayConnectorHandler = &OutgoingConnectorHandler{}

type OutgoingConnectorHandler struct {
services.StateMachine
gc connector.GatewayConnector
method string
lggr logger.Logger
Expand Down Expand Up @@ -98,7 +100,7 @@ func (c *OutgoingConnectorHandler) HandleGatewayMessage(ctx context.Context, gat
}
l.Debugw("handling gateway request")
switch body.Method {
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction:
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction, capabilities.MethodWorkflowSyncer:
body := &msg.Body
var payload capabilities.Response
err := json.Unmarshal(body.Payload, &payload)
Expand All @@ -125,16 +127,28 @@ func (c *OutgoingConnectorHandler) HandleGatewayMessage(ctx context.Context, gat
}

func (c *OutgoingConnectorHandler) Start(ctx context.Context) error {
return c.gc.AddHandler([]string{c.method}, c)
return c.StartOnce("OutgoingConnectorHandler", func() error {
return c.gc.AddHandler([]string{c.method}, c)
})
}

func (c *OutgoingConnectorHandler) Close() error {
return nil
return c.StopOnce("OutgoingConnectorHandler", func() error {
return nil
})
}

func (c *OutgoingConnectorHandler) HealthReport() map[string]error {
return map[string]error{c.Name(): c.Healthy()}
}

func (c *OutgoingConnectorHandler) Name() string {
return c.lggr.Name()
}

func validMethod(method string) bool {
switch method {
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction:
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction, capabilities.MethodWorkflowSyncer:
return true
default:
return false
Expand Down
28 changes: 4 additions & 24 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand All @@ -50,8 +49,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/feeds"
"github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
capabilities2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
common2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
Expand Down Expand Up @@ -303,30 +300,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("expected 1 key, got %d", len(keys))
}

connector := gatewayConnectorWrapper.GetGatewayConnector()
webAPILggr := globalLogger.Named("WebAPITarget")

webAPIConfig := webapi.ServiceConfig{
RateLimiter: common2.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector,
webAPIConfig,
capabilities2.MethodWebAPITarget, webAPILggr)
if err != nil {
return nil, fmt.Errorf("could not create outgoing connector handler: %w", err)
}
fetcher := syncer.NewFetcherService(globalLogger, gatewayConnectorWrapper)

eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger),
syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
fetcher.Fetch, workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0])

loader := syncer.NewWorkflowRegistryContractLoader(cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
loader := syncer.NewWorkflowRegistryContractLoader(globalLogger, cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, eventHandler)

Expand All @@ -338,7 +318,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
QueryCount: 100,
}, eventHandler, loader, workflowDonNotifier)

srvcs = append(srvcs, wfSyncer)
srvcs = append(srvcs, fetcher, wfSyncer)
}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/services/gateway/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// GatewayConnector is a component run by Nodes to connect to a set of Gateways.
type GatewayConnector interface {
job.ServiceCtx
services.Service
network.ConnectionInitiator

AddHandler(methods []string, handler GatewayConnectorHandler) error
Expand Down
137 changes: 137 additions & 0 deletions core/services/gateway/connector/mocks/gateway_connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions core/services/gateway/handlers/capabilities/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ func (h *handler) handleWebAPIOutgoingMessage(ctx context.Context, msg *api.Mess
newCtx := context.WithoutCancel(ctx)
newCtx, cancel := context.WithTimeout(newCtx, timeout)
defer cancel()
l := h.lggr.With("url", payload.URL, "messageId", msg.Body.MessageId, "method", payload.Method)
l := h.lggr.With("url", payload.URL, "messageId", msg.Body.MessageId, "method", payload.Method, "timeout", payload.TimeoutMs)
l.Debug("Sending request to client")
respMsg, err := h.sendHTTPMessageToClient(newCtx, req, msg)
if err != nil {
l.Errorw("error while sending HTTP request to external endpoint", "err", err)
Expand Down Expand Up @@ -187,7 +188,7 @@ func (h *handler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeA
switch msg.Body.Method {
case MethodWebAPITrigger:
return h.handleWebAPITriggerMessage(ctx, msg, nodeAddr)
case MethodWebAPITarget, MethodComputeAction:
case MethodWebAPITarget, MethodComputeAction, MethodWorkflowSyncer:
return h.handleWebAPIOutgoingMessage(ctx, msg, nodeAddr)
default:
return fmt.Errorf("unsupported method: %s", msg.Body.Method)
Expand Down
2 changes: 1 addition & 1 deletion core/services/gateway/network/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *httpClient) Send(ctx context.Context, req HTTPRequest) (*HTTPResponse,
// joining them to a single string in case array size is greater than 1
headers[k] = strings.Join(v, ",")
}
c.lggr.Debugw("received HTTP response", "statusCode", resp.StatusCode, "body", string(body), "url", req.URL, "headers", headers)
c.lggr.Debugw("received HTTP response", "statusCode", resp.StatusCode, "url", req.URL, "headers", headers)

return &HTTPResponse{
Headers: headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func Test_InitialStateSync(t *testing.T) {
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
}, testEventHandler)

Expand Down
Loading

0 comments on commit 22c6cf7

Please sign in to comment.