Skip to content

Commit

Permalink
Merge branch 'release/2.19.0-aptos' into aptos-init
Browse files Browse the repository at this point in the history
  • Loading branch information
smickovskid authored Dec 11, 2024
2 parents fb21d3e + ab4887c commit 3d3da9b
Show file tree
Hide file tree
Showing 25 changed files with 517 additions and 151 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-camels-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix fix missing unregister in mercury job loop
3 changes: 2 additions & 1 deletion core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability
}

var (
defaultTargetRequestTimeout = time.Minute
// TODO: make this configurable
defaultTargetRequestTimeout = 8 * time.Minute
)

func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don registrysyncer.DON, state *registrysyncer.LocalRegistry, remoteWorkflowDONs []registrysyncer.DON) error {
Expand Down
10 changes: 8 additions & 2 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var _ commoncap.ExecutableCapability = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}

const expiryCheckInterval = 30 * time.Second

func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, lggr logger.Logger) *client {
return &client{
Expand Down Expand Up @@ -98,7 +100,11 @@ func (c *client) checkDispatcherReady() {
}

func (c *client) checkForExpiredRequests() {
ticker := time.NewTicker(c.requestTimeout)
tickerInterval := expiryCheckInterval
if c.requestTimeout < tickerInterval {
tickerInterval = c.requestTimeout
}
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
for {
select {
Expand All @@ -116,7 +122,7 @@ func (c *client) expireRequests() {

for messageID, req := range c.requestIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
req.Cancel(errors.New("request expired by executable client"))
delete(c.requestIDToCallerRequest, messageID)
}

Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) {

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "error executing request: failed to execute capability: an error", responseError.Error())
assert.Equal(t, "error executing request: failed to execute capability", responseError.Error())
})
})

Expand All @@ -156,12 +156,12 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) {

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "error executing request: request expired", responseError.Error())
assert.Equal(t, "error executing request: failed to execute capability", responseError.Error())
})
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 9, 10*time.Minute,
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 1*time.Second, 10, 9, 10*time.Minute,
method)
}
}
Expand Down
13 changes: 9 additions & 4 deletions core/capabilities/remote/executable/request/server_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package request

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -48,6 +49,8 @@ type ServerRequest struct {
lggr logger.Logger
}

var errExternalErrorMsg = errors.New("failed to execute capability")

func NewServerRequest(capability capabilities.ExecutableCapability, method string, capabilityID string, capabilityDonID uint32,
capabilityPeerID p2ptypes.PeerID,
callingDon commoncap.DON, requestID string,
Expand Down Expand Up @@ -228,20 +231,22 @@ func executeCapabilityRequest(ctx context.Context, lggr logger.Logger, capabilit
payload []byte) ([]byte, error) {
capabilityRequest, err := pb.UnmarshalCapabilityRequest(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal capability request: %w", err)
lggr.Errorw("failed to unmarshal capability request", "err", err)
return nil, errExternalErrorMsg
}

lggr.Debugw("executing capability", "metadata", capabilityRequest.Metadata)
capResponse, err := capability.Execute(ctx, capabilityRequest)

if err != nil {
lggr.Debugw("received execution error", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", err)
return nil, fmt.Errorf("failed to execute capability: %w", err)
lggr.Errorw("received execution error", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", err)
return nil, errExternalErrorMsg
}

responsePayload, err := pb.MarshalCapabilityResponse(capResponse)
if err != nil {
return nil, fmt.Errorf("failed to marshal capability response: %w", err)
lggr.Errorw("failed to marshal capability request", "err", err)
return nil, errExternalErrorMsg
}

lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {
require.NoError(t, err)
assert.Len(t, dispatcher.msgs, 2)
assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[0].Error)
assert.Equal(t, "failed to execute capability: an error", dispatcher.msgs[0].ErrorMsg)
assert.Equal(t, "failed to execute capability", dispatcher.msgs[0].ErrorMsg)
assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[1].Error)
assert.Equal(t, "failed to execute capability: an error", dispatcher.msgs[1].ErrorMsg)
assert.Equal(t, "failed to execute capability", dispatcher.msgs[1].ErrorMsg)
})

t.Run("Execute capability", func(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions core/capabilities/remote/executable/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (r *server) Start(ctx context.Context) error {
r.wg.Add(1)
go func() {
defer r.wg.Done()
ticker := time.NewTicker(r.requestTimeout)
tickerInterval := expiryCheckInterval
if r.requestTimeout < tickerInterval {
tickerInterval = r.requestTimeout
}
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
r.lggr.Info("executable capability server started")
for {
Expand Down Expand Up @@ -118,7 +122,7 @@ func (r *server) expireRequests() {

for requestID, executeReq := range r.requestIDToRequest {
if executeReq.request.Expired() {
err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired")
err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired by executable server")
if err != nil {
r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err)
}
Expand Down
5 changes: 5 additions & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/static"
"github.com/smartcontractkit/chainlink/v2/core/store/migrate"
Expand Down Expand Up @@ -109,6 +110,10 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
AuthPublicKeyHex: csaPubKeyHex,
AuthHeaders: beholderAuthHeaders,
}
// note: due to the OTEL specification, all histogram buckets
// must be defined when the beholder client is created
clientCfg.MetricViews = append(clientCfg.MetricViews, workflows.MetricViews()...)

if tracingCfg.Enabled {
clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.14.0-mercury-20240807.0.20241106193309-5560cd76211a
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down Expand Up @@ -421,7 +421,7 @@ require (
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20241009055228-33d0c0bf38de // indirect
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20241009055228-33d0c0bf38de // indirect
github.com/smartcontractkit/wsrpc v0.8.2 // indirect
github.com/smartcontractkit/wsrpc v0.8.3 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1409,8 +1409,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec h1:5vS1k8Qn09p8SQ3JzvS8iy4Pve7s3aVq+UPIdl74smY=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 h1:2llRW4Tn9W/EZp2XvXclQ9IjeTBwwxVPrrqaerX+vCE=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f h1:RZ90dXrz+nQsM5+7Rz/+ZvUs9WgZj1ZqGuRxtsMwgV8=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down Expand Up @@ -1441,8 +1441,8 @@ github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20241009055228-
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20241009055228-33d0c0bf38de/go.mod h1:Sl2MF/Fp3fgJIVzhdGhmZZX2BlnM0oUUyBP4s4xYb6o=
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20241009055228-33d0c0bf38de h1:66VQxXx3lvTaAZrMBkIcdH9VEjujUEvmBQdnyOJnkOc=
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20241009055228-33d0c0bf38de/go.mod h1:NSc7hgOQbXG3DAwkOdWnZzLTZENXSwDJ7Va1nBp0YU0=
github.com/smartcontractkit/wsrpc v0.8.2 h1:XB/xcn/MMseHW+8JE8+a/rceA86ck7Ur6cEa9LiUC8M=
github.com/smartcontractkit/wsrpc v0.8.2/go.mod h1:2u/wfnhl5R4RlSXseN4n6HHIWk8w1Am3AT6gWftQbNg=
github.com/smartcontractkit/wsrpc v0.8.3 h1:9tDf7Ut61g36RJIyxV9iI73SqoOMasKPfURV9oMLrPg=
github.com/smartcontractkit/wsrpc v0.8.3/go.mod h1:2u/wfnhl5R4RlSXseN4n6HHIWk8w1Am3AT6gWftQbNg=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
Expand Down
Loading

0 comments on commit 3d3da9b

Please sign in to comment.