diff --git a/.changeset/late-stingrays-promise.md b/.changeset/late-stingrays-promise.md new file mode 100644 index 00000000000..39ca570f581 --- /dev/null +++ b/.changeset/late-stingrays-promise.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Separate price updates schedule for token prices in CCIP #updated diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index 7d7d5bda3ad..2118d5832da 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -42,23 +42,28 @@ type PriceService interface { var _ PriceService = (*priceService)(nil) const ( - // Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin. - // 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while + // Gas prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. + gasPriceUpdateInterval = 1 * time.Minute + // Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show + // their prices are stable, 10-minute resolution is accurate enough. + tokenPriceUpdateInterval = 10 * time.Minute + + // Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin. + // 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while // surfacing price update outages quickly enough. - priceExpireSec = 600 - // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. - // 10 minutes should result in 40 rows being cleaned up per job, it is not a heavy load on DB, so there is no need - // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireSec`. - priceCleanupInterval = 600 * time.Second + priceExpireThreshold = 25 * time.Minute - // Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. - priceUpdateInterval = 60 * time.Second + // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. + // 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need + // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`. + priceCleanupInterval = 10 * time.Minute ) type priceService struct { - priceExpireSec int - cleanupInterval time.Duration - updateInterval time.Duration + priceExpireThreshold time.Duration + cleanupInterval time.Duration + gasUpdateInterval time.Duration + tokenUpdateInterval time.Duration lggr logger.Logger orm cciporm.ORM @@ -93,9 +98,10 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireSec: priceExpireSec, - cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time - updateInterval: utils.WithJitter(priceUpdateInterval), + priceExpireThreshold: priceExpireThreshold, + cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time + gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval), + tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval), lggr: lggr, orm: orm, @@ -135,10 +141,14 @@ func (p *priceService) Close() error { func (p *priceService) run() { cleanupTicker := time.NewTicker(p.cleanupInterval) - updateTicker := time.NewTicker(p.updateInterval) + gasUpdateTicker := time.NewTicker(p.gasUpdateInterval) + tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval) go func() { defer p.wg.Done() + defer cleanupTicker.Stop() + defer gasUpdateTicker.Stop() + defer tokenUpdateTicker.Stop() for { select { @@ -149,10 +159,15 @@ func (p *priceService) run() { if err != nil { p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err) } - case <-updateTicker.C: - err := p.runUpdate(p.backgroundCtx) + case <-gasUpdateTicker.C: + err := p.runGasPriceUpdate(p.backgroundCtx) if err != nil { - p.lggr.Errorw("Error when updating prices in the background", "err", err) + p.lggr.Errorw("Error when updating gas prices in the background", "err", err) + } + case <-tokenUpdateTicker.C: + err := p.runTokenPriceUpdate(p.backgroundCtx) + if err != nil { + p.lggr.Errorw("Error when updating token prices in the background", "err", err) } } } @@ -167,8 +182,11 @@ func (p *priceService) UpdateDynamicConfig(ctx context.Context, gasPriceEstimato // Config update may substantially change the prices, refresh the prices immediately, this also makes testing easier // for not having to wait to the full update interval. - if err := p.runUpdate(ctx); err != nil { - p.lggr.Errorw("Error when updating prices after dynamic config update", "err", err) + if err := p.runGasPriceUpdate(ctx); err != nil { + p.lggr.Errorw("Error when updating gas prices after dynamic config update", "err", err) + } + if err := p.runTokenPriceUpdate(ctx); err != nil { + p.lggr.Errorw("Error when updating token prices after dynamic config update", "err", err) } return nil @@ -224,7 +242,7 @@ func (p *priceService) runCleanup(ctx context.Context) error { eg := new(errgroup.Group) eg.Go(func() error { - err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) + err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) if err != nil { return fmt.Errorf("error clearing gas prices: %w", err) } @@ -232,7 +250,7 @@ func (p *priceService) runCleanup(ctx context.Context) error { }) eg.Go(func() error { - err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) + err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) if err != nil { return fmt.Errorf("error clearing token prices: %w", err) } @@ -242,84 +260,134 @@ func (p *priceService) runCleanup(ctx context.Context) error { return eg.Wait() } -func (p *priceService) runUpdate(ctx context.Context) error { +func (p *priceService) runGasPriceUpdate(ctx context.Context) error { // Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader` - // Price updates happen infrequently - once every `priceUpdateInterval` seconds. + // Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds. // It does not happen on any code path that is performance sensitive. // We can afford to have non-performant unlocks here that is simple and safe. p.dynamicConfigMu.RLock() defer p.dynamicConfigMu.RUnlock() // There may be a period of time between service is started and dynamic config is updated - if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { - p.lggr.Info("Skipping price update due to gasPriceEstimator and/or destPriceRegistry not ready") + if p.gasPriceEstimator == nil { + p.lggr.Info("Skipping gas price update due to gasPriceEstimator not ready") + return nil + } + + sourceGasPriceUSD, err := p.observeGasPriceUpdates(ctx, p.lggr) + if err != nil { + return fmt.Errorf("failed to observe gas price updates: %w", err) + } + + err = p.writeGasPricesToDB(ctx, sourceGasPriceUSD) + if err != nil { + return fmt.Errorf("failed to write gas prices to db: %w", err) + } + + return nil +} + +func (p *priceService) runTokenPriceUpdate(ctx context.Context) error { + // Protect against concurrent updates of `tokenPriceEstimator` and `destPriceRegistryReader` + // Price updates happen infrequently - once every `tokenPriceUpdateInterval` seconds. + p.dynamicConfigMu.RLock() + defer p.dynamicConfigMu.RUnlock() + + // There may be a period of time between service is started and dynamic config is updated + if p.destPriceRegistryReader == nil { + p.lggr.Info("Skipping token price update due to destPriceRegistry not ready") return nil } - sourceGasPriceUSD, tokenPricesUSD, err := p.observePriceUpdates(ctx, p.lggr) + tokenPricesUSD, err := p.observeTokenPriceUpdates(ctx, p.lggr) if err != nil { - return fmt.Errorf("failed to observe price updates: %w", err) + return fmt.Errorf("failed to observe token price updates: %w", err) } - err = p.writePricesToDB(ctx, sourceGasPriceUSD, tokenPricesUSD) + err = p.writeTokenPricesToDB(ctx, tokenPricesUSD) if err != nil { - return fmt.Errorf("failed to write prices to db: %w", err) + return fmt.Errorf("failed to write token prices to db: %w", err) } return nil } -func (p *priceService) observePriceUpdates( +func (p *priceService) observeGasPriceUpdates( ctx context.Context, lggr logger.Logger, -) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { - if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { - return nil, nil, fmt.Errorf("gasPriceEstimator and/or destPriceRegistry is not set yet") +) (sourceGasPriceUSD *big.Int, err error) { + if p.gasPriceEstimator == nil { + return nil, fmt.Errorf("gasPriceEstimator is not set yet") } - sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) + // Include wrapped native to identify the source native USD price, notice USD is in 1e18 scale, i.e. $1 = 1e18 + rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, []cciptypes.Address{p.sourceNative}) + if err != nil { + return nil, fmt.Errorf("failed to fetch source native price (%s): %w", p.sourceNative, err) + } - lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) + sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] + if !exists { + return nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) + } + sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) + if err != nil { + return nil, err + } + if sourceGasPrice == nil { + return nil, fmt.Errorf("missing gas price") + } + sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) if err != nil { - return nil, nil, fmt.Errorf("get destination tokens: %w", err) + return nil, err } - return p.generatePriceUpdates(ctx, lggr, sortedLaneTokens) + lggr.Infow("PriceService observed latest gas price", + "sourceChainSelector", p.sourceChainSelector, + "destChainSelector", p.destChainSelector, + "sourceNative", p.sourceNative, + "gasPriceWei", sourceGasPrice, + "sourceNativePriceUSD", sourceNativePriceUSD, + "sourceGasPriceUSD", sourceGasPriceUSD, + ) + return sourceGasPriceUSD, nil } // All prices are USD ($1=1e18) denominated. All prices must be not nil. // Return token prices should contain the exact same tokens as in tokenDecimals. -func (p *priceService) generatePriceUpdates( +func (p *priceService) observeTokenPriceUpdates( ctx context.Context, lggr logger.Logger, - sortedLaneTokens []cciptypes.Address, -) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { - // Include wrapped native in our token query as way to identify the source native USD price. - // notice USD is in 1e18 scale, i.e. $1 = 1e18 - queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{p.sourceNative}, sortedLaneTokens) +) (tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { + if p.destPriceRegistryReader == nil { + return nil, fmt.Errorf("destPriceRegistry is not set yet") + } + + sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) + if err != nil { + return nil, fmt.Errorf("get destination tokens: %w", err) + } + lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) + + queryTokens := ccipcommon.FlattenUniqueSlice(sortedLaneTokens) rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, queryTokens) if err != nil { - return nil, nil, err + return nil, fmt.Errorf("failed to fetch token prices (%v): %w", queryTokens, err) } lggr.Infow("Raw token prices", "rawTokenPrices", rawTokenPricesUSD) // make sure that we got prices for all the tokens of our query for _, token := range queryTokens { if rawTokenPricesUSD[token] == nil { - return nil, nil, fmt.Errorf("missing token price: %+v", token) + return nil, fmt.Errorf("missing token price: %+v", token) } } - sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] - if !exists { - return nil, nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) - } - destTokensDecimals, err := p.destPriceRegistryReader.GetTokensDecimals(ctx, sortedLaneTokens) if err != nil { - return nil, nil, fmt.Errorf("get tokens decimals: %w", err) + return nil, fmt.Errorf("get tokens decimals: %w", err) } tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD)) @@ -327,68 +395,47 @@ func (p *priceService) generatePriceUpdates( tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i]) } - sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) - if err != nil { - return nil, nil, err - } - if sourceGasPrice == nil { - return nil, nil, fmt.Errorf("missing gas price") - } - sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) - if err != nil { - return nil, nil, err - } - - lggr.Infow("PriceService observed latest price", + lggr.Infow("PriceService observed latest token prices", "sourceChainSelector", p.sourceChainSelector, "destChainSelector", p.destChainSelector, - "gasPriceWei", sourceGasPrice, - "sourceNativePriceUSD", sourceNativePriceUSD, - "sourceGasPriceUSD", sourceGasPriceUSD, "tokenPricesUSD", tokenPricesUSD, ) - return sourceGasPriceUSD, tokenPricesUSD, nil + return tokenPricesUSD, nil } -func (p *priceService) writePricesToDB( - ctx context.Context, - sourceGasPriceUSD *big.Int, - tokenPricesUSD map[cciptypes.Address]*big.Int, -) (err error) { - eg := new(errgroup.Group) - - if sourceGasPriceUSD != nil { - eg.Go(func() error { - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ - { - SourceChainSelector: p.sourceChainSelector, - GasPrice: assets.NewWei(sourceGasPriceUSD), - }, - }) - }) +func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) { + if sourceGasPriceUSD == nil { + return nil } - if tokenPricesUSD != nil { - var tokenPrices []cciporm.TokenPriceUpdate + return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ + { + SourceChainSelector: p.sourceChainSelector, + GasPrice: assets.NewWei(sourceGasPriceUSD), + }, + }) +} - for token, price := range tokenPricesUSD { - tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ - TokenAddr: string(token), - TokenPrice: assets.NewWei(price), - }) - } +func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) { + if tokenPricesUSD == nil { + return nil + } - // Sort token by addr to make price updates ordering deterministic, easier to testing and debugging - sort.Slice(tokenPrices, func(i, j int) bool { - return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr - }) + var tokenPrices []cciporm.TokenPriceUpdate - eg.Go(func() error { - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) + for token, price := range tokenPricesUSD { + tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ + TokenAddr: string(token), + TokenPrice: assets.NewWei(price), }) } - return eg.Wait() + // Sort token by addr to make price updates ordering deterministic, easier for testing and debugging + sort.Slice(tokenPrices, func(i, j int) bool { + return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr + }) + + return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) } // Input price is USD per full token, with 18 decimal precision diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index 0bea8af9a19..26721bdf8e4 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -82,8 +82,8 @@ func TestPriceService_priceCleanup(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, priceExpireSec).Return(gasPricesError).Once() - mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, priceExpireSec).Return(tokenPricesError).Once() + mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(gasPricesError).Once() + mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(tokenPricesError).Once() priceService := NewPriceService( lggr, @@ -105,17 +105,13 @@ func TestPriceService_priceCleanup(t *testing.T) { } } -func TestPriceService_priceWrite(t *testing.T) { +func TestPriceService_writeGasPrices(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) sourceChainSelector := uint64(67890) gasPrice := big.NewInt(1e18) - tokenPrices := map[cciptypes.Address]*big.Int{ - "0x123": big.NewInt(2e18), - "0x234": big.NewInt(3e18), - } expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ { @@ -123,6 +119,67 @@ func TestPriceService_priceWrite(t *testing.T) { GasPrice: assets.NewWei(gasPrice), }, } + + testCases := []struct { + name string + gasPriceError bool + expectedErr bool + }{ + { + name: "ORM called successfully", + gasPriceError: false, + expectedErr: false, + }, + { + name: "gasPrice clear failed", + gasPriceError: true, + expectedErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := tests.Context(t) + + var gasPricesError error + if tc.gasPriceError { + gasPricesError = fmt.Errorf("gas prices error") + } + + mockOrm := ccipmocks.NewORM(t) + mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() + + priceService := NewPriceService( + lggr, + mockOrm, + jobId, + destChainSelector, + sourceChainSelector, + "", + nil, + nil, + ).(*priceService) + err := priceService.writeGasPricesToDB(ctx, gasPrice) + if tc.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestPriceService_writeTokenPrices(t *testing.T) { + lggr := logger.TestLogger(t) + jobId := int32(1) + destChainSelector := uint64(12345) + sourceChainSelector := uint64(67890) + + tokenPrices := map[cciptypes.Address]*big.Int{ + "0x123": big.NewInt(2e18), + "0x234": big.NewInt(3e18), + } + expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ { TokenAddr: "0x123", @@ -136,31 +193,16 @@ func TestPriceService_priceWrite(t *testing.T) { testCases := []struct { name string - gasPriceError bool tokenPriceError bool expectedErr bool }{ { name: "ORM called successfully", - gasPriceError: false, tokenPriceError: false, expectedErr: false, }, - { - name: "gasPrice clear failed", - gasPriceError: true, - tokenPriceError: false, - expectedErr: true, - }, { name: "tokenPrice clear failed", - gasPriceError: false, - tokenPriceError: true, - expectedErr: true, - }, - { - name: "both ORM calls failed", - gasPriceError: true, tokenPriceError: true, expectedErr: true, }, @@ -170,17 +212,12 @@ func TestPriceService_priceWrite(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx := tests.Context(t) - var gasPricesError error var tokenPricesError error - if tc.gasPriceError { - gasPricesError = fmt.Errorf("gas prices error") - } if tc.tokenPriceError { tokenPricesError = fmt.Errorf("token prices error") } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() priceService := NewPriceService( @@ -193,7 +230,7 @@ func TestPriceService_priceWrite(t *testing.T) { nil, nil, ).(*priceService) - err := priceService.writePricesToDB(ctx, gasPrice, tokenPrices) + err := priceService.writeTokenPricesToDB(ctx, tokenPrices) if tc.expectedErr { assert.Error(t, err) } else { @@ -203,22 +240,15 @@ func TestPriceService_priceWrite(t *testing.T) { } } -func TestPriceService_generatePriceUpdates(t *testing.T) { +func TestPriceService_observeGasPriceUpdates(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) sourceChainSelector := uint64(67890) - - const nTokens = 10 - tokens := make([]cciptypes.Address, nTokens) - for i := range tokens { - tokens[i] = cciptypes.Address(utils.RandomAddress().String()) - } - sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] }) + sourceNativeToken := cciptypes.Address(utils.RandomAddress().String()) testCases := []struct { name string - tokenDecimals map[cciptypes.Address]uint8 sourceNativeToken cciptypes.Address priceGetterRespData map[cciptypes.Address]*big.Int priceGetterRespErr error @@ -226,108 +256,179 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { feeEstimatorRespErr error maxGasPrice uint64 expSourceGasPriceUSD *big.Int - expTokenPricesUSD map[cciptypes.Address]*big.Int expErr bool }{ { - name: "base", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 12, - }, - sourceNativeToken: tokens[0], + name: "base", + sourceNativeToken: sourceNativeToken, priceGetterRespData: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), - tokens[1]: val1e18(200), - tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) + sourceNativeToken: val1e18(100), }, priceGetterRespErr: nil, feeEstimatorRespFee: big.NewInt(10), feeEstimatorRespErr: nil, maxGasPrice: 1e18, expSourceGasPriceUSD: big.NewInt(1000), - expTokenPricesUSD: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), - tokens[1]: val1e18(200 * 1e6), - }, - expErr: false, + expErr: false, }, { - name: "price getter returned an error", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, - }, - sourceNativeToken: tokens[0], + name: "price getter returned an error", + sourceNativeToken: sourceNativeToken, priceGetterRespData: nil, priceGetterRespErr: fmt.Errorf("some random network error"), expErr: true, }, { - name: "price getter skipped a requested price", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, - }, - sourceNativeToken: tokens[0], + name: "price getter did not return source native gas price", + sourceNativeToken: sourceNativeToken, priceGetterRespData: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), + "0x1": val1e18(100), }, priceGetterRespErr: nil, expErr: true, }, { - name: "price getter skipped source native price", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, + name: "dynamic fee cap overrides legacy", + sourceNativeToken: sourceNativeToken, + priceGetterRespData: map[cciptypes.Address]*big.Int{ + sourceNativeToken: val1e18(100), }, - sourceNativeToken: tokens[2], + priceGetterRespErr: nil, + feeEstimatorRespFee: big.NewInt(20), + feeEstimatorRespErr: nil, + maxGasPrice: 1e18, + expSourceGasPriceUSD: big.NewInt(2000), + expErr: false, + }, + { + name: "nil gas price", + sourceNativeToken: sourceNativeToken, priceGetterRespData: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), - tokens[1]: val1e18(200), + sourceNativeToken: val1e18(100), }, - priceGetterRespErr: nil, - expErr: true, + feeEstimatorRespFee: nil, + maxGasPrice: 1e18, + expErr: true, }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + priceGetter := pricegetter.NewMockPriceGetter(t) + defer priceGetter.AssertExpectations(t) + + gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) + defer gasPriceEstimator.AssertExpectations(t) + + priceGetter.On("TokenPricesUSD", mock.Anything, []cciptypes.Address{tc.sourceNativeToken}).Return(tc.priceGetterRespData, tc.priceGetterRespErr) + + if tc.maxGasPrice > 0 { + gasPriceEstimator.On("GetGasPrice", mock.Anything).Return(tc.feeEstimatorRespFee, tc.feeEstimatorRespErr) + if tc.feeEstimatorRespFee != nil { + pUSD := ccipcalc.CalculateUsdPerUnitGas(tc.feeEstimatorRespFee, tc.priceGetterRespData[tc.sourceNativeToken]) + gasPriceEstimator.On("DenoteInUSD", mock.Anything, mock.Anything).Return(pUSD, nil) + } + } + + priceService := NewPriceService( + lggr, + nil, + jobId, + destChainSelector, + sourceChainSelector, + tc.sourceNativeToken, + priceGetter, + nil, + ).(*priceService) + priceService.gasPriceEstimator = gasPriceEstimator + + sourceGasPriceUSD, err := priceService.observeGasPriceUpdates(context.Background(), lggr) + if tc.expErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.True(t, tc.expSourceGasPriceUSD.Cmp(sourceGasPriceUSD) == 0) + }) + } +} + +func TestPriceService_observeTokenPriceUpdates(t *testing.T) { + lggr := logger.TestLogger(t) + jobId := int32(1) + destChainSelector := uint64(12345) + sourceChainSelector := uint64(67890) + + const nTokens = 10 + tokens := make([]cciptypes.Address, nTokens) + for i := range tokens { + tokens[i] = cciptypes.Address(utils.RandomAddress().String()) + } + sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] }) + + testCases := []struct { + name string + tokenDecimals map[cciptypes.Address]uint8 + filterOutTokens []cciptypes.Address + priceGetterRespData map[cciptypes.Address]*big.Int + priceGetterRespErr error + expTokenPricesUSD map[cciptypes.Address]*big.Int + expErr bool + }{ { - name: "dynamic fee cap overrides legacy", + name: "base", tokenDecimals: map[cciptypes.Address]uint8{ tokens[0]: 18, - tokens[1]: 18, + tokens[1]: 12, }, - sourceNativeToken: tokens[0], + filterOutTokens: []cciptypes.Address{tokens[2]}, priceGetterRespData: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), tokens[1]: val1e18(200), tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) }, - priceGetterRespErr: nil, - feeEstimatorRespFee: big.NewInt(20), - feeEstimatorRespErr: nil, - maxGasPrice: 1e18, - expSourceGasPriceUSD: big.NewInt(2000), + priceGetterRespErr: nil, expTokenPricesUSD: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), - tokens[1]: val1e18(200), + tokens[1]: val1e18(200 * 1e6), }, expErr: false, }, { - name: "nil gas price", + name: "price getter returned an error", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 18, + }, + priceGetterRespData: nil, + priceGetterRespErr: fmt.Errorf("some random network error"), + expErr: true, + }, + { + name: "price getter skipped a requested price", tokenDecimals: map[cciptypes.Address]uint8{ tokens[0]: 18, tokens[1]: 18, }, - sourceNativeToken: tokens[0], priceGetterRespData: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), + }, + priceGetterRespErr: nil, + expErr: true, + }, + { + name: "nil token price", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 18, + }, + filterOutTokens: []cciptypes.Address{tokens[2]}, + priceGetterRespData: map[cciptypes.Address]*big.Int{ + tokens[0]: nil, tokens[1]: val1e18(200), - tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) + tokens[2]: val1e18(300), }, - feeEstimatorRespFee: nil, - maxGasPrice: 1e18, - expErr: true, + expErr: true, }, } @@ -336,9 +437,6 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { priceGetter := pricegetter.NewMockPriceGetter(t) defer priceGetter.AssertExpectations(t) - gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) - defer gasPriceEstimator.AssertExpectations(t) - var destTokens []cciptypes.Address for tk := range tc.tokenDecimals { destTokens = append(destTokens, tk) @@ -351,22 +449,21 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { destDecimals = append(destDecimals, tc.tokenDecimals[token]) } - queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{tc.sourceNativeToken}, destTokens) + queryTokens := ccipcommon.FlattenUniqueSlice(destTokens) if len(queryTokens) > 0 { priceGetter.On("TokenPricesUSD", mock.Anything, queryTokens).Return(tc.priceGetterRespData, tc.priceGetterRespErr) + priceGetter.On("FilterConfiguredTokens", mock.Anything, mock.Anything).Return(destTokens, tc.filterOutTokens, nil) } - if tc.maxGasPrice > 0 { - gasPriceEstimator.On("GetGasPrice", mock.Anything).Return(tc.feeEstimatorRespFee, tc.feeEstimatorRespErr) - if tc.feeEstimatorRespFee != nil { - pUSD := ccipcalc.CalculateUsdPerUnitGas(tc.feeEstimatorRespFee, tc.expTokenPricesUSD[tc.sourceNativeToken]) - gasPriceEstimator.On("DenoteInUSD", mock.Anything, mock.Anything).Return(pUSD, nil) - } - } + offRampReader := ccipdatamocks.NewOffRampReader(t) + offRampReader.On("GetTokens", mock.Anything).Return(cciptypes.OffRampTokens{ + DestinationTokens: destTokens, + }, nil).Maybe() destPriceReg := ccipdatamocks.NewPriceRegistryReader(t) destPriceReg.On("GetTokensDecimals", mock.Anything, destTokens).Return(destDecimals, nil).Maybe() + destPriceReg.On("GetFeeTokens", mock.Anything).Return([]cciptypes.Address{destTokens[0]}, nil).Maybe() priceService := NewPriceService( lggr, @@ -374,20 +471,18 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { jobId, destChainSelector, sourceChainSelector, - tc.sourceNativeToken, + "0x123", priceGetter, - nil, + offRampReader, ).(*priceService) - priceService.gasPriceEstimator = gasPriceEstimator priceService.destPriceRegistryReader = destPriceReg - sourceGasPriceUSD, tokenPricesUSD, err := priceService.generatePriceUpdates(context.Background(), lggr, destTokens) + tokenPricesUSD, err := priceService.observeTokenPriceUpdates(context.Background(), lggr) if tc.expErr { assert.Error(t, err) return } assert.NoError(t, err) - assert.True(t, tc.expSourceGasPriceUSD.Cmp(sourceGasPriceUSD) == 0) assert.True(t, reflect.DeepEqual(tc.expTokenPricesUSD, tokenPricesUSD)) }) } @@ -680,8 +775,10 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) defer gasPriceEstimator.AssertExpectations(t) - priceGetter.On("TokenPricesUSD", mock.Anything, tokens).Return(map[cciptypes.Address]*big.Int{ + priceGetter.On("TokenPricesUSD", mock.Anything, tokens[:1]).Return(map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(tokenPrices[0]), + }, nil) + priceGetter.On("TokenPricesUSD", mock.Anything, tokens[1:]).Return(map[cciptypes.Address]*big.Int{ tokens[1]: val1e18(tokenPrices[1]), tokens[2]: val1e18(tokenPrices[2]), }, nil) @@ -711,15 +808,18 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { offRampReader, ).(*priceService) - updateInterval := 2000 * time.Millisecond + gasUpdateInterval := 2000 * time.Millisecond + tokenUpdateInterval := 5000 * time.Millisecond cleanupInterval := 3000 * time.Millisecond - // run write task every 2 second - priceService.updateInterval = updateInterval + // run gas price task every 2 second + priceService.gasUpdateInterval = gasUpdateInterval + // run token price task every 5 second + priceService.tokenUpdateInterval = tokenUpdateInterval // run cleanup every 3 seconds priceService.cleanupInterval = cleanupInterval // expire all prices during every cleanup - priceService.priceExpireSec = 0 + priceService.priceExpireThreshold = time.Duration(0) // initially, db is empty assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0))