From 5ec3d5a1762e3dabc287fdd0d535306c9b38d278 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 1 Nov 2024 14:54:20 +0300 Subject: [PATCH] morph: drop inactive client mode, fix #2797 Reconnect indefinitely. After the initial probe this should be the behavior for client, we can not work without blockchain data so we must wait for it as long as needed, not entering any "inactive" state where the node pretends to be alive, but can't do anything. This makes client context (including everything belonging to it) an atomically updated thing, regular calls fetch it when needed and can break if there are no current connection. Meanwhile notification processing thread can reconnect when needed and restore functionality. Signed-off-by: Roman Khimov --- CHANGELOG.md | 1 + cmd/neofs-node/config.go | 3 - pkg/innerring/innerring.go | 3 - pkg/morph/client/client.go | 238 +++++++++++------------------- pkg/morph/client/connection.go | 86 +++++++++++ pkg/morph/client/constructor.go | 160 ++++++++------------ pkg/morph/client/multi.go | 59 +++----- pkg/morph/client/nns.go | 21 ++- pkg/morph/client/notary.go | 99 +++++-------- pkg/morph/client/notifications.go | 136 ++++++----------- 10 files changed, 348 insertions(+), 458 deletions(-) create mode 100644 pkg/morph/client/connection.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 164598ca07..4da77cd2d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - Structure table in the SN-configuration document (#2974) - False negative connection to NeoFS chain in multi-endpoint setup with at least one live node (#2986) - Overriding the default container and object attributes only with the appropriate flags (#2985) +- RPC client reconnection failures leading to complete SN failure (#2797) ### Changed - `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 68fe7ef41a..76d762ad92 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -668,9 +668,6 @@ func initBasics(c *cfg, key *keys.PrivateKey, stateStorage *state.PersistentStor c.internalErr <- fmt.Errorf("restarting after morph connection was lost: %w", err) } }), - client.WithConnLostCallback(func() { - c.internalErr <- errors.New("morph connection has been lost") - }), client.WithMinRequiredBlockHeight(fromSideChainBlock), ) if err != nil { diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 831c463007..3b4565adcf 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -1077,9 +1077,6 @@ func (s *Server) createClient(ctx context.Context, p chainParams, errChan chan<- errChan <- fmt.Errorf("internal services' restart after RPC reconnection to the %s: %w", p.name, err) } }), - client.WithConnLostCallback(func() { - errChan <- fmt.Errorf("%s chain connection has been lost", p.name) - }), client.WithMinRequiredBlockHeight(p.from), } if p.withAutoSidechainScope { diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 797ec7a5ad..793fafc641 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "github.com/google/uuid" lru "github.com/hashicorp/golang-lru/v2" @@ -16,12 +17,8 @@ import ( "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network/payload" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" - "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" - "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" "github.com/nspcc-dev/neo-go/pkg/rpcclient/neo" - "github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17" "github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt" "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap" "github.com/nspcc-dev/neo-go/pkg/smartcontract" @@ -37,11 +34,9 @@ import ( // that provides smart-contract invocation interface // and notification subscription functionality. // -// On connection lost tries establishing new connection -// to the next RPC (if any). If no RPC node available, -// switches to inactive mode: any RPC call leads to immediate -// return with ErrConnectionLost error, every notification -// consumer passed to any Receive* method is closed. +// On connection loss tries establishing new connection +// to the next RPC (if any). While doing that any RPC call +// leads to immediate return with ErrConnectionLost error. // // Working client must be created via constructor New. // Using the Client that has been created with new(Client) @@ -52,10 +47,7 @@ type Client struct { logger *zap.Logger // logging component - client *rpcclient.WSClient // neo-go websocket client - rpcActor *actor.Actor // neo-go RPC actor - gasToken *nep17.Token // neo-go GAS token wrapper - rolemgmt *rolemgmt.Contract // neo-go Designation contract wrapper + conn atomic.Pointer[connection] acc *wallet.Account // neo account accAddr util.Uint160 // account's address @@ -66,64 +58,46 @@ type Client struct { endpoints []string - // switchLock protects endpoints, inactive, and subscription-related fields. - // It is taken exclusively during endpoint switch and locked in shared mode - // on every normal call. - switchLock *sync.RWMutex - - // notification consumers (Client sends - // notifications to these channels) - notifyChan chan *state.ContainedNotificationEvent - blockChan chan *block.Block - notaryChan chan *result.NotaryRequestEvent - subs subscriptions + subs subscriptions // channel for internal stop closeChan chan struct{} - - // indicates that Client is not able to - // establish connection to any of the - // provided RPC endpoints - inactive bool } // Call calls specified method of the Neo smart contract with provided arguments. func (c *Client) Call(contract util.Uint160, method string, args ...any) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.rpcActor.Call(contract, method, args...) + return conn.rpcActor.Call(contract, method, args...) } // CallAndExpandIterator calls specified iterating method of the Neo smart // contract with provided arguments, and fetches iterator from the response // carrying up to limited number of items. func (c *Client) CallAndExpandIterator(contract util.Uint160, method string, maxItems int, args ...any) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.rpcActor.CallAndExpandIterator(contract, method, maxItems, args...) + return conn.rpcActor.CallAndExpandIterator(contract, method, maxItems, args...) } // TerminateSession closes opened session by its ID on the currently active Neo // RPC node the Client connected to. Returns true even if session was not found. func (c *Client) TerminateSession(sessionID uuid.UUID) (bool, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false, ErrConnectionLost } - _, err := c.client.TerminateSession(sessionID) + _, err := conn.client.TerminateSession(sessionID) return true, err } @@ -132,68 +106,63 @@ func (c *Client) TerminateSession(sessionID uuid.UUID) (bool, error) { // Neo RPC node the Client connected to. Returns empty result if either there is // no more elements or session is closed. func (c *Client) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.TraverseIterator(sessionID, iteratorID, maxItemsCount) + return conn.client.TraverseIterator(sessionID, iteratorID, maxItemsCount) } // InvokeContractVerify calls 'verify' method of the referenced Neo smart // contract deployed in the blockchain the Client connected to and returns the // call result. func (c *Client) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.InvokeContractVerify(contract, params, signers, witnesses...) + return conn.client.InvokeContractVerify(contract, params, signers, witnesses...) } // InvokeFunction calls specified method of the referenced Neo smart contract // deployed in the blockchain the Client connected to and returns the call // result. func (c *Client) InvokeFunction(contract util.Uint160, operation string, params []smartcontract.Parameter, signers []transaction.Signer) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.InvokeFunction(contract, operation, params, signers) + return conn.client.InvokeFunction(contract, operation, params, signers) } // InvokeScript tests given script on the Neo blockchain the Client connected to // and returns the script result. func (c *Client) InvokeScript(script []byte, signers []transaction.Signer) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.InvokeScript(script, signers) + return conn.client.InvokeScript(script, signers) } // CalculateNetworkFee calculates consensus nodes' fee for given transaction in // the blockchain the Client connected to. func (c *Client) CalculateNetworkFee(tx *transaction.Transaction) (int64, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return c.client.CalculateNetworkFee(tx) + return conn.client.CalculateNetworkFee(tx) } // GetBlockCount returns current height of the Neo blockchain the Client @@ -205,41 +174,38 @@ func (c *Client) GetBlockCount() (uint32, error) { // GetVersion returns local settings of the currently active Neo RPC node the // Client connected to. func (c *Client) GetVersion() (*result.Version, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetVersion() + return conn.client.GetVersion() } // SendRawTransaction sends specified transaction to the Neo blockchain the // Client connected to and returns the transaction hash. func (c *Client) SendRawTransaction(tx *transaction.Transaction) (util.Uint256, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint256{}, ErrConnectionLost } - return c.client.SendRawTransaction(tx) + return conn.client.SendRawTransaction(tx) } // SubmitP2PNotaryRequest submits given Notary service request to the Neo // blockchain the Client connected to and returns the fallback transaction's // hash. func (c *Client) SubmitP2PNotaryRequest(req *payload.P2PNotaryRequest) (util.Uint256, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint256{}, ErrConnectionLost } - return c.client.SubmitP2PNotaryRequest(req) + return conn.client.SubmitP2PNotaryRequest(req) } // GetCommittee returns current public keys of the committee of the Neo @@ -252,28 +218,26 @@ func (c *Client) GetCommittee() (keys.PublicKeys, error) { // contract deployed in the blockchain the Client connected to. Returns // [neorpc.ErrUnknownContract] if requested contract is missing. func (c *Client) GetContractStateByID(id int32) (*state.Contract, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetContractStateByID(id) + return conn.client.GetContractStateByID(id) } // GetContractStateByHash returns current state of the addressed Neo smart // contract deployed in the blockchain the Client connected to. Returns // [neorpc.ErrUnknownContract] if requested contract is missing. func (c *Client) GetContractStateByHash(addr util.Uint160) (*state.Contract, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetContractStateByHash(addr) + return conn.client.GetContractStateByHash(addr) } type cache struct { @@ -333,14 +297,13 @@ func (e *notHaltStateError) Error() string { // Invoke invokes contract method by sending transaction into blockchain. // Supported args types: int64, string, util.Uint160, []byte and bool. func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...) + txHash, vub, err := conn.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...) if err != nil { return fmt.Errorf("could not invoke %s: %w", method, err) } @@ -373,10 +336,9 @@ func (c *Client) TestInvoke(contract util.Uint160, method string, args ...any) ( // If prefetchElements > 0, that many elements are tried to be placed on stack without // additional network communication (without the iterator expansion). func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefetchElements int, args ...any) (res []stackitem.Item, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } @@ -390,12 +352,12 @@ func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefet return nil, fmt.Errorf("building prefetching script: %w", err) } - items, sid, iter, err = unwrap.ArrayAndSessionIterator(c.rpcActor.Run(script)) + items, sid, iter, err = unwrap.ArrayAndSessionIterator(conn.rpcActor.Run(script)) if err != nil { return nil, fmt.Errorf("iterator expansion: %w", err) } } else { - sid, iter, err = unwrap.SessionIterator(c.rpcActor.Call(contract, method, args...)) + sid, iter, err = unwrap.SessionIterator(conn.rpcActor.Call(contract, method, args...)) if err != nil { return nil, fmt.Errorf("iterator expansion: %w", err) } @@ -403,12 +365,12 @@ func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefet defer func() { if (sid != uuid.UUID{}) { - _ = c.rpcActor.TerminateSession(sid) + _ = conn.rpcActor.TerminateSession(sid) } }() for { - ii, err := c.rpcActor.TraverseIterator(sid, &iter, config.DefaultMaxIteratorResultItems) + ii, err := conn.rpcActor.TraverseIterator(sid, &iter, config.DefaultMaxIteratorResultItems) if err != nil { return nil, fmt.Errorf("iterator traversal; session: %s, error: %w", sid, err) } @@ -425,14 +387,13 @@ func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefet // TransferGas to the receiver from local wallet. func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - txHash, vub, err := c.gasToken.Transfer(c.accAddr, receiver, big.NewInt(int64(amount)), nil) + txHash, vub, err := conn.gasToken.Transfer(c.accAddr, receiver, big.NewInt(int64(amount)), nil) if err != nil { return err } @@ -447,14 +408,13 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error // GasBalance returns GAS amount in the client's wallet. func (c *Client) GasBalance() (res int64, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - bal, err := c.gasToken.BalanceOf(c.accAddr) + bal, err := conn.gasToken.BalanceOf(c.accAddr) if err != nil { return 0, err } @@ -464,27 +424,25 @@ func (c *Client) GasBalance() (res int64, err error) { // Committee returns keys of chain committee from neo native contract. func (c *Client) Committee() (res keys.PublicKeys, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetCommittee() + return conn.client.GetCommittee() } // TxHalt returns true if transaction has been successfully executed and persisted. func (c *Client) TxHalt(h util.Uint256) (res bool, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false, ErrConnectionLost } trig := trigger.Application - aer, err := c.client.GetApplicationLog(h, &trig) + aer, err := conn.client.GetApplicationLog(h, &trig) if err != nil { return false, err } @@ -493,28 +451,26 @@ func (c *Client) TxHalt(h util.Uint256) (res bool, err error) { // TxHeight returns true if transaction has been successfully executed and persisted. func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return c.client.GetTransactionHeight(h) + return conn.client.GetTransactionHeight(h) } // NeoFSAlphabetList returns keys that stored in NeoFS Alphabet role. Main chain // stores alphabet node keys of inner ring there, however the sidechain stores both // alphabet and non alphabet node keys of inner ring. func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - list, err := c.roleList(noderoles.NeoFSAlphabet) + list, err := conn.roleList(noderoles.NeoFSAlphabet) if err != nil { return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err) } @@ -527,65 +483,52 @@ func (c *Client) GetDesignateHash() util.Uint160 { return rolemgmt.Hash } -func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) { - height, err := c.rpcActor.GetBlockCount() - if err != nil { - return nil, fmt.Errorf("can't get chain height: %w", err) - } - - return c.rolemgmt.GetDesignatedByRole(r, height) -} - // MagicNumber returns the magic number of the network // to which the underlying RPC node client is connected. func (c *Client) MagicNumber() (uint32, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return uint32(c.rpcActor.GetNetwork()), nil + return uint32(conn.rpcActor.GetNetwork()), nil } // BlockCount returns block count of the network // to which the underlying RPC node client is connected. func (c *Client) BlockCount() (res uint32, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return c.rpcActor.GetBlockCount() + return conn.rpcActor.GetBlockCount() } // MsPerBlock returns MillisecondsPerBlock network parameter. func (c *Client) MsPerBlock() (res int64, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - v := c.rpcActor.GetVersion() + v := conn.rpcActor.GetVersion() return int64(v.Protocol.MillisecondsPerBlock), nil } // IsValidScript returns true if invocation script executes with HALT state. func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (bool, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false, ErrConnectionLost } - res, err := c.client.InvokeScript(script, signers) + res, err := conn.client.InvokeScript(script, signers) if err != nil { return false, fmt.Errorf("invokeScript: %w", err) } @@ -597,14 +540,13 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (boo // tokens for. Nil key with no error is returned if the account has no NEO // or if the account hasn't voted for anyone. func (c *Client) AccountVote(addr util.Uint160) (*keys.PublicKey, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - neoReader := neo.NewReader(invoker.New(c.client, nil)) + neoReader := neo.NewReader(invoker.New(conn.client, nil)) accountState, err := neoReader.GetAccountState(addr) if err != nil { @@ -617,9 +559,3 @@ func (c *Client) AccountVote(addr util.Uint160) (*keys.PublicKey, error) { return accountState.VoteTo, nil } - -func (c *Client) setActor(act *actor.Actor) { - c.rpcActor = act - c.gasToken = gas.New(act) - c.rolemgmt = rolemgmt.New(act) -} diff --git a/pkg/morph/client/connection.go b/pkg/morph/client/connection.go new file mode 100644 index 0000000000..ab59a40931 --- /dev/null +++ b/pkg/morph/client/connection.go @@ -0,0 +1,86 @@ +package client + +import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt" +) + +// connection is a WSClient with associated actors and wrappers, it's valid as +// long as connection doesn't change. +type connection struct { + client *rpcclient.WSClient // neo-go websocket client + rpcActor *actor.Actor // neo-go RPC actor + gasToken *nep17.Token // neo-go GAS token wrapper + rolemgmt *rolemgmt.Contract // neo-go Designation contract wrapper + + // notification receivers (Client reads notifications from these channels) + notifyChan chan *state.ContainedNotificationEvent + blockChan chan *block.Block + notaryChan chan *result.NotaryRequestEvent +} + +func (c *connection) roleList(r noderoles.Role) (keys.PublicKeys, error) { + height, err := c.rpcActor.GetBlockCount() + if err != nil { + return nil, fmt.Errorf("can't get chain height: %w", err) + } + + return c.rolemgmt.GetDesignatedByRole(r, height) +} + +// reachedHeight checks if [Client] has least expected block height and +// returns error if it is not reached that height. +// This function is required to avoid connections to unsynced RPC nodes, because +// they can produce events from the past that should not be processed by +// NeoFS nodes. +func (c *connection) reachedHeight(startFrom uint32) error { + if startFrom == 0 { + return nil + } + + height, err := c.client.GetBlockCount() + if err != nil { + return fmt.Errorf("could not get block height: %w", err) + } + + if height < startFrom+1 { + return fmt.Errorf("%w: expected %d height, got %d count", ErrStaleNodes, startFrom, height) + } + + return nil +} + +func (c *connection) Close() { + _ = c.client.UnsubscribeAll() + c.client.Close() + + var notifyCh, blockCh, notaryCh = c.notifyChan, c.blockChan, c.notaryChan +drainloop: + for { + select { + case _, ok := <-notifyCh: + if !ok { + notifyCh = nil + } + case _, ok := <-blockCh: + if !ok { + blockCh = nil + } + case _, ok := <-notaryCh: + if !ok { + notaryCh = nil + } + default: + break drainloop + } + } +} diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 708e68558e..9586cbad34 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -15,6 +15,8 @@ import ( "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "go.uber.org/zap" @@ -42,13 +44,11 @@ type cfg struct { singleCli *rpcclient.WSClient // neo-go client for single client mode - inactiveModeCb Callback - rpcSwitchCb Callback - minRequiredHeight uint32 reconnectionRetries int reconnectionDelay time.Duration + rpcSwitchCb Callback } const ( @@ -106,84 +106,67 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { opt(cfg) } - cli := &Client{ - cache: newClientCache(), - logger: cfg.logger, - acc: acc, - accAddr: accAddr, - cfg: *cfg, - switchLock: &sync.RWMutex{}, - closeChan: make(chan struct{}), - notifyChan: make(chan *state.ContainedNotificationEvent), - blockChan: make(chan *block.Block), - notaryChan: make(chan *result.NotaryRequestEvent), - subs: subscriptions{ - subscribedEvents: make(map[util.Uint160]struct{}), - subscribedNotaryEvents: make(map[util.Uint160]struct{}), - subscribedToNewBlocks: false, - - curNotifyChan: make(chan *state.ContainedNotificationEvent), - curBlockChan: make(chan *block.Block), - curNotaryChan: make(chan *result.NotaryRequestEvent), - }, - } - - var err error - var act *actor.Actor + var ( + cli = &Client{ + cache: newClientCache(), + logger: cfg.logger, + acc: acc, + accAddr: accAddr, + cfg: *cfg, + closeChan: make(chan struct{}), + subs: subscriptions{ + notifyChan: make(chan *state.ContainedNotificationEvent), + blockChan: make(chan *block.Block), + notaryChan: make(chan *result.NotaryRequestEvent), + subscribedEvents: make(map[util.Uint160]struct{}), + subscribedNotaryEvents: make(map[util.Uint160]struct{}), + subscribedToNewBlocks: false, + }, + } + conn *connection + err error + ) if cfg.singleCli != nil { // return client in single RPC node mode that uses // predefined WS client // // in case of the closing web socket connection: // if extra endpoints were provided via options, - // they will be used in switch process, otherwise - // inactive mode will be enabled - cli.client = cfg.singleCli - - if cfg.autoSidechainScope { - err = autoSidechainScope(cfg.singleCli, cfg) - if err != nil { - return nil, fmt.Errorf("scope setup: %w", err) - } - } - act, err = newActor(cfg.singleCli, acc, *cfg) - if err != nil { - return nil, fmt.Errorf("could not create RPC actor: %w", err) - } + // they will be used in switch process + conn, err = cli.newConnectionWS(cfg.singleCli) } else { if len(cfg.endpoints) == 0 { return nil, errors.New("no endpoints were provided") } cli.endpoints = cfg.endpoints - for _, e := range cli.endpoints { - cli.client, act, err = cli.newCli(e) - if err != nil { - cli.logger.Warn("Neo RPC connection failure", zap.String("endpoint", e), zap.Error(err)) - continue - } - break - } - - if err != nil { - return nil, fmt.Errorf("could not create RPC client: %w", err) + conn = cli.connEndpoints() + if conn == nil { + err = errors.New("could not establish Neo RPC connection") } } - cli.setActor(act) + if err != nil { + return nil, err + } for range cfg.reconnectionRetries { - err = cli.reachedHeight(cfg.minRequiredHeight) - if !errors.Is(err, ErrStaleNodes) { - break + if conn != nil { + err = conn.reachedHeight(cfg.minRequiredHeight) + if !errors.Is(err, ErrStaleNodes) { + break + } + cli.logger.Info("outdated Neo RPC node", zap.String("endpoint", conn.client.Endpoint()), zap.Error(err)) + conn.Close() } - - if !cli.switchRPC() { - return nil, fmt.Errorf("%w: could not switch to the next node", err) + conn = cli.connEndpoints() + if conn == nil { + err = errors.New("can't establish connection to any Neo RPC node") } } if err != nil { return nil, err } + cli.conn.Store(conn) go cli.routeNotifications() go cli.closeWaiter() @@ -191,16 +174,22 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { return cli, nil } -func (c *Client) newCli(endpoint string) (*rpcclient.WSClient, *actor.Actor, error) { +func (c *Client) newConnection(endpoint string) (*connection, error) { cli, err := rpcclient.NewWS(c.cfg.ctx, endpoint, rpcclient.WSOptions{ Options: rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, }, }) if err != nil { - return nil, nil, fmt.Errorf("WS client creation: %w", err) + return nil, fmt.Errorf("WS client creation: %w", err) } + return c.newConnectionWS(cli) +} + +func (c *Client) newConnectionWS(cli *rpcclient.WSClient) (*connection, error) { + var err error + defer func() { if err != nil { cli.Close() @@ -209,21 +198,30 @@ func (c *Client) newCli(endpoint string) (*rpcclient.WSClient, *actor.Actor, err err = cli.Init() if err != nil { - return nil, nil, fmt.Errorf("WS client initialization: %w", err) + return nil, fmt.Errorf("WS client initialization: %w", err) } if c.cfg.autoSidechainScope { err = autoSidechainScope(cli, &c.cfg) if err != nil { - return nil, nil, fmt.Errorf("scope setup: %w", err) + return nil, fmt.Errorf("scope setup: %w", err) } } act, err := newActor(cli, c.acc, c.cfg) if err != nil { - return nil, nil, fmt.Errorf("RPC actor creation: %w", err) + return nil, fmt.Errorf("RPC actor creation: %w", err) } - return cli, act, nil + var conn = &connection{ + client: cli, + rpcActor: act, + gasToken: gas.New(act), + rolemgmt: rolemgmt.New(act), + notifyChan: make(chan *state.ContainedNotificationEvent), + blockChan: make(chan *block.Block), + notaryChan: make(chan *result.NotaryRequestEvent), + } + return conn, nil } func newActor(ws *rpcclient.WSClient, acc *wallet.Account, cfg cfg) (*actor.Actor, error) { @@ -330,16 +328,6 @@ func WithReconnectionsDelay(d time.Duration) Option { } } -// WithConnLostCallback return a client constructor option -// that specifies a callback that is called when Client -// unsuccessfully tried to connect to all the specified -// endpoints. -func WithConnLostCallback(cb Callback) Option { - return func(c *cfg) { - c.inactiveModeCb = cb - } -} - // WithConnSwitchCallback returns a client constructor option // that specifies a callback that is called when the Client // reconnected to a new RPC (from [WithEndpoints] list) @@ -369,25 +357,3 @@ func WithAutoSidechainScope() Option { c.autoSidechainScope = true } } - -// reachedHeight checks if [Client] has least expected block height and -// returns error if it is not reached that height. -// This function is required to avoid connections to unsynced RPC nodes, because -// they can produce events from the past that should not be processed by -// NeoFS nodes. -func (c *Client) reachedHeight(startFrom uint32) error { - if startFrom == 0 { - return nil - } - - height, err := c.BlockCount() - if err != nil { - return fmt.Errorf("could not get block height: %w", err) - } - - if height < startFrom+1 { - return fmt.Errorf("%w: expected %d height, got %d count", ErrStaleNodes, startFrom, height) - } - - return nil -} diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index f8e63a82c8..2e100c5f65 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -12,47 +12,38 @@ type Endpoint struct { Priority int } -// SwitchRPC performs reconnection and returns true if it was successful. -func (c *Client) SwitchRPC() bool { - c.switchLock.Lock() - - for range c.cfg.reconnectionRetries { - if c.switchRPC() { - c.switchLock.Unlock() +// SwitchRPC performs reconnection and returns new if it was successful. +func (c *Client) switchRPC() *connection { + var conn = c.conn.Swap(nil) + if conn != nil { + conn.Close() // Ensure it's completed and drained. + } + for { + conn = c.connEndpoints() + if conn != nil { + c.conn.Store(conn) if c.cfg.rpcSwitchCb != nil { c.cfg.rpcSwitchCb() } - return true + return conn } select { case <-time.After(c.cfg.reconnectionDelay): case <-c.closeChan: - c.switchLock.Unlock() - return false + return nil } } - - c.inactive = true - c.switchLock.Unlock() - - if c.cfg.inactiveModeCb != nil { - c.cfg.inactiveModeCb() - } - - return false } -func (c *Client) switchRPC() bool { - c.client.Close() - - // Iterate endpoints in the order of decreasing priority. +func (c *Client) connEndpoints() *connection { + // Iterate endpoints. for _, e := range c.endpoints { - cli, act, err := c.newCli(e) + conn, err := c.newConnection(e) if err != nil { - c.logger.Warn("could not establish connection to the switched RPC node", + c.logger.Warn("could not establish connection to RPC node", zap.String("endpoint", e), zap.Error(err), ) @@ -62,15 +53,12 @@ func (c *Client) switchRPC() bool { c.cache.invalidate() - c.logger.Info("connection to the new RPC node has been established", + c.logger.Info("connection to RPC node has been established", zap.String("endpoint", e)) - c.client = cli - c.setActor(act) - - return true + return conn } - return false + return nil } func (c *Client) closeWaiter() { @@ -78,11 +66,6 @@ func (c *Client) closeWaiter() { case <-c.cfg.ctx.Done(): case <-c.closeChan: } - _ = c.UnsubscribeAll() - c.close() -} - -// close closes notification channel and wrapped WS client. -func (c *Client) close() { - c.client.Close() + var conn = c.conn.Swap(nil) + conn.Close() } diff --git a/pkg/morph/client/nns.go b/pkg/morph/client/nns.go index 143b76f88a..5e0df2290c 100644 --- a/pkg/morph/client/nns.go +++ b/pkg/morph/client/nns.go @@ -44,10 +44,9 @@ func NNSAlphabetContractName(index int) string { // in NNS contract. // If script hash has not been found, returns ErrNNSRecordNotFound. func (c *Client) NNSContractAddress(name string) (sh util.Uint160, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint160{}, ErrConnectionLost } @@ -56,24 +55,23 @@ func (c *Client) NNSContractAddress(name string) (sh util.Uint160, err error) { return util.Uint160{}, err } - var nnsReader = nns.NewReader(invoker.New(c.client, nil), nnsHash) + var nnsReader = nns.NewReader(invoker.New(conn.client, nil), nnsHash) return nnsReader.ResolveFSContract(name) } // NNSHash returns NNS contract hash. func (c *Client) NNSHash() (util.Uint160, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint160{}, ErrConnectionLost } nnsHash := c.cache.nns() if nnsHash == nil { - h, err := nns.InferHash(c.client) + h, err := nns.InferHash(conn.client) if err != nil { return util.Uint160{}, fmt.Errorf("InferHash: %w", err) } @@ -88,14 +86,13 @@ func (c *Client) NNSHash() (util.Uint160, error) { // postpone Sidechain scope initialization when NNS contract is not yet ready // while Client is already needed. func (c *Client) InitSidechainScope() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - return autoSidechainScope(c.client, &c.cfg) + return autoSidechainScope(conn.client, &c.cfg) } func autoSidechainScope(ws *rpcclient.WSClient, conf *cfg) error { diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go index 6d4c8942c0..2601e2366b 100644 --- a/pkg/morph/client/notary.go +++ b/pkg/morph/client/notary.go @@ -78,10 +78,9 @@ func defaultNotaryConfig(c *Client) *notaryCfg { // ability for client to get alphabet keys from committee or provided source // and use proxy contract script hash to create tx for notary contract. func (c *Client) EnableNotarySupport(opts ...NotaryOption) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -121,14 +120,13 @@ func (c *Client) IsNotaryEnabled() bool { // ProbeNotary checks if native `Notary` contract is presented on chain. func (c *Client) ProbeNotary() (res bool) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false } - _, err := c.client.GetContractStateByAddressOrName(nativenames.Notary) + _, err := conn.client.GetContractStateByAddressOrName(nativenames.Notary) return err == nil } @@ -140,10 +138,9 @@ func (c *Client) ProbeNotary() (res bool) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -151,7 +148,7 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { panic(notaryNotEnabledPanicMsg) } - bc, err := c.rpcActor.GetBlockCount() + bc, err := conn.rpcActor.GetBlockCount() if err != nil { return fmt.Errorf("can't get blockchain height: %w", err) } @@ -166,7 +163,7 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { till = currentTill } - return c.depositNotary(amount, till) + return c.depositNotary(conn, amount, till) } // DepositEndlessNotary calls notary deposit method. Unlike `DepositNotary`, @@ -176,10 +173,9 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -188,12 +184,12 @@ func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) error { } // till value refers to a block height and it is uint32 value in neo-go - return c.depositNotary(amount, math.MaxUint32) + return c.depositNotary(conn, amount, math.MaxUint32) } -func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) error { +func (c *Client) depositNotary(conn *connection, amount fixedn.Fixed8, till int64) error { acc := c.acc.ScriptHash() - txHash, vub, err := c.gasToken.Transfer( + txHash, vub, err := conn.gasToken.Transfer( c.accAddr, c.notary.notary, big.NewInt(int64(amount)), @@ -213,7 +209,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) error { return nil } - _, err = c.rpcActor.WaitSuccess(txHash, vub, nil) + _, err = conn.rpcActor.WaitSuccess(txHash, vub, nil) if err != nil { return fmt.Errorf("waiting for %s TX (%d vub) to be persisted: %w", txHash.StringLE(), vub, err) } @@ -232,10 +228,9 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) error { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) GetNotaryDeposit() (res int64, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } @@ -284,13 +279,6 @@ func (u *UpdateNotaryListPrm) SetHash(hash util.Uint256) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNotaryList(prm UpdateNotaryListPrm) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { panic(notaryNotEnabledPanicMsg) } @@ -332,13 +320,6 @@ func (u *UpdateAlphabetListPrm) SetHash(hash util.Uint256) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNeoFSAlphabetList(prm UpdateAlphabetListPrm) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { panic(notaryNotEnabledPanicMsg) } @@ -363,13 +344,6 @@ func (c *Client) UpdateNeoFSAlphabetList(prm UpdateAlphabetListPrm) error { // // `nonce` and `vub` are used only if notary is enabled. func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce uint32, vub *uint32, method string, args ...any) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { return c.Invoke(contract, fee, method, args...) } @@ -383,13 +357,6 @@ func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce ui // // Considered to be used by non-IR nodes. func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { return c.Invoke(contract, fee, method, args...) } @@ -402,10 +369,9 @@ func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, // NOTE: does not fallback to simple `Invoke()`. Expected to be used only for // TXs retrieved from the received notary requests. func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -471,7 +437,7 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error { }) } - nAct, err := notary.NewActor(c.client, s, c.acc) + nAct, err := notary.NewActor(conn.client, s, c.acc) if err != nil { return err } @@ -510,6 +476,12 @@ func (c *Client) notaryInvokeAsCommittee(method string, nonce, vub uint32, args } func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint160, nonce uint32, vub *uint32, method string, args ...any) error { + var conn = c.conn.Load() + + if conn == nil { + return ErrConnectionLost + } + alphabetList, err := c.notary.alphabetSource() // prepare arguments for test invocation if err != nil { return err @@ -520,7 +492,7 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint if vub != nil { until = *vub } else { - until, err = c.notaryTxValidationLimit() + until, err = c.notaryTxValidationLimit(conn) if err != nil { return err } @@ -531,7 +503,7 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint return err } - nAct, err := notary.NewActor(c.client, cosigners, c.acc) + nAct, err := notary.NewActor(conn.client, cosigners, c.acc) if err != nil { return err } @@ -631,8 +603,8 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB return multisigAccount, nil } -func (c *Client) notaryTxValidationLimit() (uint32, error) { - bc, err := c.rpcActor.GetBlockCount() +func (c *Client) notaryTxValidationLimit(conn *connection) (uint32, error) { + bc, err := conn.rpcActor.GetBlockCount() if err != nil { return 0, fmt.Errorf("can't get current blockchain height: %w", err) } @@ -760,10 +732,9 @@ func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed // CalculateNonceAndVUB calculates nonce and ValidUntilBlock values // based on transaction hash. func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint32, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, 0, ErrConnectionLost } @@ -773,7 +744,7 @@ func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint nonce = binary.LittleEndian.Uint32(hash.BytesLE()) - height, err := c.getTransactionHeight(hash) + height, err := c.getTransactionHeight(conn, hash) if err != nil { return 0, 0, fmt.Errorf("could not get transaction height: %w", err) } @@ -781,11 +752,11 @@ func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint return nonce, height + c.notary.txValidTime, nil } -func (c *Client) getTransactionHeight(h util.Uint256) (uint32, error) { +func (c *Client) getTransactionHeight(conn *connection, h util.Uint256) (uint32, error) { if rh, ok := c.cache.txHeights.Get(h); ok { return rh, nil } - height, err := c.client.GetTransactionHeight(h) + height, err := conn.client.GetTransactionHeight(h) if err != nil { return 0, err } diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index eea4da3843..ffb885f29f 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -32,10 +32,9 @@ func (c *Client) Close() { // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) ReceiveExecutionNotifications(contracts []util.Uint160) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -50,11 +49,11 @@ func (c *Client) ReceiveExecutionNotifications(contracts []util.Uint160) error { } // subscribe to contract notifications - id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.subs.curNotifyChan) + id, err := conn.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, conn.notifyChan) if err != nil { // if there is some error, undo all subscriptions and return error for _, id := range notifyIDs { - _ = c.client.Unsubscribe(id) + _ = conn.client.Unsubscribe(id) } return fmt.Errorf("contract events subscription RPC: %w", err) @@ -80,14 +79,13 @@ func (c *Client) ReceiveExecutionNotifications(contracts []util.Uint160) error { // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) ReceiveBlocks() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - _, err := c.client.ReceiveBlocks(nil, c.subs.curBlockChan) + _, err := conn.client.ReceiveBlocks(nil, conn.blockChan) if err != nil { return fmt.Errorf("block subscriptions RPC: %w", err) } @@ -114,10 +112,9 @@ func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160) error { panic(notaryNotEnabledPanicMsg) } - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -135,7 +132,7 @@ func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160) error { nrAddedType := mempoolevent.TransactionAdded filter := &neorpc.NotaryRequestFilter{Signer: &txSigner, Type: &nrAddedType} - _, err := c.client.ReceiveNotaryRequests(filter, c.subs.curNotaryChan) + _, err := conn.client.ReceiveNotaryRequests(filter, conn.notaryChan) if err != nil { return fmt.Errorf("subscribe to notary requests RPC: %w", err) } @@ -151,10 +148,9 @@ func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160) error { // // See also [Client.ReceiveNotaryRequests]. func (c *Client) ReceiveAllNotaryRequests() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -165,7 +161,7 @@ func (c *Client) ReceiveAllNotaryRequests() error { return nil } - _, err := c.client.ReceiveNotaryRequests(nil, c.subs.curNotaryChan) + _, err := conn.client.ReceiveNotaryRequests(nil, conn.notaryChan) if err != nil { return fmt.Errorf("subscribe to notary requests RPC: %w", err) } @@ -182,14 +178,13 @@ func (c *Client) ReceiveAllNotaryRequests() error { // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) UnsubscribeAll() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - err := c.client.UnsubscribeAll() + err := conn.client.UnsubscribeAll() if err != nil { return err } @@ -201,15 +196,15 @@ func (c *Client) UnsubscribeAll() error { // notification from the connected RPC node. // Channels are closed when connections to the RPC nodes are lost. func (c *Client) Notifications() (<-chan *state.ContainedNotificationEvent, <-chan *block.Block, <-chan *result.NotaryRequestEvent) { - return c.notifyChan, c.blockChan, c.notaryChan + return c.subs.notifyChan, c.subs.blockChan, c.subs.notaryChan } type subscriptions struct { - // notification receivers (Client reads - // notifications from these channels) - curNotifyChan chan *state.ContainedNotificationEvent - curBlockChan chan *block.Block - curNotaryChan chan *result.NotaryRequestEvent + // notification consumers (Client sends + // notifications to these channels) + notifyChan chan *state.ContainedNotificationEvent + blockChan chan *block.Block + notaryChan chan *result.NotaryRequestEvent sync.RWMutex // for subscription fields only @@ -223,90 +218,51 @@ type subscriptions struct { func (c *Client) routeNotifications() { var ( - restoreCh = make(chan bool) - restoreInProgress bool + restoreCh = make(chan bool) + conn = c.conn.Load() ) routeloop: for { + if conn == nil { + c.logger.Info("RPC connection lost, attempting reconnect") + conn = c.switchRPC() + go c.restoreSubscriptions(conn, restoreCh) + } var connLost bool - c.switchLock.RLock() - notifCh := c.subs.curNotifyChan - blCh := c.subs.curBlockChan - notaryCh := c.subs.curNotaryChan - c.switchLock.RUnlock() select { case <-c.closeChan: break routeloop - case ev, ok := <-notifCh: - connLost = handleEv(c.notifyChan, ok, ev) - case ev, ok := <-blCh: - connLost = handleEv(c.blockChan, ok, ev) - case ev, ok := <-notaryCh: - connLost = handleEv(c.notaryChan, ok, ev) + case ev, ok := <-conn.notifyChan: + connLost = handleEv(c.subs.notifyChan, ok, ev) + case ev, ok := <-conn.blockChan: + connLost = handleEv(c.subs.blockChan, ok, ev) + case ev, ok := <-conn.notaryChan: + connLost = handleEv(c.subs.notaryChan, ok, ev) case ok := <-restoreCh: - restoreInProgress = false if !ok { connLost = true } } if connLost { - if !restoreInProgress { - c.logger.Info("RPC connection lost, attempting reconnect") - if !c.SwitchRPC() { - c.logger.Error("can't switch RPC node") - break routeloop - } - - c.subs.Lock() - c.subs.curNotifyChan = make(chan *state.ContainedNotificationEvent) - c.subs.curBlockChan = make(chan *block.Block) - c.subs.curNotaryChan = make(chan *result.NotaryRequestEvent) - go c.restoreSubscriptions(c.subs.curNotifyChan, c.subs.curBlockChan, c.subs.curNotaryChan, restoreCh) - c.subs.Unlock() - restoreInProgress = true - drainloop: - for { - select { - case _, ok := <-notifCh: - if !ok { - notifCh = nil - } - case _, ok := <-blCh: - if !ok { - blCh = nil - } - case _, ok := <-notaryCh: - if !ok { - notaryCh = nil - } - default: - break drainloop - } - } - } else { // Avoid getting additional !ok eventc.subs. - c.subs.Lock() - c.subs.curNotifyChan = nil - c.subs.curBlockChan = nil - c.subs.curNotaryChan = nil - c.subs.Unlock() - } + conn = nil } } - close(c.notifyChan) - close(c.blockChan) - close(c.notaryChan) + close(c.subs.notifyChan) + close(c.subs.blockChan) + close(c.subs.notaryChan) } // restoreSubscriptions restores subscriptions according to // cached information about them. -func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent, - blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent, resCh chan<- bool) { +func (c *Client) restoreSubscriptions(conn *connection, resCh chan<- bool) { var err error + c.subs.RLock() + defer c.subs.RUnlock() // new block events restoration if c.subs.subscribedToNewBlocks { - _, err = c.client.ReceiveBlocks(nil, blCh) + _, err = conn.client.ReceiveBlocks(nil, conn.blockChan) if err != nil { c.logger.Error("could not restore block subscription", zap.Error(err), @@ -318,7 +274,7 @@ func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificatio // notification events restoration for contract := range c.subs.subscribedEvents { - _, err = c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notifCh) + _, err = conn.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, conn.notifyChan) if err != nil { c.logger.Error("could not restore notification subscription after RPC switch", zap.Error(err), @@ -330,7 +286,7 @@ func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificatio // notary notification events restoration if c.subs.subscribedToAllNotaryEvents { - _, err = c.client.ReceiveNotaryRequests(nil, notaryCh) + _, err = conn.client.ReceiveNotaryRequests(nil, conn.notaryChan) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", zap.Error(err)) @@ -343,7 +299,7 @@ func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificatio nrAddedType := mempoolevent.TransactionAdded filter := &neorpc.NotaryRequestFilter{Signer: &signer, Type: &nrAddedType} - _, err = c.client.ReceiveNotaryRequests(filter, notaryCh) + _, err = conn.client.ReceiveNotaryRequests(filter, conn.notaryChan) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", zap.Error(err),