diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index d77f44b70c..04b85ad8c6 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -49,6 +49,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#978](https://github.com/umee-network/umee/pull/978) Cleanup the oracle package by moving deviation & conversion logic. - [#1050](https://github.com/umee-network/umee/pull/1050) Cache x/oracle params to decrease the number of queries to nodes. +- [#1069](https://github.com/umee-network/umee/pull/1069) Subscribe to node event EventNewBlockHeader to have the current chain height. ### Features diff --git a/price-feeder/oracle/client/chain_height.go b/price-feeder/oracle/client/chain_height.go new file mode 100644 index 0000000000..faa19e7f2d --- /dev/null +++ b/price-feeder/oracle/client/chain_height.go @@ -0,0 +1,106 @@ +package client + +import ( + "context" + "errors" + "sync" + + "github.com/rs/zerolog" + tmrpcclient "github.com/tendermint/tendermint/rpc/client" + tmctypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" +) + +var ( + errParseEventDataNewBlockHeader = errors.New("error parsing EventDataNewBlockHeader") + queryEventNewBlockHeader = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader) +) + +// ChainHeight is used to cache the chain height of the +// current node which is being updated each time the +// node sends an event of EventNewBlockHeader. +// It starts a goroutine to subscribe to blockchain new block event and update the cached height. +type ChainHeight struct { + Logger zerolog.Logger + + mtx sync.RWMutex + errGetChainHeight error + lastChainHeight int64 +} + +// NewChainHeight returns a new ChainHeight struct that +// starts a new goroutine subscribed to EventNewBlockHeader. +func NewChainHeight( + ctx context.Context, + rpcClient tmrpcclient.Client, + logger zerolog.Logger, +) (*ChainHeight, error) { + if !rpcClient.IsRunning() { + if err := rpcClient.Start(); err != nil { + return nil, err + } + } + + newBlockHeaderSubscription, err := rpcClient.Subscribe( + ctx, tmtypes.EventNewBlockHeader, queryEventNewBlockHeader.String()) + if err != nil { + return nil, err + } + + chainHeight := &ChainHeight{ + Logger: logger.With().Str("oracle_client", "chain_height").Logger(), + errGetChainHeight: nil, + lastChainHeight: 0, + } + + go chainHeight.subscribe(ctx, rpcClient, newBlockHeaderSubscription) + + return chainHeight, nil +} + +// updateChainHeight receives the data to be updated thread safe. +func (chainHeight *ChainHeight) updateChainHeight(blockHeight int64, err error) { + chainHeight.mtx.Lock() + defer chainHeight.mtx.Unlock() + + chainHeight.lastChainHeight = blockHeight + chainHeight.errGetChainHeight = err +} + +// subscribe listens to new blocks being made +// and updates the chain height. +func (chainHeight *ChainHeight) subscribe( + ctx context.Context, + eventsClient tmrpcclient.EventsClient, + newBlockHeaderSubscription <-chan tmctypes.ResultEvent, +) { + for { + select { + case <-ctx.Done(): + err := eventsClient.Unsubscribe(ctx, tmtypes.EventNewBlockHeader, queryEventNewBlockHeader.String()) + if err != nil { + chainHeight.Logger.Err(err) + chainHeight.updateChainHeight(chainHeight.lastChainHeight, err) + } + chainHeight.Logger.Info().Msg("closing the ChainHeight subscription") + return + + case resultEvent := <-newBlockHeaderSubscription: + eventDataNewBlockHeader, ok := resultEvent.Data.(tmtypes.EventDataNewBlockHeader) + if !ok { + chainHeight.Logger.Err(errParseEventDataNewBlockHeader) + chainHeight.updateChainHeight(chainHeight.lastChainHeight, errParseEventDataNewBlockHeader) + continue + } + chainHeight.updateChainHeight(eventDataNewBlockHeader.Header.Height, nil) + } + } +} + +// GetChainHeight returns the last chain height available. +func (chainHeight *ChainHeight) GetChainHeight() (int64, error) { + chainHeight.mtx.RLock() + defer chainHeight.mtx.RUnlock() + + return chainHeight.lastChainHeight, chainHeight.errGetChainHeight +} diff --git a/price-feeder/oracle/client/client.go b/price-feeder/oracle/client/client.go index bc3d2ffa93..c396356289 100644 --- a/price-feeder/oracle/client/client.go +++ b/price-feeder/oracle/client/client.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "context" "errors" "fmt" "io" @@ -10,7 +11,6 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" - rpcClient "github.com/cosmos/cosmos-sdk/client/rpc" "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/crypto/keyring" sdk "github.com/cosmos/cosmos-sdk/types" @@ -44,6 +44,7 @@ type ( GasAdjustment float64 GRPCEndpoint string KeyringPassphrase string + ChainHeight *ChainHeight } passReader struct { @@ -70,7 +71,7 @@ func NewOracleClient( return OracleClient{}, err } - return OracleClient{ + oracleClient := OracleClient{ Logger: logger.With().Str("module", "oracle_client").Logger(), ChainID: chainID, KeyringBackend: keyringBackend, @@ -85,7 +86,20 @@ func NewOracleClient( Encoding: umeeapp.MakeEncodingConfig(), GasAdjustment: gasAdjustment, GRPCEndpoint: grpcEndpoint, - }, nil + } + + clientCtx, err := oracleClient.CreateClientContext() + if err != nil { + return OracleClient{}, err + } + + chainHeight, err := NewChainHeight(context.Background(), clientCtx.Client, oracleClient.Logger) + if err != nil { + return OracleClient{}, err + } + oracleClient.ChainHeight = chainHeight + + return oracleClient, nil } func newPassReader(pass string) io.Reader { @@ -125,7 +139,7 @@ func (oc OracleClient) BroadcastTx(nextBlockHeight, timeoutHeight int64, msgs .. // re-try voting until timeout for lastCheckHeight < maxBlockHeight { - latestBlockHeight, err := rpcClient.GetChainHeight(clientCtx) + latestBlockHeight, err := oc.ChainHeight.GetChainHeight() if err != nil { return err } @@ -159,6 +173,8 @@ func (oc OracleClient) BroadcastTx(nextBlockHeight, timeoutHeight int64, msgs .. Str("tx_hash", hash). Uint32("tx_code", code). Msg("failed to broadcast tx; retrying...") + + time.Sleep(time.Second * 1) continue } diff --git a/price-feeder/oracle/oracle.go b/price-feeder/oracle/oracle.go index b695099a96..491ee265ec 100644 --- a/price-feeder/oracle/oracle.go +++ b/price-feeder/oracle/oracle.go @@ -11,7 +11,6 @@ import ( "sync" "time" - rpcclient "github.com/cosmos/cosmos-sdk/client/rpc" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" @@ -443,12 +442,7 @@ func (o *Oracle) checkAcceptList(params oracletypes.Params) { func (o *Oracle) tick(ctx context.Context) error { o.logger.Debug().Msg("executing oracle tick") - clientCtx, err := o.oracleClient.CreateClientContext() - if err != nil { - return err - } - - blockHeight, err := rpcclient.GetChainHeight(clientCtx) + blockHeight, err := o.oracleClient.ChainHeight.GetChainHeight() if err != nil { return err } @@ -533,7 +527,7 @@ func (o *Oracle) tick(ctx context.Context) error { return err } - currentHeight, err := rpcclient.GetChainHeight(clientCtx) + currentHeight, err := o.oracleClient.ChainHeight.GetChainHeight() if err != nil { return err }