diff --git a/CHANGELOG.md b/CHANGELOG.md index 164598ca07..4da77cd2d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - Structure table in the SN-configuration document (#2974) - False negative connection to NeoFS chain in multi-endpoint setup with at least one live node (#2986) - Overriding the default container and object attributes only with the appropriate flags (#2985) +- RPC client reconnection failures leading to complete SN failure (#2797) ### Changed - `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 68fe7ef41a..76d762ad92 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -668,9 +668,6 @@ func initBasics(c *cfg, key *keys.PrivateKey, stateStorage *state.PersistentStor c.internalErr <- fmt.Errorf("restarting after morph connection was lost: %w", err) } }), - client.WithConnLostCallback(func() { - c.internalErr <- errors.New("morph connection has been lost") - }), client.WithMinRequiredBlockHeight(fromSideChainBlock), ) if err != nil { diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 831c463007..3b4565adcf 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -1077,9 +1077,6 @@ func (s *Server) createClient(ctx context.Context, p chainParams, errChan chan<- errChan <- fmt.Errorf("internal services' restart after RPC reconnection to the %s: %w", p.name, err) } }), - client.WithConnLostCallback(func() { - errChan <- fmt.Errorf("%s chain connection has been lost", p.name) - }), client.WithMinRequiredBlockHeight(p.from), } if p.withAutoSidechainScope { diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 797ec7a5ad..793fafc641 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "github.com/google/uuid" lru "github.com/hashicorp/golang-lru/v2" @@ -16,12 +17,8 @@ import ( "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network/payload" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" - "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" - "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" "github.com/nspcc-dev/neo-go/pkg/rpcclient/neo" - "github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17" "github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt" "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap" "github.com/nspcc-dev/neo-go/pkg/smartcontract" @@ -37,11 +34,9 @@ import ( // that provides smart-contract invocation interface // and notification subscription functionality. // -// On connection lost tries establishing new connection -// to the next RPC (if any). If no RPC node available, -// switches to inactive mode: any RPC call leads to immediate -// return with ErrConnectionLost error, every notification -// consumer passed to any Receive* method is closed. +// On connection loss tries establishing new connection +// to the next RPC (if any). While doing that any RPC call +// leads to immediate return with ErrConnectionLost error. // // Working client must be created via constructor New. // Using the Client that has been created with new(Client) @@ -52,10 +47,7 @@ type Client struct { logger *zap.Logger // logging component - client *rpcclient.WSClient // neo-go websocket client - rpcActor *actor.Actor // neo-go RPC actor - gasToken *nep17.Token // neo-go GAS token wrapper - rolemgmt *rolemgmt.Contract // neo-go Designation contract wrapper + conn atomic.Pointer[connection] acc *wallet.Account // neo account accAddr util.Uint160 // account's address @@ -66,64 +58,46 @@ type Client struct { endpoints []string - // switchLock protects endpoints, inactive, and subscription-related fields. - // It is taken exclusively during endpoint switch and locked in shared mode - // on every normal call. - switchLock *sync.RWMutex - - // notification consumers (Client sends - // notifications to these channels) - notifyChan chan *state.ContainedNotificationEvent - blockChan chan *block.Block - notaryChan chan *result.NotaryRequestEvent - subs subscriptions + subs subscriptions // channel for internal stop closeChan chan struct{} - - // indicates that Client is not able to - // establish connection to any of the - // provided RPC endpoints - inactive bool } // Call calls specified method of the Neo smart contract with provided arguments. func (c *Client) Call(contract util.Uint160, method string, args ...any) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.rpcActor.Call(contract, method, args...) + return conn.rpcActor.Call(contract, method, args...) } // CallAndExpandIterator calls specified iterating method of the Neo smart // contract with provided arguments, and fetches iterator from the response // carrying up to limited number of items. func (c *Client) CallAndExpandIterator(contract util.Uint160, method string, maxItems int, args ...any) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.rpcActor.CallAndExpandIterator(contract, method, maxItems, args...) + return conn.rpcActor.CallAndExpandIterator(contract, method, maxItems, args...) } // TerminateSession closes opened session by its ID on the currently active Neo // RPC node the Client connected to. Returns true even if session was not found. func (c *Client) TerminateSession(sessionID uuid.UUID) (bool, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false, ErrConnectionLost } - _, err := c.client.TerminateSession(sessionID) + _, err := conn.client.TerminateSession(sessionID) return true, err } @@ -132,68 +106,63 @@ func (c *Client) TerminateSession(sessionID uuid.UUID) (bool, error) { // Neo RPC node the Client connected to. Returns empty result if either there is // no more elements or session is closed. func (c *Client) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.TraverseIterator(sessionID, iteratorID, maxItemsCount) + return conn.client.TraverseIterator(sessionID, iteratorID, maxItemsCount) } // InvokeContractVerify calls 'verify' method of the referenced Neo smart // contract deployed in the blockchain the Client connected to and returns the // call result. func (c *Client) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.InvokeContractVerify(contract, params, signers, witnesses...) + return conn.client.InvokeContractVerify(contract, params, signers, witnesses...) } // InvokeFunction calls specified method of the referenced Neo smart contract // deployed in the blockchain the Client connected to and returns the call // result. func (c *Client) InvokeFunction(contract util.Uint160, operation string, params []smartcontract.Parameter, signers []transaction.Signer) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.InvokeFunction(contract, operation, params, signers) + return conn.client.InvokeFunction(contract, operation, params, signers) } // InvokeScript tests given script on the Neo blockchain the Client connected to // and returns the script result. func (c *Client) InvokeScript(script []byte, signers []transaction.Signer) (*result.Invoke, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.InvokeScript(script, signers) + return conn.client.InvokeScript(script, signers) } // CalculateNetworkFee calculates consensus nodes' fee for given transaction in // the blockchain the Client connected to. func (c *Client) CalculateNetworkFee(tx *transaction.Transaction) (int64, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return c.client.CalculateNetworkFee(tx) + return conn.client.CalculateNetworkFee(tx) } // GetBlockCount returns current height of the Neo blockchain the Client @@ -205,41 +174,38 @@ func (c *Client) GetBlockCount() (uint32, error) { // GetVersion returns local settings of the currently active Neo RPC node the // Client connected to. func (c *Client) GetVersion() (*result.Version, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetVersion() + return conn.client.GetVersion() } // SendRawTransaction sends specified transaction to the Neo blockchain the // Client connected to and returns the transaction hash. func (c *Client) SendRawTransaction(tx *transaction.Transaction) (util.Uint256, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint256{}, ErrConnectionLost } - return c.client.SendRawTransaction(tx) + return conn.client.SendRawTransaction(tx) } // SubmitP2PNotaryRequest submits given Notary service request to the Neo // blockchain the Client connected to and returns the fallback transaction's // hash. func (c *Client) SubmitP2PNotaryRequest(req *payload.P2PNotaryRequest) (util.Uint256, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint256{}, ErrConnectionLost } - return c.client.SubmitP2PNotaryRequest(req) + return conn.client.SubmitP2PNotaryRequest(req) } // GetCommittee returns current public keys of the committee of the Neo @@ -252,28 +218,26 @@ func (c *Client) GetCommittee() (keys.PublicKeys, error) { // contract deployed in the blockchain the Client connected to. Returns // [neorpc.ErrUnknownContract] if requested contract is missing. func (c *Client) GetContractStateByID(id int32) (*state.Contract, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetContractStateByID(id) + return conn.client.GetContractStateByID(id) } // GetContractStateByHash returns current state of the addressed Neo smart // contract deployed in the blockchain the Client connected to. Returns // [neorpc.ErrUnknownContract] if requested contract is missing. func (c *Client) GetContractStateByHash(addr util.Uint160) (*state.Contract, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetContractStateByHash(addr) + return conn.client.GetContractStateByHash(addr) } type cache struct { @@ -333,14 +297,13 @@ func (e *notHaltStateError) Error() string { // Invoke invokes contract method by sending transaction into blockchain. // Supported args types: int64, string, util.Uint160, []byte and bool. func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...) + txHash, vub, err := conn.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...) if err != nil { return fmt.Errorf("could not invoke %s: %w", method, err) } @@ -373,10 +336,9 @@ func (c *Client) TestInvoke(contract util.Uint160, method string, args ...any) ( // If prefetchElements > 0, that many elements are tried to be placed on stack without // additional network communication (without the iterator expansion). func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefetchElements int, args ...any) (res []stackitem.Item, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } @@ -390,12 +352,12 @@ func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefet return nil, fmt.Errorf("building prefetching script: %w", err) } - items, sid, iter, err = unwrap.ArrayAndSessionIterator(c.rpcActor.Run(script)) + items, sid, iter, err = unwrap.ArrayAndSessionIterator(conn.rpcActor.Run(script)) if err != nil { return nil, fmt.Errorf("iterator expansion: %w", err) } } else { - sid, iter, err = unwrap.SessionIterator(c.rpcActor.Call(contract, method, args...)) + sid, iter, err = unwrap.SessionIterator(conn.rpcActor.Call(contract, method, args...)) if err != nil { return nil, fmt.Errorf("iterator expansion: %w", err) } @@ -403,12 +365,12 @@ func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefet defer func() { if (sid != uuid.UUID{}) { - _ = c.rpcActor.TerminateSession(sid) + _ = conn.rpcActor.TerminateSession(sid) } }() for { - ii, err := c.rpcActor.TraverseIterator(sid, &iter, config.DefaultMaxIteratorResultItems) + ii, err := conn.rpcActor.TraverseIterator(sid, &iter, config.DefaultMaxIteratorResultItems) if err != nil { return nil, fmt.Errorf("iterator traversal; session: %s, error: %w", sid, err) } @@ -425,14 +387,13 @@ func (c *Client) TestInvokeIterator(contract util.Uint160, method string, prefet // TransferGas to the receiver from local wallet. func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - txHash, vub, err := c.gasToken.Transfer(c.accAddr, receiver, big.NewInt(int64(amount)), nil) + txHash, vub, err := conn.gasToken.Transfer(c.accAddr, receiver, big.NewInt(int64(amount)), nil) if err != nil { return err } @@ -447,14 +408,13 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error // GasBalance returns GAS amount in the client's wallet. func (c *Client) GasBalance() (res int64, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - bal, err := c.gasToken.BalanceOf(c.accAddr) + bal, err := conn.gasToken.BalanceOf(c.accAddr) if err != nil { return 0, err } @@ -464,27 +424,25 @@ func (c *Client) GasBalance() (res int64, err error) { // Committee returns keys of chain committee from neo native contract. func (c *Client) Committee() (res keys.PublicKeys, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - return c.client.GetCommittee() + return conn.client.GetCommittee() } // TxHalt returns true if transaction has been successfully executed and persisted. func (c *Client) TxHalt(h util.Uint256) (res bool, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false, ErrConnectionLost } trig := trigger.Application - aer, err := c.client.GetApplicationLog(h, &trig) + aer, err := conn.client.GetApplicationLog(h, &trig) if err != nil { return false, err } @@ -493,28 +451,26 @@ func (c *Client) TxHalt(h util.Uint256) (res bool, err error) { // TxHeight returns true if transaction has been successfully executed and persisted. func (c *Client) TxHeight(h util.Uint256) (res uint32, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return c.client.GetTransactionHeight(h) + return conn.client.GetTransactionHeight(h) } // NeoFSAlphabetList returns keys that stored in NeoFS Alphabet role. Main chain // stores alphabet node keys of inner ring there, however the sidechain stores both // alphabet and non alphabet node keys of inner ring. func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - list, err := c.roleList(noderoles.NeoFSAlphabet) + list, err := conn.roleList(noderoles.NeoFSAlphabet) if err != nil { return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err) } @@ -527,65 +483,52 @@ func (c *Client) GetDesignateHash() util.Uint160 { return rolemgmt.Hash } -func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) { - height, err := c.rpcActor.GetBlockCount() - if err != nil { - return nil, fmt.Errorf("can't get chain height: %w", err) - } - - return c.rolemgmt.GetDesignatedByRole(r, height) -} - // MagicNumber returns the magic number of the network // to which the underlying RPC node client is connected. func (c *Client) MagicNumber() (uint32, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return uint32(c.rpcActor.GetNetwork()), nil + return uint32(conn.rpcActor.GetNetwork()), nil } // BlockCount returns block count of the network // to which the underlying RPC node client is connected. func (c *Client) BlockCount() (res uint32, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - return c.rpcActor.GetBlockCount() + return conn.rpcActor.GetBlockCount() } // MsPerBlock returns MillisecondsPerBlock network parameter. func (c *Client) MsPerBlock() (res int64, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } - v := c.rpcActor.GetVersion() + v := conn.rpcActor.GetVersion() return int64(v.Protocol.MillisecondsPerBlock), nil } // IsValidScript returns true if invocation script executes with HALT state. func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (bool, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false, ErrConnectionLost } - res, err := c.client.InvokeScript(script, signers) + res, err := conn.client.InvokeScript(script, signers) if err != nil { return false, fmt.Errorf("invokeScript: %w", err) } @@ -597,14 +540,13 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (boo // tokens for. Nil key with no error is returned if the account has no NEO // or if the account hasn't voted for anyone. func (c *Client) AccountVote(addr util.Uint160) (*keys.PublicKey, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return nil, ErrConnectionLost } - neoReader := neo.NewReader(invoker.New(c.client, nil)) + neoReader := neo.NewReader(invoker.New(conn.client, nil)) accountState, err := neoReader.GetAccountState(addr) if err != nil { @@ -617,9 +559,3 @@ func (c *Client) AccountVote(addr util.Uint160) (*keys.PublicKey, error) { return accountState.VoteTo, nil } - -func (c *Client) setActor(act *actor.Actor) { - c.rpcActor = act - c.gasToken = gas.New(act) - c.rolemgmt = rolemgmt.New(act) -} diff --git a/pkg/morph/client/connection.go b/pkg/morph/client/connection.go new file mode 100644 index 0000000000..ab59a40931 --- /dev/null +++ b/pkg/morph/client/connection.go @@ -0,0 +1,86 @@ +package client + +import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt" +) + +// connection is a WSClient with associated actors and wrappers, it's valid as +// long as connection doesn't change. +type connection struct { + client *rpcclient.WSClient // neo-go websocket client + rpcActor *actor.Actor // neo-go RPC actor + gasToken *nep17.Token // neo-go GAS token wrapper + rolemgmt *rolemgmt.Contract // neo-go Designation contract wrapper + + // notification receivers (Client reads notifications from these channels) + notifyChan chan *state.ContainedNotificationEvent + blockChan chan *block.Block + notaryChan chan *result.NotaryRequestEvent +} + +func (c *connection) roleList(r noderoles.Role) (keys.PublicKeys, error) { + height, err := c.rpcActor.GetBlockCount() + if err != nil { + return nil, fmt.Errorf("can't get chain height: %w", err) + } + + return c.rolemgmt.GetDesignatedByRole(r, height) +} + +// reachedHeight checks if [Client] has least expected block height and +// returns error if it is not reached that height. +// This function is required to avoid connections to unsynced RPC nodes, because +// they can produce events from the past that should not be processed by +// NeoFS nodes. +func (c *connection) reachedHeight(startFrom uint32) error { + if startFrom == 0 { + return nil + } + + height, err := c.client.GetBlockCount() + if err != nil { + return fmt.Errorf("could not get block height: %w", err) + } + + if height < startFrom+1 { + return fmt.Errorf("%w: expected %d height, got %d count", ErrStaleNodes, startFrom, height) + } + + return nil +} + +func (c *connection) Close() { + _ = c.client.UnsubscribeAll() + c.client.Close() + + var notifyCh, blockCh, notaryCh = c.notifyChan, c.blockChan, c.notaryChan +drainloop: + for { + select { + case _, ok := <-notifyCh: + if !ok { + notifyCh = nil + } + case _, ok := <-blockCh: + if !ok { + blockCh = nil + } + case _, ok := <-notaryCh: + if !ok { + notaryCh = nil + } + default: + break drainloop + } + } +} diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 708e68558e..9586cbad34 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -15,6 +15,8 @@ import ( "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "go.uber.org/zap" @@ -42,13 +44,11 @@ type cfg struct { singleCli *rpcclient.WSClient // neo-go client for single client mode - inactiveModeCb Callback - rpcSwitchCb Callback - minRequiredHeight uint32 reconnectionRetries int reconnectionDelay time.Duration + rpcSwitchCb Callback } const ( @@ -106,84 +106,67 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { opt(cfg) } - cli := &Client{ - cache: newClientCache(), - logger: cfg.logger, - acc: acc, - accAddr: accAddr, - cfg: *cfg, - switchLock: &sync.RWMutex{}, - closeChan: make(chan struct{}), - notifyChan: make(chan *state.ContainedNotificationEvent), - blockChan: make(chan *block.Block), - notaryChan: make(chan *result.NotaryRequestEvent), - subs: subscriptions{ - subscribedEvents: make(map[util.Uint160]struct{}), - subscribedNotaryEvents: make(map[util.Uint160]struct{}), - subscribedToNewBlocks: false, - - curNotifyChan: make(chan *state.ContainedNotificationEvent), - curBlockChan: make(chan *block.Block), - curNotaryChan: make(chan *result.NotaryRequestEvent), - }, - } - - var err error - var act *actor.Actor + var ( + cli = &Client{ + cache: newClientCache(), + logger: cfg.logger, + acc: acc, + accAddr: accAddr, + cfg: *cfg, + closeChan: make(chan struct{}), + subs: subscriptions{ + notifyChan: make(chan *state.ContainedNotificationEvent), + blockChan: make(chan *block.Block), + notaryChan: make(chan *result.NotaryRequestEvent), + subscribedEvents: make(map[util.Uint160]struct{}), + subscribedNotaryEvents: make(map[util.Uint160]struct{}), + subscribedToNewBlocks: false, + }, + } + conn *connection + err error + ) if cfg.singleCli != nil { // return client in single RPC node mode that uses // predefined WS client // // in case of the closing web socket connection: // if extra endpoints were provided via options, - // they will be used in switch process, otherwise - // inactive mode will be enabled - cli.client = cfg.singleCli - - if cfg.autoSidechainScope { - err = autoSidechainScope(cfg.singleCli, cfg) - if err != nil { - return nil, fmt.Errorf("scope setup: %w", err) - } - } - act, err = newActor(cfg.singleCli, acc, *cfg) - if err != nil { - return nil, fmt.Errorf("could not create RPC actor: %w", err) - } + // they will be used in switch process + conn, err = cli.newConnectionWS(cfg.singleCli) } else { if len(cfg.endpoints) == 0 { return nil, errors.New("no endpoints were provided") } cli.endpoints = cfg.endpoints - for _, e := range cli.endpoints { - cli.client, act, err = cli.newCli(e) - if err != nil { - cli.logger.Warn("Neo RPC connection failure", zap.String("endpoint", e), zap.Error(err)) - continue - } - break - } - - if err != nil { - return nil, fmt.Errorf("could not create RPC client: %w", err) + conn = cli.connEndpoints() + if conn == nil { + err = errors.New("could not establish Neo RPC connection") } } - cli.setActor(act) + if err != nil { + return nil, err + } for range cfg.reconnectionRetries { - err = cli.reachedHeight(cfg.minRequiredHeight) - if !errors.Is(err, ErrStaleNodes) { - break + if conn != nil { + err = conn.reachedHeight(cfg.minRequiredHeight) + if !errors.Is(err, ErrStaleNodes) { + break + } + cli.logger.Info("outdated Neo RPC node", zap.String("endpoint", conn.client.Endpoint()), zap.Error(err)) + conn.Close() } - - if !cli.switchRPC() { - return nil, fmt.Errorf("%w: could not switch to the next node", err) + conn = cli.connEndpoints() + if conn == nil { + err = errors.New("can't establish connection to any Neo RPC node") } } if err != nil { return nil, err } + cli.conn.Store(conn) go cli.routeNotifications() go cli.closeWaiter() @@ -191,16 +174,22 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { return cli, nil } -func (c *Client) newCli(endpoint string) (*rpcclient.WSClient, *actor.Actor, error) { +func (c *Client) newConnection(endpoint string) (*connection, error) { cli, err := rpcclient.NewWS(c.cfg.ctx, endpoint, rpcclient.WSOptions{ Options: rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, }, }) if err != nil { - return nil, nil, fmt.Errorf("WS client creation: %w", err) + return nil, fmt.Errorf("WS client creation: %w", err) } + return c.newConnectionWS(cli) +} + +func (c *Client) newConnectionWS(cli *rpcclient.WSClient) (*connection, error) { + var err error + defer func() { if err != nil { cli.Close() @@ -209,21 +198,30 @@ func (c *Client) newCli(endpoint string) (*rpcclient.WSClient, *actor.Actor, err err = cli.Init() if err != nil { - return nil, nil, fmt.Errorf("WS client initialization: %w", err) + return nil, fmt.Errorf("WS client initialization: %w", err) } if c.cfg.autoSidechainScope { err = autoSidechainScope(cli, &c.cfg) if err != nil { - return nil, nil, fmt.Errorf("scope setup: %w", err) + return nil, fmt.Errorf("scope setup: %w", err) } } act, err := newActor(cli, c.acc, c.cfg) if err != nil { - return nil, nil, fmt.Errorf("RPC actor creation: %w", err) + return nil, fmt.Errorf("RPC actor creation: %w", err) } - return cli, act, nil + var conn = &connection{ + client: cli, + rpcActor: act, + gasToken: gas.New(act), + rolemgmt: rolemgmt.New(act), + notifyChan: make(chan *state.ContainedNotificationEvent), + blockChan: make(chan *block.Block), + notaryChan: make(chan *result.NotaryRequestEvent), + } + return conn, nil } func newActor(ws *rpcclient.WSClient, acc *wallet.Account, cfg cfg) (*actor.Actor, error) { @@ -330,16 +328,6 @@ func WithReconnectionsDelay(d time.Duration) Option { } } -// WithConnLostCallback return a client constructor option -// that specifies a callback that is called when Client -// unsuccessfully tried to connect to all the specified -// endpoints. -func WithConnLostCallback(cb Callback) Option { - return func(c *cfg) { - c.inactiveModeCb = cb - } -} - // WithConnSwitchCallback returns a client constructor option // that specifies a callback that is called when the Client // reconnected to a new RPC (from [WithEndpoints] list) @@ -369,25 +357,3 @@ func WithAutoSidechainScope() Option { c.autoSidechainScope = true } } - -// reachedHeight checks if [Client] has least expected block height and -// returns error if it is not reached that height. -// This function is required to avoid connections to unsynced RPC nodes, because -// they can produce events from the past that should not be processed by -// NeoFS nodes. -func (c *Client) reachedHeight(startFrom uint32) error { - if startFrom == 0 { - return nil - } - - height, err := c.BlockCount() - if err != nil { - return fmt.Errorf("could not get block height: %w", err) - } - - if height < startFrom+1 { - return fmt.Errorf("%w: expected %d height, got %d count", ErrStaleNodes, startFrom, height) - } - - return nil -} diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index f8e63a82c8..2e100c5f65 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -12,47 +12,38 @@ type Endpoint struct { Priority int } -// SwitchRPC performs reconnection and returns true if it was successful. -func (c *Client) SwitchRPC() bool { - c.switchLock.Lock() - - for range c.cfg.reconnectionRetries { - if c.switchRPC() { - c.switchLock.Unlock() +// SwitchRPC performs reconnection and returns new if it was successful. +func (c *Client) switchRPC() *connection { + var conn = c.conn.Swap(nil) + if conn != nil { + conn.Close() // Ensure it's completed and drained. + } + for { + conn = c.connEndpoints() + if conn != nil { + c.conn.Store(conn) if c.cfg.rpcSwitchCb != nil { c.cfg.rpcSwitchCb() } - return true + return conn } select { case <-time.After(c.cfg.reconnectionDelay): case <-c.closeChan: - c.switchLock.Unlock() - return false + return nil } } - - c.inactive = true - c.switchLock.Unlock() - - if c.cfg.inactiveModeCb != nil { - c.cfg.inactiveModeCb() - } - - return false } -func (c *Client) switchRPC() bool { - c.client.Close() - - // Iterate endpoints in the order of decreasing priority. +func (c *Client) connEndpoints() *connection { + // Iterate endpoints. for _, e := range c.endpoints { - cli, act, err := c.newCli(e) + conn, err := c.newConnection(e) if err != nil { - c.logger.Warn("could not establish connection to the switched RPC node", + c.logger.Warn("could not establish connection to RPC node", zap.String("endpoint", e), zap.Error(err), ) @@ -62,15 +53,12 @@ func (c *Client) switchRPC() bool { c.cache.invalidate() - c.logger.Info("connection to the new RPC node has been established", + c.logger.Info("connection to RPC node has been established", zap.String("endpoint", e)) - c.client = cli - c.setActor(act) - - return true + return conn } - return false + return nil } func (c *Client) closeWaiter() { @@ -78,11 +66,6 @@ func (c *Client) closeWaiter() { case <-c.cfg.ctx.Done(): case <-c.closeChan: } - _ = c.UnsubscribeAll() - c.close() -} - -// close closes notification channel and wrapped WS client. -func (c *Client) close() { - c.client.Close() + var conn = c.conn.Swap(nil) + conn.Close() } diff --git a/pkg/morph/client/nns.go b/pkg/morph/client/nns.go index 143b76f88a..5e0df2290c 100644 --- a/pkg/morph/client/nns.go +++ b/pkg/morph/client/nns.go @@ -44,10 +44,9 @@ func NNSAlphabetContractName(index int) string { // in NNS contract. // If script hash has not been found, returns ErrNNSRecordNotFound. func (c *Client) NNSContractAddress(name string) (sh util.Uint160, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint160{}, ErrConnectionLost } @@ -56,24 +55,23 @@ func (c *Client) NNSContractAddress(name string) (sh util.Uint160, err error) { return util.Uint160{}, err } - var nnsReader = nns.NewReader(invoker.New(c.client, nil), nnsHash) + var nnsReader = nns.NewReader(invoker.New(conn.client, nil), nnsHash) return nnsReader.ResolveFSContract(name) } // NNSHash returns NNS contract hash. func (c *Client) NNSHash() (util.Uint160, error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return util.Uint160{}, ErrConnectionLost } nnsHash := c.cache.nns() if nnsHash == nil { - h, err := nns.InferHash(c.client) + h, err := nns.InferHash(conn.client) if err != nil { return util.Uint160{}, fmt.Errorf("InferHash: %w", err) } @@ -88,14 +86,13 @@ func (c *Client) NNSHash() (util.Uint160, error) { // postpone Sidechain scope initialization when NNS contract is not yet ready // while Client is already needed. func (c *Client) InitSidechainScope() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - return autoSidechainScope(c.client, &c.cfg) + return autoSidechainScope(conn.client, &c.cfg) } func autoSidechainScope(ws *rpcclient.WSClient, conf *cfg) error { diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go index 6d4c8942c0..2601e2366b 100644 --- a/pkg/morph/client/notary.go +++ b/pkg/morph/client/notary.go @@ -78,10 +78,9 @@ func defaultNotaryConfig(c *Client) *notaryCfg { // ability for client to get alphabet keys from committee or provided source // and use proxy contract script hash to create tx for notary contract. func (c *Client) EnableNotarySupport(opts ...NotaryOption) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -121,14 +120,13 @@ func (c *Client) IsNotaryEnabled() bool { // ProbeNotary checks if native `Notary` contract is presented on chain. func (c *Client) ProbeNotary() (res bool) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return false } - _, err := c.client.GetContractStateByAddressOrName(nativenames.Notary) + _, err := conn.client.GetContractStateByAddressOrName(nativenames.Notary) return err == nil } @@ -140,10 +138,9 @@ func (c *Client) ProbeNotary() (res bool) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -151,7 +148,7 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { panic(notaryNotEnabledPanicMsg) } - bc, err := c.rpcActor.GetBlockCount() + bc, err := conn.rpcActor.GetBlockCount() if err != nil { return fmt.Errorf("can't get blockchain height: %w", err) } @@ -166,7 +163,7 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { till = currentTill } - return c.depositNotary(amount, till) + return c.depositNotary(conn, amount, till) } // DepositEndlessNotary calls notary deposit method. Unlike `DepositNotary`, @@ -176,10 +173,9 @@ func (c *Client) DepositNotary(amount fixedn.Fixed8, delta uint32) error { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -188,12 +184,12 @@ func (c *Client) DepositEndlessNotary(amount fixedn.Fixed8) error { } // till value refers to a block height and it is uint32 value in neo-go - return c.depositNotary(amount, math.MaxUint32) + return c.depositNotary(conn, amount, math.MaxUint32) } -func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) error { +func (c *Client) depositNotary(conn *connection, amount fixedn.Fixed8, till int64) error { acc := c.acc.ScriptHash() - txHash, vub, err := c.gasToken.Transfer( + txHash, vub, err := conn.gasToken.Transfer( c.accAddr, c.notary.notary, big.NewInt(int64(amount)), @@ -213,7 +209,7 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) error { return nil } - _, err = c.rpcActor.WaitSuccess(txHash, vub, nil) + _, err = conn.rpcActor.WaitSuccess(txHash, vub, nil) if err != nil { return fmt.Errorf("waiting for %s TX (%d vub) to be persisted: %w", txHash.StringLE(), vub, err) } @@ -232,10 +228,9 @@ func (c *Client) depositNotary(amount fixedn.Fixed8, till int64) error { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) GetNotaryDeposit() (res int64, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, ErrConnectionLost } @@ -284,13 +279,6 @@ func (u *UpdateNotaryListPrm) SetHash(hash util.Uint256) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNotaryList(prm UpdateNotaryListPrm) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { panic(notaryNotEnabledPanicMsg) } @@ -332,13 +320,6 @@ func (u *UpdateAlphabetListPrm) SetHash(hash util.Uint256) { // // This function must be invoked with notary enabled otherwise it throws panic. func (c *Client) UpdateNeoFSAlphabetList(prm UpdateAlphabetListPrm) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { panic(notaryNotEnabledPanicMsg) } @@ -363,13 +344,6 @@ func (c *Client) UpdateNeoFSAlphabetList(prm UpdateAlphabetListPrm) error { // // `nonce` and `vub` are used only if notary is enabled. func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce uint32, vub *uint32, method string, args ...any) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { return c.Invoke(contract, fee, method, args...) } @@ -383,13 +357,6 @@ func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce ui // // Considered to be used by non-IR nodes. func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - - if c.inactive { - return ErrConnectionLost - } - if c.notary == nil { return c.Invoke(contract, fee, method, args...) } @@ -402,10 +369,9 @@ func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, // NOTE: does not fallback to simple `Invoke()`. Expected to be used only for // TXs retrieved from the received notary requests. func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -471,7 +437,7 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error { }) } - nAct, err := notary.NewActor(c.client, s, c.acc) + nAct, err := notary.NewActor(conn.client, s, c.acc) if err != nil { return err } @@ -510,6 +476,12 @@ func (c *Client) notaryInvokeAsCommittee(method string, nonce, vub uint32, args } func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint160, nonce uint32, vub *uint32, method string, args ...any) error { + var conn = c.conn.Load() + + if conn == nil { + return ErrConnectionLost + } + alphabetList, err := c.notary.alphabetSource() // prepare arguments for test invocation if err != nil { return err @@ -520,7 +492,7 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint if vub != nil { until = *vub } else { - until, err = c.notaryTxValidationLimit() + until, err = c.notaryTxValidationLimit(conn) if err != nil { return err } @@ -531,7 +503,7 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint return err } - nAct, err := notary.NewActor(c.client, cosigners, c.acc) + nAct, err := notary.NewActor(conn.client, cosigners, c.acc) if err != nil { return err } @@ -631,8 +603,8 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB return multisigAccount, nil } -func (c *Client) notaryTxValidationLimit() (uint32, error) { - bc, err := c.rpcActor.GetBlockCount() +func (c *Client) notaryTxValidationLimit(conn *connection) (uint32, error) { + bc, err := conn.rpcActor.GetBlockCount() if err != nil { return 0, fmt.Errorf("can't get current blockchain height: %w", err) } @@ -760,10 +732,9 @@ func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed // CalculateNonceAndVUB calculates nonce and ValidUntilBlock values // based on transaction hash. func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint32, err error) { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return 0, 0, ErrConnectionLost } @@ -773,7 +744,7 @@ func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint nonce = binary.LittleEndian.Uint32(hash.BytesLE()) - height, err := c.getTransactionHeight(hash) + height, err := c.getTransactionHeight(conn, hash) if err != nil { return 0, 0, fmt.Errorf("could not get transaction height: %w", err) } @@ -781,11 +752,11 @@ func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint return nonce, height + c.notary.txValidTime, nil } -func (c *Client) getTransactionHeight(h util.Uint256) (uint32, error) { +func (c *Client) getTransactionHeight(conn *connection, h util.Uint256) (uint32, error) { if rh, ok := c.cache.txHeights.Get(h); ok { return rh, nil } - height, err := c.client.GetTransactionHeight(h) + height, err := conn.client.GetTransactionHeight(h) if err != nil { return 0, err } diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index eea4da3843..ffb885f29f 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -32,10 +32,9 @@ func (c *Client) Close() { // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) ReceiveExecutionNotifications(contracts []util.Uint160) error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -50,11 +49,11 @@ func (c *Client) ReceiveExecutionNotifications(contracts []util.Uint160) error { } // subscribe to contract notifications - id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.subs.curNotifyChan) + id, err := conn.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, conn.notifyChan) if err != nil { // if there is some error, undo all subscriptions and return error for _, id := range notifyIDs { - _ = c.client.Unsubscribe(id) + _ = conn.client.Unsubscribe(id) } return fmt.Errorf("contract events subscription RPC: %w", err) @@ -80,14 +79,13 @@ func (c *Client) ReceiveExecutionNotifications(contracts []util.Uint160) error { // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) ReceiveBlocks() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - _, err := c.client.ReceiveBlocks(nil, c.subs.curBlockChan) + _, err := conn.client.ReceiveBlocks(nil, conn.blockChan) if err != nil { return fmt.Errorf("block subscriptions RPC: %w", err) } @@ -114,10 +112,9 @@ func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160) error { panic(notaryNotEnabledPanicMsg) } - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -135,7 +132,7 @@ func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160) error { nrAddedType := mempoolevent.TransactionAdded filter := &neorpc.NotaryRequestFilter{Signer: &txSigner, Type: &nrAddedType} - _, err := c.client.ReceiveNotaryRequests(filter, c.subs.curNotaryChan) + _, err := conn.client.ReceiveNotaryRequests(filter, conn.notaryChan) if err != nil { return fmt.Errorf("subscribe to notary requests RPC: %w", err) } @@ -151,10 +148,9 @@ func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160) error { // // See also [Client.ReceiveNotaryRequests]. func (c *Client) ReceiveAllNotaryRequests() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } @@ -165,7 +161,7 @@ func (c *Client) ReceiveAllNotaryRequests() error { return nil } - _, err := c.client.ReceiveNotaryRequests(nil, c.subs.curNotaryChan) + _, err := conn.client.ReceiveNotaryRequests(nil, conn.notaryChan) if err != nil { return fmt.Errorf("subscribe to notary requests RPC: %w", err) } @@ -182,14 +178,13 @@ func (c *Client) ReceiveAllNotaryRequests() error { // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) UnsubscribeAll() error { - c.switchLock.RLock() - defer c.switchLock.RUnlock() + var conn = c.conn.Load() - if c.inactive { + if conn == nil { return ErrConnectionLost } - err := c.client.UnsubscribeAll() + err := conn.client.UnsubscribeAll() if err != nil { return err } @@ -201,15 +196,15 @@ func (c *Client) UnsubscribeAll() error { // notification from the connected RPC node. // Channels are closed when connections to the RPC nodes are lost. func (c *Client) Notifications() (<-chan *state.ContainedNotificationEvent, <-chan *block.Block, <-chan *result.NotaryRequestEvent) { - return c.notifyChan, c.blockChan, c.notaryChan + return c.subs.notifyChan, c.subs.blockChan, c.subs.notaryChan } type subscriptions struct { - // notification receivers (Client reads - // notifications from these channels) - curNotifyChan chan *state.ContainedNotificationEvent - curBlockChan chan *block.Block - curNotaryChan chan *result.NotaryRequestEvent + // notification consumers (Client sends + // notifications to these channels) + notifyChan chan *state.ContainedNotificationEvent + blockChan chan *block.Block + notaryChan chan *result.NotaryRequestEvent sync.RWMutex // for subscription fields only @@ -223,90 +218,51 @@ type subscriptions struct { func (c *Client) routeNotifications() { var ( - restoreCh = make(chan bool) - restoreInProgress bool + restoreCh = make(chan bool) + conn = c.conn.Load() ) routeloop: for { + if conn == nil { + c.logger.Info("RPC connection lost, attempting reconnect") + conn = c.switchRPC() + go c.restoreSubscriptions(conn, restoreCh) + } var connLost bool - c.switchLock.RLock() - notifCh := c.subs.curNotifyChan - blCh := c.subs.curBlockChan - notaryCh := c.subs.curNotaryChan - c.switchLock.RUnlock() select { case <-c.closeChan: break routeloop - case ev, ok := <-notifCh: - connLost = handleEv(c.notifyChan, ok, ev) - case ev, ok := <-blCh: - connLost = handleEv(c.blockChan, ok, ev) - case ev, ok := <-notaryCh: - connLost = handleEv(c.notaryChan, ok, ev) + case ev, ok := <-conn.notifyChan: + connLost = handleEv(c.subs.notifyChan, ok, ev) + case ev, ok := <-conn.blockChan: + connLost = handleEv(c.subs.blockChan, ok, ev) + case ev, ok := <-conn.notaryChan: + connLost = handleEv(c.subs.notaryChan, ok, ev) case ok := <-restoreCh: - restoreInProgress = false if !ok { connLost = true } } if connLost { - if !restoreInProgress { - c.logger.Info("RPC connection lost, attempting reconnect") - if !c.SwitchRPC() { - c.logger.Error("can't switch RPC node") - break routeloop - } - - c.subs.Lock() - c.subs.curNotifyChan = make(chan *state.ContainedNotificationEvent) - c.subs.curBlockChan = make(chan *block.Block) - c.subs.curNotaryChan = make(chan *result.NotaryRequestEvent) - go c.restoreSubscriptions(c.subs.curNotifyChan, c.subs.curBlockChan, c.subs.curNotaryChan, restoreCh) - c.subs.Unlock() - restoreInProgress = true - drainloop: - for { - select { - case _, ok := <-notifCh: - if !ok { - notifCh = nil - } - case _, ok := <-blCh: - if !ok { - blCh = nil - } - case _, ok := <-notaryCh: - if !ok { - notaryCh = nil - } - default: - break drainloop - } - } - } else { // Avoid getting additional !ok eventc.subs. - c.subs.Lock() - c.subs.curNotifyChan = nil - c.subs.curBlockChan = nil - c.subs.curNotaryChan = nil - c.subs.Unlock() - } + conn = nil } } - close(c.notifyChan) - close(c.blockChan) - close(c.notaryChan) + close(c.subs.notifyChan) + close(c.subs.blockChan) + close(c.subs.notaryChan) } // restoreSubscriptions restores subscriptions according to // cached information about them. -func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent, - blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent, resCh chan<- bool) { +func (c *Client) restoreSubscriptions(conn *connection, resCh chan<- bool) { var err error + c.subs.RLock() + defer c.subs.RUnlock() // new block events restoration if c.subs.subscribedToNewBlocks { - _, err = c.client.ReceiveBlocks(nil, blCh) + _, err = conn.client.ReceiveBlocks(nil, conn.blockChan) if err != nil { c.logger.Error("could not restore block subscription", zap.Error(err), @@ -318,7 +274,7 @@ func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificatio // notification events restoration for contract := range c.subs.subscribedEvents { - _, err = c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notifCh) + _, err = conn.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, conn.notifyChan) if err != nil { c.logger.Error("could not restore notification subscription after RPC switch", zap.Error(err), @@ -330,7 +286,7 @@ func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificatio // notary notification events restoration if c.subs.subscribedToAllNotaryEvents { - _, err = c.client.ReceiveNotaryRequests(nil, notaryCh) + _, err = conn.client.ReceiveNotaryRequests(nil, conn.notaryChan) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", zap.Error(err)) @@ -343,7 +299,7 @@ func (c *Client) restoreSubscriptions(notifCh chan<- *state.ContainedNotificatio nrAddedType := mempoolevent.TransactionAdded filter := &neorpc.NotaryRequestFilter{Signer: &signer, Type: &nrAddedType} - _, err = c.client.ReceiveNotaryRequests(filter, notaryCh) + _, err = conn.client.ReceiveNotaryRequests(filter, conn.notaryChan) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", zap.Error(err),