diff --git a/.changeset/silly-lies-boil.md b/.changeset/silly-lies-boil.md new file mode 100644 index 00000000000..b2a5084a36c --- /dev/null +++ b/.changeset/silly-lies-boil.md @@ -0,0 +1,8 @@ +--- +"chainlink": minor +--- + +Make websocket URL `WSURL` for `EVM.Nodes` optional, and apply logic so that: +* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error +* If WS URL was not provided LogBroadcaster should be disabled +#nops diff --git a/common/client/node.go b/common/client/node.go index 1f55e69cacc..7885fe76760 100644 --- a/common/client/node.go +++ b/common/client/node.go @@ -91,14 +91,14 @@ type node[ services.StateMachine lfcLog logger.Logger name string - id int32 + id int chainID CHAIN_ID nodePoolCfg NodeConfig chainCfg ChainConfig order int32 chainFamily string - ws url.URL + ws *url.URL http *url.URL rpc RPC @@ -121,10 +121,10 @@ func NewNode[ nodeCfg NodeConfig, chainCfg ChainConfig, lggr logger.Logger, - wsuri url.URL, + wsuri *url.URL, httpuri *url.URL, name string, - id int32, + id int, chainID CHAIN_ID, nodeOrder int32, rpc RPC, @@ -136,8 +136,10 @@ func NewNode[ n.chainID = chainID n.nodePoolCfg = nodeCfg n.chainCfg = chainCfg - n.ws = wsuri n.order = nodeOrder + if wsuri != nil { + n.ws = wsuri + } if httpuri != nil { n.http = httpuri } @@ -157,7 +159,10 @@ func NewNode[ } func (n *node[CHAIN_ID, HEAD, RPC]) String() string { - s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String()) + s := fmt.Sprintf("(%s)%s", Primary.String(), n.name) + if n.ws != nil { + s = s + fmt.Sprintf(":%s", n.ws.String()) + } if n.http != nil { s = s + fmt.Sprintf(":%s", n.http.String()) } diff --git a/common/client/node_test.go b/common/client/node_test.go index 66bb50fc94f..539964691c0 100644 --- a/common/client/node_test.go +++ b/common/client/node_test.go @@ -67,10 +67,10 @@ type testNodeOpts struct { config testNodeConfig chainConfig clientMocks.ChainConfig lggr logger.Logger - wsuri url.URL + wsuri *url.URL httpuri *url.URL name string - id int32 + id int chainID types.ID nodeOrder int32 rpc *mockNodeClient[types.ID, Head] diff --git a/core/chains/evm/client/config_builder.go b/core/chains/evm/client/config_builder.go index 9a31f9e4b40..66bdfc2614f 100644 --- a/core/chains/evm/client/config_builder.go +++ b/core/chains/evm/client/config_builder.go @@ -80,15 +80,21 @@ func NewClientConfigs( func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) { nodes := make([]*toml.Node, len(nodeCfgs)) for i, nodeCfg := range nodeCfgs { - if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil { - return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i) + var wsURL, httpURL *commonconfig.URL + // wsUrl requirement will be checked in EVMConfig validation + if nodeCfg.WSURL != nil { + wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL) } - wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL) - httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL) + + if nodeCfg.HTTPURL == nil { + return nil, fmt.Errorf("node config [%d]: missing HTTP URL", i) + } + + httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL) node := &toml.Node{ Name: nodeCfg.Name, - WSURL: wsUrl, - HTTPURL: httpUrl, + WSURL: wsURL, + HTTPURL: httpURL, SendOnly: nodeCfg.SendOnly, Order: nodeCfg.Order, } diff --git a/core/chains/evm/client/config_builder_test.go b/core/chains/evm/client/config_builder_test.go index 28620ac6ca9..22956fb0185 100644 --- a/core/chains/evm/client/config_builder_test.go +++ b/core/chains/evm/client/config_builder_test.go @@ -93,7 +93,7 @@ func TestNodeConfigs(t *testing.T) { require.Len(t, tomlNodes, len(nodeConfigs)) }) - t.Run("parsing missing ws url fails", func(t *testing.T) { + t.Run("ws can be optional", func(t *testing.T) { nodeConfigs := []client.NodeConfig{ { Name: ptr("foo1"), @@ -101,7 +101,7 @@ func TestNodeConfigs(t *testing.T) { }, } _, err := client.ParseTestNodeConfigs(nodeConfigs) - require.Error(t, err) + require.Nil(t, err) }) t.Run("parsing missing http url fails", func(t *testing.T) { diff --git a/core/chains/evm/client/evm_client.go b/core/chains/evm/client/evm_client.go index c26362d6351..c596bbc3a95 100644 --- a/core/chains/evm/client/evm_client.go +++ b/core/chains/evm/client/evm_client.go @@ -15,22 +15,25 @@ import ( ) func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client { - var empty url.URL var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient] var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient] largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType) for i, node := range nodes { + var ws *url.URL + if node.WSURL != nil { + ws = (*url.URL)(node.WSURL) + } if node.SendOnly != nil && *node.SendOnly { - rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, + rpc := NewRPCClient(lggr, nil, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType) sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL), *node.Name, chainID, rpc) sendonlys = append(sendonlys, sendonly) } else { - rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), + rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType) primaryNode := commonclient.NewNode(cfg, chainCfg, - lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order, + lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, *node.Order, rpc, "EVM") primaries = append(primaries, primaryNode) } diff --git a/core/chains/evm/client/helpers_test.go b/core/chains/evm/client/helpers_test.go index 031f6481574..1a6090e4a01 100644 --- a/core/chains/evm/client/helpers_test.go +++ b/core/chains/evm/client/helpers_test.go @@ -135,7 +135,7 @@ func NewChainClientWithTestNode( rpcUrl string, rpcHTTPURL *url.URL, sendonlyRPCURLs []url.URL, - id int32, + id int, chainID *big.Int, ) (Client, error) { parsed, err := url.ParseRequestURI(rpcUrl) @@ -148,10 +148,10 @@ func NewChainClientWithTestNode( } lggr := logger.Test(t) - rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient]( - nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM") + nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM") primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n} var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient] @@ -160,7 +160,7 @@ func NewChainClientWithTestNode( return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String()) } var empty url.URL - rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") s := commonclient.NewSendOnlyNode[*big.Int, RPCClient]( lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc) sendonlys = append(sendonlys, s) @@ -206,7 +206,7 @@ func NewChainClientWithMockedRpc( parsed, _ := url.ParseRequestURI("ws://test") n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient]( - cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM") + cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM") primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n} clientErrors := NewTestClientErrors() c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType, &clientErrors, 0) diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index a29ed5e118c..f55c35980df 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -117,7 +117,7 @@ type rawclient struct { type rpcClient struct { rpcLog logger.SugaredLogger name string - id int32 + id int chainID *big.Int tier commonclient.NodeTier largePayloadRpcTimeout time.Duration @@ -126,7 +126,7 @@ type rpcClient struct { newHeadsPollInterval time.Duration chainType chaintype.ChainType - ws rawclient + ws *rawclient http *rawclient stateMu sync.RWMutex // protects state* fields @@ -154,10 +154,10 @@ type rpcClient struct { // NewRPCCLient returns a new *rpcClient as commonclient.RPC func NewRPCClient( lggr logger.Logger, - wsuri url.URL, + wsuri *url.URL, httpuri *url.URL, name string, - id int32, + id int, chainID *big.Int, tier commonclient.NodeTier, finalizedBlockPollInterval time.Duration, @@ -175,9 +175,11 @@ func NewRPCClient( r.id = id r.chainID = chainID r.tier = tier - r.ws.uri = wsuri r.finalizedBlockPollInterval = finalizedBlockPollInterval r.newHeadsPollInterval = newHeadsPollInterval + if wsuri != nil { + r.ws = &rawclient{uri: *wsuri} + } if httpuri != nil { r.http = &rawclient{uri: *httpuri} } @@ -199,30 +201,33 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout) defer cancel() - promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() - lggr := r.rpcLog.With("wsuri", r.ws.uri.Redacted()) - if r.http != nil { - lggr = lggr.With("httpuri", r.http.uri.Redacted()) + if r.ws == nil && r.http == nil { + return errors.New("cannot dial rpc client when both ws and http info are missing") } - lggr.Debugw("RPC dial: evmclient.Client#dial") - wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") - if err != nil { - promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) - } + promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() + lggr := r.rpcLog + if r.ws != nil { + lggr = lggr.With("wsuri", r.ws.uri.Redacted()) + wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") + if err != nil { + promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() + return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) + } - r.ws.rpc = wsrpc - r.ws.geth = ethclient.NewClient(wsrpc) + r.ws.rpc = wsrpc + r.ws.geth = ethclient.NewClient(wsrpc) + } if r.http != nil { + lggr = lggr.With("httpuri", r.http.uri.Redacted()) if err := r.DialHTTP(); err != nil { return err } } + lggr.Debugw("RPC dial: evmclient.Client#dial") promEVMPoolRPCNodeDialsSuccess.WithLabelValues(r.chainID.String(), r.name).Inc() - return nil } @@ -231,7 +236,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { // It can only return error if the URL is malformed. func (r *rpcClient) DialHTTP() error { promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() - lggr := r.rpcLog.With("httpuri", r.ws.uri.Redacted()) + lggr := r.rpcLog.With("httpuri", r.http.uri.Redacted()) lggr.Debugw("RPC dial: evmclient.Client#dial") var httprpc *rpc.Client @@ -251,7 +256,7 @@ func (r *rpcClient) DialHTTP() error { func (r *rpcClient) Close() { defer func() { - if r.ws.rpc != nil { + if r.ws != nil && r.ws.rpc != nil { r.ws.rpc.Close() } }() @@ -270,7 +275,10 @@ func (r *rpcClient) cancelInflightRequests() { } func (r *rpcClient) String() string { - s := fmt.Sprintf("(%s)%s:%s", r.tier.String(), r.name, r.ws.uri.Redacted()) + s := fmt.Sprintf("(%s)%s", r.tier.String(), r.name) + if r.ws != nil { + s = s + fmt.Sprintf(":%s", r.ws.uri.Redacted()) + } if r.http != nil { s = s + fmt.Sprintf(":%s", r.http.uri.Redacted()) } @@ -336,7 +344,7 @@ func (r *rpcClient) registerSub(sub ethereum.Subscription, stopInFLightCh chan s // DisconnectAll disconnects all clients connected to the rpcClient func (r *rpcClient) DisconnectAll() { r.stateMu.Lock() - if r.ws.rpc != nil { + if r.ws != nil && r.ws.rpc != nil { r.ws.rpc.Close() } r.cancelInflightRequests() @@ -497,7 +505,6 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp defer cancel() args := []interface{}{"newHeads"} lggr := r.newRqLggr().With("args", args) - if r.newHeadsPollInterval > 0 { interval := r.newHeadsPollInterval timeout := interval @@ -529,6 +536,10 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp return &poller, nil } + if ws == nil { + return nil, errors.New("SubscribeNewHead is not allowed without ws url") + } + lggr.Debug("RPC call: evmclient.Client#EthSubscribe") start := time.Now() defer func() { @@ -557,7 +568,6 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.Head, sub commontypes.Subscription, err error) { ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() - args := []interface{}{rpcSubscriptionMethodNewHeads} start := time.Now() lggr := r.newRqLggr().With("args", args) @@ -580,6 +590,10 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H return channel, &poller, nil } + if ws == nil { + return nil, nil, errors.New("SubscribeNewHead is not allowed without ws url") + } + lggr.Debug("RPC call: evmclient.Client#EthSubscribe") defer func() { duration := time.Since(start) @@ -1286,6 +1300,9 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) { ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() + if ws == nil { + return nil, errors.New("SubscribeFilterLogs is not allowed without ws url") + } lggr := r.newRqLggr().With("q", q) lggr.Debug("RPC call: evmclient.Client#SubscribeFilterLogs") @@ -1390,18 +1407,21 @@ func (r *rpcClient) wrapHTTP(err error) error { } // makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx -func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) { +func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws *rawclient, http *rawclient) { ctx, cancel, _, ws, http = r.acquireQueryCtx(parentCtx, timeout) return } func (r *rpcClient) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, - chStopInFlight chan struct{}, ws rawclient, http *rawclient) { + chStopInFlight chan struct{}, ws *rawclient, http *rawclient) { // Need to wrap in mutex because state transition can cancel and replace the // context r.stateMu.RLock() chStopInFlight = r.chStopInFlight - ws = r.ws + if r.ws != nil { + cp := *r.ws + ws = &cp + } if r.http != nil { cp := *r.http http = &cp diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index d959f8d1115..662c757ffb3 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -78,11 +78,18 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { require.Equal(t, int32(0), rpcClient.SubscribersCount()) } + t.Run("WS and HTTP URL cannot be both empty", func(t *testing.T) { + // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing + observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) + rpcClient := client.NewRPCClient(observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + require.Equal(t, errors.New("cannot dial rpc client when both ws and http info are missing"), rpcClient.Dial(ctx)) + }) + t.Run("Updates chain info on new blocks", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) // set to default values @@ -132,7 +139,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch := make(chan *evmtypes.Head) @@ -177,7 +184,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { } server := createRPCServer() - rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, tests.TestInterval, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, tests.TestInterval, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) latest, highestUserObservations := rpc.GetInterceptedChainInfo() @@ -201,7 +208,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) var wg sync.WaitGroup @@ -225,7 +232,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { t.Run("Block's chain ID matched configured", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch := make(chan *evmtypes.Head) @@ -242,7 +249,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) @@ -253,7 +260,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -266,7 +273,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 1, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 1, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -279,7 +286,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -291,7 +298,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 1, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 1, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -303,7 +310,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) @@ -314,7 +321,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { t.Run("Subscription error is properly wrapper", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) @@ -336,13 +343,22 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { lggr := logger.Test(t) ctx, cancel := context.WithTimeout(tests.Context(t), tests.WaitTimeout(t)) defer cancel() + t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) { + // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing + observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) + rpcClient := client.NewRPCClient(observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + require.Nil(t, rpcClient.Dial(ctx)) + + _, err := rpcClient.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) + require.Equal(t, errors.New("SubscribeFilterLogs is not allowed without ws url"), err) + }) t.Run("Failed SubscribeFilterLogs logs and returns proper error", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, func(reqMethod string, reqParams gjson.Result) (resp testutils.JSONRPCResponse) { return resp }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -359,7 +375,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { return resp }) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -408,7 +424,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) { } server := createRPCServer() - rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() server.Head = &evmtypes.Head{Number: 128} @@ -518,7 +534,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { // use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout const rpcTimeout = time.Hour const largePayloadRPCTimeout = tests.TestInterval - rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, largePayloadRPCTimeout, rpcTimeout, "") + rpc := client.NewRPCClient(logger.Test(t), rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, largePayloadRPCTimeout, rpcTimeout, "") require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() err := testCase.Fn(ctx, rpc) @@ -558,7 +574,7 @@ func TestAstarCustomFinality(t *testing.T) { const expectedFinalizedBlockNumber = int64(4) const expectedFinalizedBlockHash = "0x7441e97acf83f555e0deefef86db636bc8a37eb84747603412884e4df4d22804" - rpcClient := client.NewRPCClient(logger.Test(t), *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar) + rpcClient := client.NewRPCClient(logger.Test(t), wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar) defer rpcClient.Close() err := rpcClient.Dial(tests.Context(t)) require.NoError(t, err) diff --git a/core/chains/evm/config/toml/config.go b/core/chains/evm/config/toml/config.go index b5008a73607..b6b6a732363 100644 --- a/core/chains/evm/config/toml/config.go +++ b/core/chains/evm/config/toml/config.go @@ -23,7 +23,9 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" ) -var ErrNotFound = errors.New("not found") +var ( + ErrNotFound = errors.New("not found") +) type HasEVMConfigs interface { EVMConfigs() EVMConfigs @@ -311,16 +313,27 @@ func (c *EVMConfig) ValidateConfig() (err error) { err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", Msg: "must have at least one node"}) } else { var hasPrimary bool - for _, n := range c.Nodes { + var logBroadcasterEnabled bool + if c.LogBroadcasterEnabled != nil { + logBroadcasterEnabled = *c.LogBroadcasterEnabled + } + + for i, n := range c.Nodes { if n.SendOnly != nil && *n.SendOnly { continue } + hasPrimary = true - break + + // if the node is a primary node, then the WS URL is required when LogBroadcaster is enabled + if logBroadcasterEnabled && (n.WSURL == nil || n.WSURL.IsZero()) { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", Msg: fmt.Sprintf("%vth node (primary) must have a valid WSURL when LogBroadcaster is enabled", i)}) + } } + if !hasPrimary { err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", - Msg: "must have at least one primary node with WSURL"}) + Msg: "must have at least one primary node"}) } } @@ -976,19 +989,8 @@ func (n *Node) ValidateConfig() (err error) { err = multierr.Append(err, commonconfig.ErrEmpty{Name: "Name", Msg: "required for all nodes"}) } - var sendOnly bool - if n.SendOnly != nil { - sendOnly = *n.SendOnly - } - if n.WSURL == nil { - if !sendOnly { - err = multierr.Append(err, commonconfig.ErrMissing{Name: "WSURL", Msg: "required for primary nodes"}) - } - } else if n.WSURL.IsZero() { - if !sendOnly { - err = multierr.Append(err, commonconfig.ErrEmpty{Name: "WSURL", Msg: "required for primary nodes"}) - } - } else { + // relax the check here as WSURL can potentially be empty if LogBroadcaster is disabled (checked in EVMConfig Validation) + if n.WSURL != nil && !n.WSURL.IsZero() { switch n.WSURL.Scheme { case "ws", "wss": default: diff --git a/core/config/docs/chains-evm.toml b/core/config/docs/chains-evm.toml index 660b3e4a7dd..c237de5826d 100644 --- a/core/config/docs/chains-evm.toml +++ b/core/config/docs/chains-evm.toml @@ -473,7 +473,7 @@ ObservationGracePeriod = '1s' # Default [[EVM.Nodes]] # Name is a unique (per-chain) identifier for this node. Name = 'foo' # Example -# WSURL is the WS(S) endpoint for this node. Required for primary nodes. +# WSURL is the WS(S) endpoint for this node. Required for primary nodes when `LogBroadcasterEnabled` is `true` WSURL = 'wss://web.socket/test' # Example # HTTPURL is the HTTP(S) endpoint for this node. Required for all nodes. HTTPURL = 'https://foo.web' # Example diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 375884d8fea..64cc58588a8 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -1348,7 +1348,8 @@ func TestConfig_Validate(t *testing.T) { - 1.ChainID: invalid value (1): duplicate - must be unique - 0.Nodes.1.Name: invalid value (foo): duplicate - must be unique - 3.Nodes.4.WSURL: invalid value (ws://dupe.com): duplicate - must be unique - - 0: 3 errors: + - 0: 4 errors: + - Nodes: missing: 0th node (primary) must have a valid WSURL when LogBroadcaster is enabled - GasEstimator.BumpTxDepth: invalid value (11): must be less than or equal to Transactions.MaxInFlight - GasEstimator: 6 errors: - BumpPercent: invalid value (1): may not be less than Geth's default of 10 @@ -1358,9 +1359,7 @@ func TestConfig_Validate(t *testing.T) { - PriceMax: invalid value (10 gwei): must be greater than or equal to PriceDefault - BlockHistory.BlockHistorySize: invalid value (0): must be greater than or equal to 1 with BlockHistory Mode - Nodes: 2 errors: - - 0: 2 errors: - - WSURL: missing: required for primary nodes - - HTTPURL: missing: required for all nodes + - 0.HTTPURL: missing: required for all nodes - 1.HTTPURL: missing: required for all nodes - 1: 10 errors: - ChainType: invalid value (Foo): must not be set with this chain id @@ -1381,18 +1380,19 @@ func TestConfig_Validate(t *testing.T) { - ChainType: invalid value (Arbitrum): must be one of arbitrum, astar, celo, gnosis, hedera, kroma, mantle, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync or omitted - FinalityDepth: invalid value (0): must be greater than or equal to 1 - MinIncomingConfirmations: invalid value (0): must be greater than or equal to 1 - - 3.Nodes: 5 errors: - - 0: 3 errors: + - 3: 3 errors: + - Nodes: missing: 0th node (primary) must have a valid WSURL when LogBroadcaster is enabled + - Nodes: missing: 2th node (primary) must have a valid WSURL when LogBroadcaster is enabled + - Nodes: 5 errors: + - 0: 2 errors: - Name: missing: required for all nodes - - WSURL: missing: required for primary nodes - HTTPURL: empty: required for all nodes - 1: 3 errors: - Name: missing: required for all nodes - WSURL: invalid value (http): must be ws or wss - HTTPURL: missing: required for all nodes - - 2: 3 errors: + - 2: 2 errors: - Name: empty: required for all nodes - - WSURL: missing: required for primary nodes - HTTPURL: invalid value (ws): must be http or https - 3.HTTPURL: missing: required for all nodes - 4.HTTPURL: missing: required for all nodes diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 79fe03d5e37..498e2ae44f6 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -9759,7 +9759,7 @@ Name is a unique (per-chain) identifier for this node. ```toml WSURL = 'wss://web.socket/test' # Example ``` -WSURL is the WS(S) endpoint for this node. Required for primary nodes. +WSURL is the WS(S) endpoint for this node. Required for primary nodes when `LogBroadcasterEnabled` is `true` ### HTTPURL ```toml