Skip to content

Commit

Permalink
Merge branch 'release/2.19.0-aptos' into ui-fix-release-backport
Browse files Browse the repository at this point in the history
  • Loading branch information
archseer authored Dec 11, 2024
2 parents 5c4c8a4 + ab4887c commit 1724a51
Show file tree
Hide file tree
Showing 25 changed files with 502 additions and 136 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-camels-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix fix missing unregister in mercury job loop
3 changes: 2 additions & 1 deletion core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability
}

var (
defaultTargetRequestTimeout = time.Minute
// TODO: make this configurable
defaultTargetRequestTimeout = 8 * time.Minute
)

func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don registrysyncer.DON, state *registrysyncer.LocalRegistry, remoteWorkflowDONs []registrysyncer.DON) error {
Expand Down
10 changes: 8 additions & 2 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var _ commoncap.ExecutableCapability = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}

// expiryCheckInterval caps how often the expiry ticker fires: the ticker uses
// this interval unless requestTimeout is shorter, in which case requestTimeout
// is used as the interval instead.
const expiryCheckInterval = 30 * time.Second

func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, lggr logger.Logger) *client {
return &client{
Expand Down Expand Up @@ -98,7 +100,11 @@ func (c *client) checkDispatcherReady() {
}

func (c *client) checkForExpiredRequests() {
ticker := time.NewTicker(c.requestTimeout)
tickerInterval := expiryCheckInterval
if c.requestTimeout < tickerInterval {
tickerInterval = c.requestTimeout
}
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
for {
select {
Expand All @@ -116,7 +122,7 @@ func (c *client) expireRequests() {

for messageID, req := range c.requestIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
req.Cancel(errors.New("request expired by executable client"))
delete(c.requestIDToCallerRequest, messageID)
}

Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) {

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "error executing request: failed to execute capability: an error", responseError.Error())
assert.Equal(t, "error executing request: failed to execute capability", responseError.Error())
})
})

Expand All @@ -156,12 +156,12 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) {

methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) {
executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) {
assert.Equal(t, "error executing request: request expired", responseError.Error())
assert.Equal(t, "error executing request: failed to execute capability", responseError.Error())
})
})

for _, method := range methods {
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 9, 10*time.Minute,
testRemoteExecutableCapability(ctx, t, capability, 10, 9, 1*time.Second, 10, 9, 10*time.Minute,
method)
}
}
Expand Down
13 changes: 9 additions & 4 deletions core/capabilities/remote/executable/request/server_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package request

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -48,6 +49,8 @@ type ServerRequest struct {
lggr logger.Logger
}

// errExternalErrorMsg is the generic error returned by executeCapabilityRequest
// when unmarshalling the request, executing the capability, or marshalling the
// response fails. The underlying error is written to the logger instead of
// being included in the returned error.
var errExternalErrorMsg = errors.New("failed to execute capability")

func NewServerRequest(capability capabilities.ExecutableCapability, method string, capabilityID string, capabilityDonID uint32,
capabilityPeerID p2ptypes.PeerID,
callingDon commoncap.DON, requestID string,
Expand Down Expand Up @@ -228,20 +231,22 @@ func executeCapabilityRequest(ctx context.Context, lggr logger.Logger, capabilit
payload []byte) ([]byte, error) {
capabilityRequest, err := pb.UnmarshalCapabilityRequest(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal capability request: %w", err)
lggr.Errorw("failed to unmarshal capability request", "err", err)
return nil, errExternalErrorMsg
}

lggr.Debugw("executing capability", "metadata", capabilityRequest.Metadata)
capResponse, err := capability.Execute(ctx, capabilityRequest)

if err != nil {
lggr.Debugw("received execution error", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", err)
return nil, fmt.Errorf("failed to execute capability: %w", err)
lggr.Errorw("received execution error", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", err)
return nil, errExternalErrorMsg
}

responsePayload, err := pb.MarshalCapabilityResponse(capResponse)
if err != nil {
return nil, fmt.Errorf("failed to marshal capability response: %w", err)
lggr.Errorw("failed to marshal capability request", "err", err)
return nil, errExternalErrorMsg
}

lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {
require.NoError(t, err)
assert.Len(t, dispatcher.msgs, 2)
assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[0].Error)
assert.Equal(t, "failed to execute capability: an error", dispatcher.msgs[0].ErrorMsg)
assert.Equal(t, "failed to execute capability", dispatcher.msgs[0].ErrorMsg)
assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[1].Error)
assert.Equal(t, "failed to execute capability: an error", dispatcher.msgs[1].ErrorMsg)
assert.Equal(t, "failed to execute capability", dispatcher.msgs[1].ErrorMsg)
})

t.Run("Execute capability", func(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions core/capabilities/remote/executable/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (r *server) Start(ctx context.Context) error {
r.wg.Add(1)
go func() {
defer r.wg.Done()
ticker := time.NewTicker(r.requestTimeout)
tickerInterval := expiryCheckInterval
if r.requestTimeout < tickerInterval {
tickerInterval = r.requestTimeout
}
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
r.lggr.Info("executable capability server started")
for {
Expand Down Expand Up @@ -118,7 +122,7 @@ func (r *server) expireRequests() {

for requestID, executeReq := range r.requestIDToRequest {
if executeReq.request.Expired() {
err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired")
err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired by executable server")
if err != nil {
r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err)
}
Expand Down
5 changes: 5 additions & 0 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/static"
"github.com/smartcontractkit/chainlink/v2/core/store/migrate"
Expand Down Expand Up @@ -109,6 +110,10 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
AuthPublicKeyHex: csaPubKeyHex,
AuthHeaders: beholderAuthHeaders,
}
// note: due to the OTEL specification, all histogram buckets
// must be defined when the beholder client is created
clientCfg.MetricViews = append(clientCfg.MetricViews, workflows.MetricViews()...)

if tracingCfg.Enabled {
clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.14.0-mercury-20240807.0.20241106193309-5560cd76211a
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1409,8 +1409,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec h1:5vS1k8Qn09p8SQ3JzvS8iy4Pve7s3aVq+UPIdl74smY=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 h1:2llRW4Tn9W/EZp2XvXclQ9IjeTBwwxVPrrqaerX+vCE=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f h1:RZ90dXrz+nQsM5+7Rz/+ZvUs9WgZj1ZqGuRxtsMwgV8=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
89 changes: 60 additions & 29 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mercury

import (
"context"
"encoding/json"
"fmt"
"os/exec"
Expand Down Expand Up @@ -79,14 +80,13 @@ func NewServices(
return nil, errors.New("expected job to have a non-nil PipelineSpec")
}

var err error
var pluginConfig config.PluginConfig
if len(jb.OCR2OracleSpec.PluginConfig) == 0 {
if !enableTriggerCapability {
return nil, fmt.Errorf("at least one transmission option must be configured")
}
} else {
err = json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
err := json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -101,8 +101,8 @@ func NewServices(
// encapsulate all the subservices and ensure we close them all if any fail to start
srvs := []job.ServiceCtx{ocr2Provider}
abort := func() {
if err = services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", err)
if cerr := services.MultiCloser(srvs).Close(); cerr != nil {
lggr.Errorw("Error closing unused services", "err", cerr)
}
}
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
Expand All @@ -112,6 +112,7 @@ func NewServices(
var (
factory ocr3types.MercuryPluginFactory
factoryServices []job.ServiceCtx
fErr error
)
fCfg := factoryCfg{
orm: orm,
Expand All @@ -127,31 +128,31 @@ func NewServices(
}
switch feedID.Version() {
case 1:
factory, factoryServices, err = newv1factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv1factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 2:
factory, factoryServices, err = newv2factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv2factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 3:
factory, factoryServices, err = newv3factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv3factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 4:
factory, factoryServices, err = newv4factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv4factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
default:
Expand Down Expand Up @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loop mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand All @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
return factory, srvs, nil
}

func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) {
func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) {
lggr.Debugw("Initializing Mercury loop", "command", cmd)
mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String())
envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
}
loopID := mercuryLggr.Name()
cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
ID: loopID,
Cmd: cmd,
Env: envVars,
})
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
}
return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil
}

// loopUnregisterCloser is a helper that exposes unregistration of a loop as a
// service: Start is a no-op and Close unregisters the loop identified by id
// from r.
//
// TODO BCF-3451: all other jobs that use custom plugin providers should be
// refactored to use this pattern; perhaps it can be implemented in the
// delegate on job delete.
type loopUnregisterCloser struct {
r plugins.RegistrarConfig // registrar on which UnregisterLOOP is called
id string // loop ID passed to UnregisterLOOP
}

// Close unregisters the loop identified by l.id by calling l.r.UnregisterLOOP.
// It always returns nil.
func (l *loopUnregisterCloser) Close() error {
l.r.UnregisterLOOP(l.id)
return nil
}

// Start does nothing and always returns nil; ctx is not used.
// NOTE(review): presumably present so the type can be appended to the
// factory's service list alongside the factory server — confirm against the
// job.ServiceCtx interface.
func (l *loopUnregisterCloser) Start(ctx context.Context) error {
return nil
}

// newLoopUnregister returns a loopUnregisterCloser that, when closed,
// unregisters the loop with the given id from r.
func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser {
	return &loopUnregisterCloser{
		r:  r,
		id: id,
	}
}
Loading

0 comments on commit 1724a51

Please sign in to comment.