Skip to content

Commit

Permalink
Separate token price reporting schedule (#14154)
Browse files Browse the repository at this point in the history
* Separate token price reporting schedule (#1278)

## Motivation
Static price removal job rollout will be delayed to after 1.5 release.
To unblock db load concerns in 1.4.21 which writes prices to db, we want
to reduce number of token-price related insertions in db.

## Solution
Separate gas price and token price insertion frequency, insert every 10
minutes for token price. 10-min resolution for token price is accurate
enough for our use case.

* Changeset

---------

Co-authored-by: Chunkai Yang <matYang@users.noreply.github.com>
  • Loading branch information
mateusz-sekara and matYang authored Aug 20, 2024
1 parent c72afe7 commit a937d5c
Show file tree
Hide file tree
Showing 3 changed files with 363 additions and 211 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-stingrays-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Separate price updates schedule for token prices in CCIP #updated
249 changes: 148 additions & 101 deletions core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,28 @@ type PriceService interface {
var _ PriceService = (*priceService)(nil)

const (
// Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin.
// 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while
// Gas prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time.
gasPriceUpdateInterval = 1 * time.Minute
// Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show
// their prices are stable, 10-minute resolution is accurate enough.
tokenPriceUpdateInterval = 10 * time.Minute

// Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin.
// 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while
// surfacing price update outages quickly enough.
priceExpireSec = 600
// Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price.
// 10 minutes should result in 40 rows being cleaned up per job, it is not a heavy load on DB, so there is no need
// to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireSec`.
priceCleanupInterval = 600 * time.Second
priceExpireThreshold = 25 * time.Minute

// Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time.
priceUpdateInterval = 60 * time.Second
// Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price.
// 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need
// to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`.
priceCleanupInterval = 10 * time.Minute
)

type priceService struct {
priceExpireSec int
cleanupInterval time.Duration
updateInterval time.Duration
priceExpireThreshold time.Duration
cleanupInterval time.Duration
gasUpdateInterval time.Duration
tokenUpdateInterval time.Duration

lggr logger.Logger
orm cciporm.ORM
Expand Down Expand Up @@ -93,9 +98,10 @@ func NewPriceService(
ctx, cancel := context.WithCancel(context.Background())

pw := &priceService{
priceExpireSec: priceExpireSec,
cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time
updateInterval: utils.WithJitter(priceUpdateInterval),
priceExpireThreshold: priceExpireThreshold,
cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time
gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval),
tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval),

lggr: lggr,
orm: orm,
Expand Down Expand Up @@ -135,10 +141,14 @@ func (p *priceService) Close() error {

func (p *priceService) run() {
cleanupTicker := time.NewTicker(p.cleanupInterval)
updateTicker := time.NewTicker(p.updateInterval)
gasUpdateTicker := time.NewTicker(p.gasUpdateInterval)
tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval)

go func() {
defer p.wg.Done()
defer cleanupTicker.Stop()
defer gasUpdateTicker.Stop()
defer tokenUpdateTicker.Stop()

for {
select {
Expand All @@ -149,10 +159,15 @@ func (p *priceService) run() {
if err != nil {
p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err)
}
case <-updateTicker.C:
err := p.runUpdate(p.backgroundCtx)
case <-gasUpdateTicker.C:
err := p.runGasPriceUpdate(p.backgroundCtx)
if err != nil {
p.lggr.Errorw("Error when updating prices in the background", "err", err)
p.lggr.Errorw("Error when updating gas prices in the background", "err", err)
}
case <-tokenUpdateTicker.C:
err := p.runTokenPriceUpdate(p.backgroundCtx)
if err != nil {
p.lggr.Errorw("Error when updating token prices in the background", "err", err)
}
}
}
Expand All @@ -167,8 +182,11 @@ func (p *priceService) UpdateDynamicConfig(ctx context.Context, gasPriceEstimato

// Config update may substantially change the prices, refresh the prices immediately, this also makes testing easier
// for not having to wait to the full update interval.
if err := p.runUpdate(ctx); err != nil {
p.lggr.Errorw("Error when updating prices after dynamic config update", "err", err)
if err := p.runGasPriceUpdate(ctx); err != nil {
p.lggr.Errorw("Error when updating gas prices after dynamic config update", "err", err)
}
if err := p.runTokenPriceUpdate(ctx); err != nil {
p.lggr.Errorw("Error when updating token prices after dynamic config update", "err", err)
}

return nil
Expand Down Expand Up @@ -224,15 +242,15 @@ func (p *priceService) runCleanup(ctx context.Context) error {
eg := new(errgroup.Group)

eg.Go(func() error {
err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec)
err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds()))
if err != nil {
return fmt.Errorf("error clearing gas prices: %w", err)
}
return nil
})

eg.Go(func() error {
err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec)
err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds()))
if err != nil {
return fmt.Errorf("error clearing token prices: %w", err)
}
Expand All @@ -242,153 +260,182 @@ func (p *priceService) runCleanup(ctx context.Context) error {
return eg.Wait()
}

func (p *priceService) runUpdate(ctx context.Context) error {
func (p *priceService) runGasPriceUpdate(ctx context.Context) error {
// Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader`
// Price updates happen infrequently - once every `priceUpdateInterval` seconds.
// Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds.
// It does not happen on any code path that is performance sensitive.
// We can afford to have non-performant unlocks here that is simple and safe.
p.dynamicConfigMu.RLock()
defer p.dynamicConfigMu.RUnlock()

// There may be a period of time between service is started and dynamic config is updated
if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil {
p.lggr.Info("Skipping price update due to gasPriceEstimator and/or destPriceRegistry not ready")
if p.gasPriceEstimator == nil {
p.lggr.Info("Skipping gas price update due to gasPriceEstimator not ready")
return nil
}

sourceGasPriceUSD, err := p.observeGasPriceUpdates(ctx, p.lggr)
if err != nil {
return fmt.Errorf("failed to observe gas price updates: %w", err)
}

err = p.writeGasPricesToDB(ctx, sourceGasPriceUSD)
if err != nil {
return fmt.Errorf("failed to write gas prices to db: %w", err)
}

return nil
}

func (p *priceService) runTokenPriceUpdate(ctx context.Context) error {
// Protect against concurrent updates of `tokenPriceEstimator` and `destPriceRegistryReader`
// Price updates happen infrequently - once every `tokenPriceUpdateInterval` seconds.
p.dynamicConfigMu.RLock()
defer p.dynamicConfigMu.RUnlock()

// There may be a period of time between service is started and dynamic config is updated
if p.destPriceRegistryReader == nil {
p.lggr.Info("Skipping token price update due to destPriceRegistry not ready")
return nil
}

sourceGasPriceUSD, tokenPricesUSD, err := p.observePriceUpdates(ctx, p.lggr)
tokenPricesUSD, err := p.observeTokenPriceUpdates(ctx, p.lggr)
if err != nil {
return fmt.Errorf("failed to observe price updates: %w", err)
return fmt.Errorf("failed to observe token price updates: %w", err)
}

err = p.writePricesToDB(ctx, sourceGasPriceUSD, tokenPricesUSD)
err = p.writeTokenPricesToDB(ctx, tokenPricesUSD)
if err != nil {
return fmt.Errorf("failed to write prices to db: %w", err)
return fmt.Errorf("failed to write token prices to db: %w", err)
}

return nil
}

func (p *priceService) observePriceUpdates(
func (p *priceService) observeGasPriceUpdates(
ctx context.Context,
lggr logger.Logger,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil {
return nil, nil, fmt.Errorf("gasPriceEstimator and/or destPriceRegistry is not set yet")
) (sourceGasPriceUSD *big.Int, err error) {
if p.gasPriceEstimator == nil {
return nil, fmt.Errorf("gasPriceEstimator is not set yet")
}

sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter)
// Include wrapped native to identify the source native USD price, notice USD is in 1e18 scale, i.e. $1 = 1e18
rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, []cciptypes.Address{p.sourceNative})
if err != nil {
return nil, fmt.Errorf("failed to fetch source native price (%s): %w", p.sourceNative, err)
}

lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens)
sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative]
if !exists {
return nil, fmt.Errorf("missing source native (%s) price", p.sourceNative)
}

sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx)
if err != nil {
return nil, err
}
if sourceGasPrice == nil {
return nil, fmt.Errorf("missing gas price")
}
sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD)
if err != nil {
return nil, nil, fmt.Errorf("get destination tokens: %w", err)
return nil, err
}

return p.generatePriceUpdates(ctx, lggr, sortedLaneTokens)
lggr.Infow("PriceService observed latest gas price",
"sourceChainSelector", p.sourceChainSelector,
"destChainSelector", p.destChainSelector,
"sourceNative", p.sourceNative,
"gasPriceWei", sourceGasPrice,
"sourceNativePriceUSD", sourceNativePriceUSD,
"sourceGasPriceUSD", sourceGasPriceUSD,
)
return sourceGasPriceUSD, nil
}

// All prices are USD ($1=1e18) denominated. All prices must be not nil.
// Return token prices should contain the exact same tokens as in tokenDecimals.
func (p *priceService) generatePriceUpdates(
func (p *priceService) observeTokenPriceUpdates(
ctx context.Context,
lggr logger.Logger,
sortedLaneTokens []cciptypes.Address,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
// Include wrapped native in our token query as way to identify the source native USD price.
// notice USD is in 1e18 scale, i.e. $1 = 1e18
queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{p.sourceNative}, sortedLaneTokens)
) (tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
if p.destPriceRegistryReader == nil {
return nil, fmt.Errorf("destPriceRegistry is not set yet")
}

sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter)
if err != nil {
return nil, fmt.Errorf("get destination tokens: %w", err)
}

lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens)

queryTokens := ccipcommon.FlattenUniqueSlice(sortedLaneTokens)
rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, queryTokens)
if err != nil {
return nil, nil, err
return nil, fmt.Errorf("failed to fetch token prices (%v): %w", queryTokens, err)
}
lggr.Infow("Raw token prices", "rawTokenPrices", rawTokenPricesUSD)

// make sure that we got prices for all the tokens of our query
for _, token := range queryTokens {
if rawTokenPricesUSD[token] == nil {
return nil, nil, fmt.Errorf("missing token price: %+v", token)
return nil, fmt.Errorf("missing token price: %+v", token)
}
}

sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative]
if !exists {
return nil, nil, fmt.Errorf("missing source native (%s) price", p.sourceNative)
}

destTokensDecimals, err := p.destPriceRegistryReader.GetTokensDecimals(ctx, sortedLaneTokens)
if err != nil {
return nil, nil, fmt.Errorf("get tokens decimals: %w", err)
return nil, fmt.Errorf("get tokens decimals: %w", err)
}

tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD))
for i, token := range sortedLaneTokens {
tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i])
}

sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx)
if err != nil {
return nil, nil, err
}
if sourceGasPrice == nil {
return nil, nil, fmt.Errorf("missing gas price")
}
sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD)
if err != nil {
return nil, nil, err
}

lggr.Infow("PriceService observed latest price",
lggr.Infow("PriceService observed latest token prices",
"sourceChainSelector", p.sourceChainSelector,
"destChainSelector", p.destChainSelector,
"gasPriceWei", sourceGasPrice,
"sourceNativePriceUSD", sourceNativePriceUSD,
"sourceGasPriceUSD", sourceGasPriceUSD,
"tokenPricesUSD", tokenPricesUSD,
)
return sourceGasPriceUSD, tokenPricesUSD, nil
return tokenPricesUSD, nil
}

func (p *priceService) writePricesToDB(
ctx context.Context,
sourceGasPriceUSD *big.Int,
tokenPricesUSD map[cciptypes.Address]*big.Int,
) (err error) {
eg := new(errgroup.Group)

if sourceGasPriceUSD != nil {
eg.Go(func() error {
return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{
{
SourceChainSelector: p.sourceChainSelector,
GasPrice: assets.NewWei(sourceGasPriceUSD),
},
})
})
func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) {
if sourceGasPriceUSD == nil {
return nil
}

if tokenPricesUSD != nil {
var tokenPrices []cciporm.TokenPriceUpdate
return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{
{
SourceChainSelector: p.sourceChainSelector,
GasPrice: assets.NewWei(sourceGasPriceUSD),
},
})
}

for token, price := range tokenPricesUSD {
tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{
TokenAddr: string(token),
TokenPrice: assets.NewWei(price),
})
}
func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) {
if tokenPricesUSD == nil {
return nil
}

// Sort token by addr to make price updates ordering deterministic, easier to testing and debugging
sort.Slice(tokenPrices, func(i, j int) bool {
return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr
})
var tokenPrices []cciporm.TokenPriceUpdate

eg.Go(func() error {
return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices)
for token, price := range tokenPricesUSD {
tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{
TokenAddr: string(token),
TokenPrice: assets.NewWei(price),
})
}

return eg.Wait()
// Sort token by addr to make price updates ordering deterministic, easier for testing and debugging
sort.Slice(tokenPrices, func(i, j int) bool {
return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr
})

return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices)
}

// Input price is USD per full token, with 18 decimal precision
Expand Down
Loading

0 comments on commit a937d5c

Please sign in to comment.