Skip to content

Commit

Permalink
feat: add cache for chain height (#1069) (#1081)
Browse files Browse the repository at this point in the history
## Description
- Helps to decrease the number of calls from price-feeder to node chain

---

### Author Checklist

_All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues._

I have...

- [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] added appropriate labels to the PR
- [ ] targeted the correct branch (see [PR Targeting](https://github.com/umee-network/umee/blob/main/CONTRIBUTING.md#pr-targeting))
- [ ] provided a link to the relevant issue or specification
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

_All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items._

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed all author checklist items have been addressed
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)

(cherry picked from commit 6430840)

Co-authored-by: Rafael Tenfen <rafaeltenfen.rt@gmail.com>
  • Loading branch information
mergify[bot] and RafilxTenfen authored Jun 30, 2022
1 parent 90b8ee3 commit cf4ea5c
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 12 deletions.
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

- [#978](https://github.com/umee-network/umee/pull/978) Cleanup the oracle package by moving deviation & conversion logic.
- [#1050](https://github.com/umee-network/umee/pull/1050) Cache x/oracle params to decrease the number of queries to nodes.
- [#1069](https://github.com/umee-network/umee/pull/1069) Subscribe to node event EventNewBlockHeader to have the current chain height.

### Features

Expand Down
106 changes: 106 additions & 0 deletions price-feeder/oracle/client/chain_height.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package client

import (
"context"
"errors"
"sync"

"github.com/rs/zerolog"
tmrpcclient "github.com/tendermint/tendermint/rpc/client"
tmctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
)

var (
errParseEventDataNewBlockHeader = errors.New("error parsing EventDataNewBlockHeader")
queryEventNewBlockHeader = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader)
)

// ChainHeight is used to cache the chain height of the
// current node which is being updated each time the
// node sends an event of EventNewBlockHeader.
// It starts a goroutine to subscribe to blockchain new block event and update the cached height.
type ChainHeight struct {
Logger zerolog.Logger

mtx sync.RWMutex
errGetChainHeight error
lastChainHeight int64
}

// NewChainHeight returns a new ChainHeight struct that
// starts a new goroutine subscribed to EventNewBlockHeader.
func NewChainHeight(
ctx context.Context,
rpcClient tmrpcclient.Client,
logger zerolog.Logger,
) (*ChainHeight, error) {
if !rpcClient.IsRunning() {
if err := rpcClient.Start(); err != nil {
return nil, err
}
}

newBlockHeaderSubscription, err := rpcClient.Subscribe(
ctx, tmtypes.EventNewBlockHeader, queryEventNewBlockHeader.String())
if err != nil {
return nil, err
}

chainHeight := &ChainHeight{
Logger: logger.With().Str("oracle_client", "chain_height").Logger(),
errGetChainHeight: nil,
lastChainHeight: 0,
}

go chainHeight.subscribe(ctx, rpcClient, newBlockHeaderSubscription)

return chainHeight, nil
}

// updateChainHeight receives the data to be updated thread safe.
func (chainHeight *ChainHeight) updateChainHeight(blockHeight int64, err error) {
chainHeight.mtx.Lock()
defer chainHeight.mtx.Unlock()

chainHeight.lastChainHeight = blockHeight
chainHeight.errGetChainHeight = err
}

// subscribe listens to new blocks being made
// and updates the chain height.
func (chainHeight *ChainHeight) subscribe(
ctx context.Context,
eventsClient tmrpcclient.EventsClient,
newBlockHeaderSubscription <-chan tmctypes.ResultEvent,
) {
for {
select {
case <-ctx.Done():
err := eventsClient.Unsubscribe(ctx, tmtypes.EventNewBlockHeader, queryEventNewBlockHeader.String())
if err != nil {
chainHeight.Logger.Err(err)
chainHeight.updateChainHeight(chainHeight.lastChainHeight, err)
}
chainHeight.Logger.Info().Msg("closing the ChainHeight subscription")
return

case resultEvent := <-newBlockHeaderSubscription:
eventDataNewBlockHeader, ok := resultEvent.Data.(tmtypes.EventDataNewBlockHeader)
if !ok {
chainHeight.Logger.Err(errParseEventDataNewBlockHeader)
chainHeight.updateChainHeight(chainHeight.lastChainHeight, errParseEventDataNewBlockHeader)
continue
}
chainHeight.updateChainHeight(eventDataNewBlockHeader.Header.Height, nil)
}
}
}

// GetChainHeight returns the last chain height available.
func (chainHeight *ChainHeight) GetChainHeight() (int64, error) {
chainHeight.mtx.RLock()
defer chainHeight.mtx.RUnlock()

return chainHeight.lastChainHeight, chainHeight.errGetChainHeight
}
24 changes: 20 additions & 4 deletions price-feeder/oracle/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -10,7 +11,6 @@ import (

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
rpcClient "github.com/cosmos/cosmos-sdk/client/rpc"
"github.com/cosmos/cosmos-sdk/client/tx"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -44,6 +44,7 @@ type (
GasAdjustment float64
GRPCEndpoint string
KeyringPassphrase string
ChainHeight *ChainHeight
}

passReader struct {
Expand All @@ -70,7 +71,7 @@ func NewOracleClient(
return OracleClient{}, err
}

return OracleClient{
oracleClient := OracleClient{
Logger: logger.With().Str("module", "oracle_client").Logger(),
ChainID: chainID,
KeyringBackend: keyringBackend,
Expand All @@ -85,7 +86,20 @@ func NewOracleClient(
Encoding: umeeapp.MakeEncodingConfig(),
GasAdjustment: gasAdjustment,
GRPCEndpoint: grpcEndpoint,
}, nil
}

clientCtx, err := oracleClient.CreateClientContext()
if err != nil {
return OracleClient{}, err
}

chainHeight, err := NewChainHeight(context.Background(), clientCtx.Client, oracleClient.Logger)
if err != nil {
return OracleClient{}, err
}
oracleClient.ChainHeight = chainHeight

return oracleClient, nil
}

func newPassReader(pass string) io.Reader {
Expand Down Expand Up @@ -125,7 +139,7 @@ func (oc OracleClient) BroadcastTx(nextBlockHeight, timeoutHeight int64, msgs ..

// re-try voting until timeout
for lastCheckHeight < maxBlockHeight {
latestBlockHeight, err := rpcClient.GetChainHeight(clientCtx)
latestBlockHeight, err := oc.ChainHeight.GetChainHeight()
if err != nil {
return err
}
Expand Down Expand Up @@ -159,6 +173,8 @@ func (oc OracleClient) BroadcastTx(nextBlockHeight, timeoutHeight int64, msgs ..
Str("tx_hash", hash).
Uint32("tx_code", code).
Msg("failed to broadcast tx; retrying...")

time.Sleep(time.Second * 1)
continue
}

Expand Down
10 changes: 2 additions & 8 deletions price-feeder/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

rpcclient "github.com/cosmos/cosmos-sdk/client/rpc"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -443,12 +442,7 @@ func (o *Oracle) checkAcceptList(params oracletypes.Params) {
func (o *Oracle) tick(ctx context.Context) error {
o.logger.Debug().Msg("executing oracle tick")

clientCtx, err := o.oracleClient.CreateClientContext()
if err != nil {
return err
}

blockHeight, err := rpcclient.GetChainHeight(clientCtx)
blockHeight, err := o.oracleClient.ChainHeight.GetChainHeight()
if err != nil {
return err
}
Expand Down Expand Up @@ -533,7 +527,7 @@ func (o *Oracle) tick(ctx context.Context) error {
return err
}

currentHeight, err := rpcclient.GetChainHeight(clientCtx)
currentHeight, err := o.oracleClient.ChainHeight.GetChainHeight()
if err != nil {
return err
}
Expand Down

0 comments on commit cf4ea5c

Please sign in to comment.