From fb645ca2217e75bbfbe87565a49d65c8212bc3ae Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 8 Aug 2024 14:06:20 +0200 Subject: [PATCH] Added detailed tests for observability layer Fixing tests and adding missing changeset --- .changeset/thick-mails-applaud.md | 5 + core/services/ccip/mocks/orm.go | 99 +++++++----- core/services/ccip/observability.go | 115 ++++++++++++++ core/services/ccip/observability_test.go | 94 +++++++++++ core/services/ccip/orm.go | 148 +++++++++++++----- core/services/ccip/orm_test.go | 148 ++++++++++++------ .../plugins/ccip/ccipcommit/initializers.go | 2 +- .../ccip/internal/ccipdb/price_service.go | 23 ++- .../internal/ccipdb/price_service_test.go | 35 +---- .../migrations/0249_ccip_token_prices_fix.sql | 4 +- 10 files changed, 501 insertions(+), 172 deletions(-) create mode 100644 .changeset/thick-mails-applaud.md create mode 100644 core/services/ccip/observability.go create mode 100644 core/services/ccip/observability_test.go diff --git a/.changeset/thick-mails-applaud.md b/.changeset/thick-mails-applaud.md new file mode 100644 index 00000000000..ceec9e64fd4 --- /dev/null +++ b/.changeset/thick-mails-applaud.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Simplify how token and gas prices are stored in the database - user upsert instead of insert/delete flow #db_update diff --git a/core/services/ccip/mocks/orm.go b/core/services/ccip/mocks/orm.go index 35ca2353bfc..0c9086def7e 100644 --- a/core/services/ccip/mocks/orm.go +++ b/core/services/ccip/mocks/orm.go @@ -8,6 +8,8 @@ import ( ccip "github.com/smartcontractkit/chainlink/v2/core/services/ccip" mock "github.com/stretchr/testify/mock" + + time "time" ) // ORM is an autogenerated mock type for the ORM type @@ -141,98 +143,119 @@ func (_c *ORM_GetTokenPricesByDestChain_Call) RunAndReturn(run func(context.Cont return _c } -// InsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, gasPrices -func (_m *ORM) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate) error { +// UpsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, gasPrices +func (_m *ORM) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPrice) (int64, error) { ret := _m.Called(ctx, destChainSelector, gasPrices) if len(ret) == 0 { - panic("no return value specified for InsertGasPricesForDestChain") + panic("no return value specified for UpsertGasPricesForDestChain") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPriceUpdate) error); ok { + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPrice) (int64, error)); ok { + return rf(ctx, destChainSelector, gasPrices) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPrice) int64); ok { r0 = rf(ctx, destChainSelector, gasPrices) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(int64) } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, uint64, []ccip.GasPrice) error); ok { + r1 = rf(ctx, destChainSelector, gasPrices) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// ORM_InsertGasPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertGasPricesForDestChain' -type ORM_InsertGasPricesForDestChain_Call struct { +// ORM_UpsertGasPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertGasPricesForDestChain' +type ORM_UpsertGasPricesForDestChain_Call struct { *mock.Call } -// InsertGasPricesForDestChain is a helper method to define mock.On call +// UpsertGasPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - gasPrices []ccip.GasPriceUpdate -func (_e *ORM_Expecter) InsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, gasPrices interface{}) *ORM_InsertGasPricesForDestChain_Call { - return &ORM_InsertGasPricesForDestChain_Call{Call: _e.mock.On("InsertGasPricesForDestChain", ctx, destChainSelector, gasPrices)} +// - gasPrices []ccip.GasPrice +func (_e *ORM_Expecter) UpsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, gasPrices interface{}) *ORM_UpsertGasPricesForDestChain_Call { + return &ORM_UpsertGasPricesForDestChain_Call{Call: _e.mock.On("UpsertGasPricesForDestChain", ctx, destChainSelector, gasPrices)} } -func (_c *ORM_InsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate)) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_UpsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPrice)) *ORM_UpsertGasPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.GasPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.GasPrice)) }) return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) Return(_a0 error) *ORM_InsertGasPricesForDestChain_Call { - _c.Call.Return(_a0) +func (_c *ORM_UpsertGasPricesForDestChain_Call) Return(_a0 int64, _a1 error) *ORM_UpsertGasPricesForDestChain_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.GasPriceUpdate) error) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_UpsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.GasPrice) (int64, error)) *ORM_UpsertGasPricesForDestChain_Call { _c.Call.Return(run) return _c } -// InsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, tokenPrices -func (_m *ORM) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, tokenPrices) +// UpsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, tokenPrices, interval +func (_m *ORM) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPrice, interval time.Duration) (int64, error) { + ret := _m.Called(ctx, destChainSelector, tokenPrices, interval) if len(ret) == 0 { - panic("no return value specified for InsertTokenPricesForDestChain") + panic("no return value specified for UpsertTokenPricesForDestChain") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, tokenPrices) + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) (int64, error)); ok { + return rf(ctx, destChainSelector, tokenPrices, interval) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) int64); ok { + r0 = rf(ctx, destChainSelector, tokenPrices, interval) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) error); ok { + r1 = rf(ctx, destChainSelector, tokenPrices, interval) } else { - r0 = ret.Error(0) + r1 = ret.Error(1) } - return r0 + return r0, r1 } -// ORM_InsertTokenPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertTokenPricesForDestChain' -type ORM_InsertTokenPricesForDestChain_Call struct { +// ORM_UpsertTokenPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTokenPricesForDestChain' +type ORM_UpsertTokenPricesForDestChain_Call struct { *mock.Call } -// InsertTokenPricesForDestChain is a helper method to define mock.On call +// UpsertTokenPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - tokenPrices []ccip.TokenPriceUpdate -func (_e *ORM_Expecter) InsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, tokenPrices interface{}) *ORM_InsertTokenPricesForDestChain_Call { - return &ORM_InsertTokenPricesForDestChain_Call{Call: _e.mock.On("InsertTokenPricesForDestChain", ctx, destChainSelector, tokenPrices)} +// - tokenPrices []ccip.TokenPrice +// - interval time.Duration +func (_e *ORM_Expecter) UpsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, tokenPrices interface{}, interval interface{}) *ORM_UpsertTokenPricesForDestChain_Call { + return &ORM_UpsertTokenPricesForDestChain_Call{Call: _e.mock.On("UpsertTokenPricesForDestChain", ctx, destChainSelector, tokenPrices, interval)} } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate)) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_UpsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPrice, interval time.Duration)) *ORM_UpsertTokenPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.TokenPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.TokenPrice), args[3].(time.Duration)) }) return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Return(_a0 error) *ORM_InsertTokenPricesForDestChain_Call { - _c.Call.Return(_a0) +func (_c *ORM_UpsertTokenPricesForDestChain_Call) Return(_a0 int64, _a1 error) *ORM_UpsertTokenPricesForDestChain_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.TokenPriceUpdate) error) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_UpsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.TokenPrice, time.Duration) (int64, error)) *ORM_UpsertTokenPricesForDestChain_Call { _c.Call.Return(run) return _c } diff --git a/core/services/ccip/observability.go b/core/services/ccip/observability.go new file mode 100644 index 00000000000..8a061893ce6 --- /dev/null +++ b/core/services/ccip/observability.go @@ -0,0 +1,115 @@ +package ccip + +import ( + "context" + "strconv" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var ( + sqlLatencyBuckets = []float64{ + float64(10 * time.Millisecond), + float64(20 * time.Millisecond), + float64(30 * time.Millisecond), + float64(40 * time.Millisecond), + float64(50 * time.Millisecond), + float64(70 * time.Millisecond), + float64(90 * time.Millisecond), + float64(100 * time.Millisecond), + float64(200 * time.Millisecond), + float64(300 * time.Millisecond), + float64(400 * time.Millisecond), + float64(500 * time.Millisecond), + float64(750 * time.Millisecond), + float64(1 * time.Second), + } + ccipQueryDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "ccip_orm_query_duration", + Buckets: sqlLatencyBuckets, + }, []string{"query", "destChainSelector"}) + ccipQueryDatasets = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccip_orm_dataset_size", + }, []string{"query", "destChainSelector"}) +) + +type observedORM struct { + ORM + queryDuration *prometheus.HistogramVec + datasetSize *prometheus.GaugeVec +} + +var _ ORM = (*observedORM)(nil) + +func NewObservedORM(ds sqlutil.DataSource, lggr logger.Logger) (*observedORM, error) { + delegate, err := NewORM(ds, lggr) + if err != nil { + return nil, err + } + + return &observedORM{ + ORM: delegate, + queryDuration: ccipQueryDuration, + datasetSize: ccipQueryDatasets, + }, nil +} + +func (o *observedORM) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { + return withObservedQueryAndResults(o, "GetGasPricesByDestChain", destChainSelector, func() ([]GasPrice, error) { + return o.ORM.GetGasPricesByDestChain(ctx, destChainSelector) + }) +} + +func (o *observedORM) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { + return withObservedQueryAndResults(o, "GetTokenPricesByDestChain", destChainSelector, func() ([]TokenPrice, error) { + return o.ORM.GetTokenPricesByDestChain(ctx, destChainSelector) + }) +} + +func (o *observedORM) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) { + return withObservedQueryAndRowsAffected(o, "UpsertGasPricesForDestChain", destChainSelector, func() (int64, error) { + return o.ORM.UpsertGasPricesForDestChain(ctx, destChainSelector, gasPrices) + }) +} + +func (o *observedORM) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) { + return withObservedQueryAndRowsAffected(o, "UpsertTokenPricesForDestChain", destChainSelector, func() (int64, error) { + return o.ORM.UpsertTokenPricesForDestChain(ctx, destChainSelector, tokenPrices, interval) + }) +} + +func withObservedQueryAndRowsAffected(o *observedORM, queryName string, chainSelector uint64, query func() (int64, error)) (int64, error) { + rowsAffected, err := withObservedQuery(o, queryName, chainSelector, query) + if err == nil { + o.datasetSize. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Set(float64(rowsAffected)) + } + return rowsAffected, err +} + +func withObservedQueryAndResults[T any](o *observedORM, queryName string, chainSelector uint64, query func() ([]T, error)) ([]T, error) { + results, err := withObservedQuery(o, queryName, chainSelector, query) + if err == nil { + o.datasetSize. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Set(float64(len(results))) + } + return results, err +} + +func withObservedQuery[T any](o *observedORM, queryName string, chainSelector uint64, query func() (T, error)) (T, error) { + queryStarted := time.Now() + defer func() { + o.queryDuration. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Observe(float64(time.Since(queryStarted))) + }() + return query() +} diff --git a/core/services/ccip/observability_test.go b/core/services/ccip/observability_test.go new file mode 100644 index 00000000000..24bfb4a9ec1 --- /dev/null +++ b/core/services/ccip/observability_test.go @@ -0,0 +1,94 @@ +package ccip + +import ( + "math/big" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func Test_MetricsAreTrackedForAllMethods(t *testing.T) { + ctx := testutils.Context(t) + db := pgtest.NewSqlxDB(t) + ccipORM, err := NewObservedORM(db, logger.TestLogger(t)) + require.NoError(t, err) + + tokenPrices := []TokenPrice{ + { + TokenAddr: "0xA", + TokenPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + TokenAddr: "0xB", + TokenPrice: assets.NewWei(big.NewInt(1e18)), + }, + } + tokensUpserted, err := ccipORM.UpsertTokenPricesForDestChain(ctx, 100, tokenPrices, time.Second) + require.NoError(t, err) + assert.Equal(t, len(tokenPrices), int(tokensUpserted)) + assert.Equal(t, len(tokenPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertTokenPricesForDestChain", "100")) + assert.Equal(t, 0, counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertTokenPricesForDestChain", "200")) + + tokens, err := ccipORM.GetTokenPricesByDestChain(ctx, 100) + require.NoError(t, err) + assert.Equal(t, len(tokenPrices), len(tokens)) + assert.Equal(t, len(tokenPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "GetTokenPricesByDestChain", "100")) + assert.Equal(t, 1, counterFromHistogramByLabels(t, ccipORM.queryDuration, "GetTokenPricesByDestChain", "100")) + + gasPrices := []GasPrice{ + { + SourceChainSelector: 200, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + SourceChainSelector: 201, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + SourceChainSelector: 202, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + } + gasUpserted, err := ccipORM.UpsertGasPricesForDestChain(ctx, 100, gasPrices) + require.NoError(t, err) + assert.Equal(t, len(gasPrices), int(gasUpserted)) + assert.Equal(t, len(gasPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertGasPricesForDestChain", "100")) + assert.Equal(t, 0, counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertGasPricesForDestChain", "200")) + + gas, err := ccipORM.GetGasPricesByDestChain(ctx, 100) + require.NoError(t, err) + assert.Equal(t, len(gasPrices), len(gas)) + assert.Equal(t, len(gasPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "GetGasPricesByDestChain", "100")) + assert.Equal(t, 1, counterFromHistogramByLabels(t, ccipORM.queryDuration, "GetGasPricesByDestChain", "100")) +} + +func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int { + observer, err := histogramVec.GetMetricWithLabelValues(labels...) + require.NoError(t, err) + + metricCh := make(chan prometheus.Metric, 1) + observer.(prometheus.Histogram).Collect(metricCh) + close(metricCh) + + metric := <-metricCh + pb := &io_prometheus_client.Metric{} + err = metric.Write(pb) + require.NoError(t, err) + + return int(pb.GetHistogram().GetSampleCount()) +} + +func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int { + value := testutil.ToFloat64(gaugeVec.WithLabelValues(labels...)) + return int(value) +} diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index b513b13cc50..4676d59093a 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -8,61 +8,57 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) type GasPrice struct { SourceChainSelector uint64 GasPrice *assets.Wei - CreatedAt time.Time -} - -type GasPriceUpdate struct { - SourceChainSelector uint64 - GasPrice *assets.Wei } type TokenPrice struct { TokenAddr string TokenPrice *assets.Wei - CreatedAt time.Time -} - -type TokenPriceUpdate struct { - TokenAddr string - TokenPrice *assets.Wei } type ORM interface { GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) - InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error - InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error + UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) + UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) +} + +type tokenPriceRow struct { + TokenAddr string + TokenPrice *assets.Wei + CreatedAt time.Time } type orm struct { - ds sqlutil.DataSource + ds sqlutil.DataSource + lggr logger.Logger } var _ ORM = (*orm)(nil) -func NewORM(ds sqlutil.DataSource) (ORM, error) { +func NewORM(ds sqlutil.DataSource, lggr logger.Logger) (ORM, error) { if ds == nil { return nil, fmt.Errorf("datasource to CCIP NewORM cannot be nil") } return &orm{ - ds: ds, + ds: ds, + lggr: lggr, }, nil } func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { var gasPrices []GasPrice stmt := ` - SELECT source_chain_selector, gas_price, created_at + SELECT source_chain_selector, gas_price FROM ccip.observed_gas_prices - WHERE chain_selector = $1 - ORDER BY source_chain_selector, created_at DESC; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &gasPrices, stmt, destChainSelector) if err != nil { @@ -75,25 +71,23 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { var tokenPrices []TokenPrice stmt := ` - SELECT token_addr, token_price, created_at + SELECT token_addr, token_price FROM ccip.observed_token_prices - WHERE chain_selector = $1 - ORDER BY token_addr; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector) if err != nil { return nil, err } - return tokenPrices, nil } -func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error { +func (o *orm) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) { if len(gasPrices) == 0 { - return nil + return 0, nil } - uniqueGasUpdates := make(map[string]GasPriceUpdate) + uniqueGasUpdates := make(map[string]GasPrice) for _, gasPrice := range gasPrices { key := fmt.Sprintf("%d-%d", gasPrice.SourceChainSelector, destChainSelector) uniqueGasUpdates[key] = gasPrice @@ -110,28 +104,32 @@ func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, source_chain_selector, gas_price, created_at) VALUES (:chain_selector, :source_chain_selector, :gas_price, statement_timestamp()) - ON CONFLICT (chain_selector, source_chain_selector) + ON CONFLICT (source_chain_selector, chain_selector) DO UPDATE SET gas_price = EXCLUDED.gas_price, created_at = EXCLUDED.created_at;` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) + + result, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting gas prices %w", err) + return 0, fmt.Errorf("error inserting gas prices %w", err) } - return err + return result.RowsAffected() } -func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error { +// UpsertTokenPricesForDestChain inserts or updates only relevant token prices. +// In order to reduce locking an unnecessary writes to the table, we start with fetching current prices. +// If price for a token doesn't change or was updated recently we don't include that token to the upsert query. +// We don't run in TX intentionally, because we don't want to lock the table and conflicts are resolved on the insert level +func (o *orm) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) { if len(tokenPrices) == 0 { - return nil + return 0, nil } - uniqueTokenPrices := make(map[string]TokenPriceUpdate) - for _, tokenPrice := range tokenPrices { - key := fmt.Sprintf("%s-%d", tokenPrice.TokenAddr, destChainSelector) - uniqueTokenPrices[key] = tokenPrice + tokensToUpdate, err := o.pickOnlyRelevantTokensForUpdate(ctx, destChainSelector, tokenPrices, interval) + if err != nil || len(tokensToUpdate) == 0 { + return 0, err } - insertData := make([]map[string]interface{}, 0, len(uniqueTokenPrices)) - for _, price := range uniqueTokenPrices { + insertData := make([]map[string]interface{}, 0, len(tokensToUpdate)) + for _, price := range tokensToUpdate { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, "token_addr": price.TokenAddr, @@ -141,11 +139,75 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, token_addr, token_price, created_at) VALUES (:chain_selector, :token_addr, :token_price, statement_timestamp()) - ON CONFLICT (chain_selector, token_addr) + ON CONFLICT (token_addr, chain_selector) DO UPDATE SET token_price = EXCLUDED.token_price, created_at = EXCLUDED.created_at;` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) + result, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting token prices %w", err) + return 0, fmt.Errorf("error inserting token prices %w", err) + } + return result.RowsAffected() +} + +// pickOnlyRelevantTokensForUpdate returns only tokens that need to be updated. Multiple jobs can be updating the same tokens, +// in order to reduce table locking and redundant upserts we start with reading the table and checking which tokens are eligible for update. +// A token is eligible for update when: +// * price has changed and created_at date is older than the interval +// * it's not present in the result set from the db query (e.g. it's a new token) +// Therefore if there are no price changes for a single token we won't update the state of the database +func (o *orm) pickOnlyRelevantTokensForUpdate( + ctx context.Context, + destChainSelector uint64, + tokenPrices []TokenPrice, + interval time.Duration, +) ([]TokenPrice, error) { + stmt := ` + SELECT token_addr, token_price, created_at + FROM ccip.observed_token_prices + WHERE chain_selector = $1 and token_addr = ANY($2); + ` + + var dbTokenPrices []tokenPriceRow + if err := o.ds.SelectContext(ctx, &dbTokenPrices, stmt, destChainSelector, tokenAddrsToBytes(tokenPrices)); err != nil { + return nil, err + } + + updateThrehsold := time.Now().Add(-interval) + tokenPricesByAddr := toTokensByAddress(tokenPrices) + dbTokenPricesByAddr := toTokenRowsByAddress(dbTokenPrices) + tokenPricesToUpdate := make([]TokenPrice, 0, len(tokenPrices)) + for addr, price := range tokenPricesByAddr { + dbToken, ok := dbTokenPricesByAddr[addr] + eligibleForUpdate := false + if !ok || + (dbToken.CreatedAt.Before(updateThrehsold) && !dbToken.TokenPrice.Equal(price)) { + tokenPricesToUpdate = append(tokenPricesToUpdate, TokenPrice{addr, price}) + eligibleForUpdate = true + } + o.lggr.Debugw("Token price database update", "eligibleForUpdate", eligibleForUpdate, "token", addr, "price", price) + } + return tokenPricesToUpdate, nil +} + +func toTokensByAddress(tokens []TokenPrice) map[string]*assets.Wei { + tokensByAddr := make(map[string]*assets.Wei, len(tokens)) + for _, tk := range tokens { + tokensByAddr[tk.TokenAddr] = tk.TokenPrice + } + return tokensByAddr +} + +func toTokenRowsByAddress(tokens []tokenPriceRow) map[string]tokenPriceRow { + tokensByAddr := make(map[string]tokenPriceRow, len(tokens)) + for _, tk := range tokens { + tokensByAddr[tk.TokenAddr] = tk + } + return tokensByAddr +} + +func tokenAddrsToBytes(tokens []TokenPrice) [][]byte { + addrs := make([][]byte, 0, len(tokens)) + for _, tk := range tokens { + addrs = append(addrs, []byte(tk.TokenAddr)) } - return err + return addrs } diff --git a/core/services/ccip/orm_test.go b/core/services/ccip/orm_test.go index f7b8a6d5a4f..9c51d620f5b 100644 --- a/core/services/ccip/orm_test.go +++ b/core/services/ccip/orm_test.go @@ -15,13 +15,18 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var ( + r = rand.New(rand.NewSource(time.Now().UnixNano())) ) func setupORM(t *testing.T) (ORM, sqlutil.DataSource) { t.Helper() db := pgtest.NewSqlxDB(t) - orm, err := NewORM(db) + orm, err := NewORM(db, logger.TestLogger(t)) require.NoError(t, err) @@ -37,12 +42,12 @@ func generateChainSelectors(n int) []uint64 { return selectors } -func generateGasPriceUpdates(chainSelector uint64, n int) []GasPriceUpdate { - updates := make([]GasPriceUpdate, n) +func generateGasPrices(chainSelector uint64, n int) []GasPrice { + updates := make([]GasPrice, n) for i := 0; i < n; i++ { // gas prices can take up whole range of uint256 uint256Max := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil), big.NewInt(1)) - row := GasPriceUpdate{ + row := GasPrice{ SourceChainSelector: chainSelector, GasPrice: assets.NewWei(new(big.Int).Sub(uint256Max, big.NewInt(int64(i)))), } @@ -61,10 +66,21 @@ func generateTokenAddresses(n int) []string { return addrs } -func generateTokenPriceUpdates(tokenAddr string, n int) []TokenPriceUpdate { - updates := make([]TokenPriceUpdate, n) +func generateRandomTokenPrices(tokenAddrs []string) []TokenPrice { + updates := make([]TokenPrice, 0, len(tokenAddrs)) + for _, addr := range tokenAddrs { + updates = append(updates, TokenPrice{ + TokenAddr: addr, + TokenPrice: assets.NewWei(new(big.Int).Rand(r, big.NewInt(1e18))), + }) + } + return updates +} + +func generateTokenPrices(tokenAddr string, n int) []TokenPrice { + updates := make([]TokenPrice, n) for i := 0; i < n; i++ { - row := TokenPriceUpdate{ + row := TokenPrice{ TokenAddr: tokenAddr, TokenPrice: assets.NewWei(new(big.Int).Mul(big.NewInt(1e18), big.NewInt(int64(i)))), } @@ -134,20 +150,20 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { sourceSelectors := generateChainSelectors(numSourceChainSelectors) - updates := make(map[uint64][]GasPriceUpdate) + updates := make(map[uint64][]GasPrice) for _, selector := range sourceSelectors { - updates[selector] = generateGasPriceUpdates(selector, numUpdatesPerSourceSelector) + updates[selector] = generateGasPrices(selector, numUpdatesPerSourceSelector) } // 5 jobs, each inserting prices for 10 chains, with 20 updates per chain. - expectedPrices := make(map[uint64]GasPriceUpdate) + expectedPrices := make(map[uint64]GasPrice) for i := 0; i < numJobs; i++ { for selector, updatesPerSelector := range updates { lastIndex := len(updatesPerSelector) - 1 - err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[:lastIndex]) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[:lastIndex]) assert.NoError(t, err) - err = orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[lastIndex:]) + _, err = orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[lastIndex:]) assert.NoError(t, err) expectedPrices[selector] = updatesPerSelector[lastIndex] @@ -170,13 +186,13 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { } // after the initial inserts, insert new round of prices, 1 price per selector this time - var combinedUpdates []GasPriceUpdate + var combinedUpdates []GasPrice for selector, updatesPerSelector := range updates { combinedUpdates = append(combinedUpdates, updatesPerSelector[0]) expectedPrices[selector] = updatesPerSelector[0] } - err = orm.InsertGasPricesForDestChain(ctx, destSelector, combinedUpdates) + _, err = orm.UpsertGasPricesForDestChain(ctx, destSelector, combinedUpdates) assert.NoError(t, err) assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) @@ -190,7 +206,7 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { } } -func TestORM_InsertAndDeleteGasPrices(t *testing.T) { +func TestORM_UpsertGasPrices(t *testing.T) { t.Parallel() ctx := testutils.Context(t) @@ -202,13 +218,13 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { sourceSelectors := generateChainSelectors(numSourceChainSelectors) - updates := make(map[uint64][]GasPriceUpdate) + updates := make(map[uint64][]GasPrice) for _, selector := range sourceSelectors { - updates[selector] = generateGasPriceUpdates(selector, numUpdatesPerSourceSelector) + updates[selector] = generateGasPrices(selector, numUpdatesPerSourceSelector) } for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } @@ -217,7 +233,7 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { // insert for the 2nd time after interimTimeStamp for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } @@ -237,20 +253,20 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { addrs := generateTokenAddresses(numAddresses) - updates := make(map[string][]TokenPriceUpdate) + updates := make(map[string][]TokenPrice) for _, addr := range addrs { - updates[addr] = generateTokenPriceUpdates(addr, numUpdatesPerAddress) + updates[addr] = generateTokenPrices(addr, numUpdatesPerAddress) } // 5 jobs, each inserting prices for 10 chains, with 20 updates per chain. - expectedPrices := make(map[string]TokenPriceUpdate) + expectedPrices := make(map[string]TokenPrice) for i := 0; i < numJobs; i++ { for addr, updatesPerAddr := range updates { lastIndex := len(updatesPerAddr) - 1 - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[:lastIndex]) + _, err := orm.UpsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[:lastIndex], 0) assert.NoError(t, err) - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[lastIndex:]) + _, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[lastIndex:], 0) assert.NoError(t, err) expectedPrices[addr] = updatesPerAddr[lastIndex] @@ -273,13 +289,13 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { } // after the initial inserts, insert new round of prices, 1 price per selector this time - var combinedUpdates []TokenPriceUpdate + var combinedUpdates []TokenPrice for addr, updatesPerAddr := range updates { combinedUpdates = append(combinedUpdates, updatesPerAddr[0]) expectedPrices[addr] = updatesPerAddr[0] } - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, combinedUpdates) + _, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, combinedUpdates, 0) assert.NoError(t, err) assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) @@ -293,36 +309,74 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { } } -func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { +func TestORM_InsertTokenPricesWhenExpired(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - - orm, db := setupORM(t) + orm, _ := setupORM(t) numAddresses := 10 - numUpdatesPerAddress := 20 - destSelector := uint64(1) - + destSelector := rand.Uint64() addrs := generateTokenAddresses(numAddresses) + initTokenUpdates := generateRandomTokenPrices(addrs) - updates := make(map[string][]TokenPriceUpdate) - for _, addr := range addrs { - updates[addr] = generateTokenPriceUpdates(addr, numUpdatesPerAddress) - } + // Insert the first time, table is initialized + rowsUpdated, err := orm.UpsertTokenPricesForDestChain(ctx, destSelector, initTokenUpdates, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(numAddresses), rowsUpdated) - for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) - assert.NoError(t, err) - } + //time.Sleep(100 * time.Millisecond) - sleepSec := 2 - time.Sleep(time.Duration(sleepSec) * time.Second) + // Insert the second time, no updates, because prices haven't changed + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, initTokenUpdates, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(0), rowsUpdated) - // insert for the 2nd time after interimTimeStamp - for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) - assert.NoError(t, err) + // There are new prices, but we still haven't reached interval + newPrices := generateRandomTokenPrices(addrs) + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(0), rowsUpdated) + + time.Sleep(100 * time.Millisecond) + + // Again with the same new prices, but this time interval is reached + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Millisecond) + require.NoError(t, err) + assert.Equal(t, int64(numAddresses), rowsUpdated) + + dbTokenPrices, err := orm.GetTokenPricesByDestChain(ctx, destSelector) + require.NoError(t, err) + assert.Len(t, dbTokenPrices, numAddresses) + + dbTokenPricesByAddr := toTokensByAddress(dbTokenPrices) + for _, tkPrice := range newPrices { + dbToken, ok := dbTokenPricesByAddr[tkPrice.TokenAddr] + assert.True(t, ok) + assert.Equal(t, dbToken, tkPrice.TokenPrice) } - assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) + // Single token gets an update + newPrices[0].TokenPrice = assets.NewWei(new(big.Int).Add(newPrices[0].TokenPrice.ToInt(), big.NewInt(1))) + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Nanosecond) + require.NoError(t, err) + assert.Equal(t, int64(1), rowsUpdated) +} + +func Benchmark_UpsertsTheSameTokenPrices(b *testing.B) { + db := pgtest.NewSqlxDB(b) + orm, err := NewORM(db, logger.NullLogger) + require.NoError(b, err) + + ctx := testutils.Context(b) + numAddresses := 50 + destSelector := rand.Uint64() + addrs := generateTokenAddresses(numAddresses) + tokenUpdates := generateRandomTokenPrices(addrs) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err1 := orm.UpsertTokenPricesForDestChain(ctx, destSelector, tokenUpdates, time.Second) + require.NoError(b, err1) + } } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index e964896ab93..378c71ac2fe 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -156,7 +156,7 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c onRampAddress, ) - orm, err := cciporm.NewORM(ds) + orm, err := cciporm.NewObservedORM(ds, lggr) if err != nil { return nil, err } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index 23ea3dcbd00..be77e8f1c3b 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -13,7 +13,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -22,6 +21,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) // PriceService manages DB access for gas and token price data. @@ -42,17 +42,11 @@ type PriceService interface { var _ PriceService = (*priceService)(nil) const ( - // Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin. - // 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while - // surfacing price update outages quickly enough. - priceExpireSec = 600 - // Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. priceUpdateInterval = 60 * time.Second ) type priceService struct { - priceExpireSec int updateInterval time.Duration lggr logger.Logger @@ -88,8 +82,7 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireSec: priceExpireSec, - updateInterval: utils.WithJitter(priceUpdateInterval), + updateInterval: priceUpdateInterval, lggr: lggr, orm: orm, @@ -128,7 +121,7 @@ func (p *priceService) Close() error { } func (p *priceService) run() { - updateTicker := time.NewTicker(p.updateInterval) + updateTicker := time.NewTicker(utils.WithJitter(p.updateInterval)) go func() { defer p.wg.Done() @@ -325,20 +318,21 @@ func (p *priceService) writePricesToDB( if sourceGasPriceUSD != nil { eg.Go(func() error { - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPriceUpdate{ + _, err1 := p.orm.UpsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPrice{ { SourceChainSelector: p.sourceChainSelector, GasPrice: assets.NewWei(sourceGasPriceUSD), }, }) + return err1 }) } if tokenPricesUSD != nil { - var tokenPrices []cciporm.TokenPriceUpdate + var tokenPrices []cciporm.TokenPrice for token, price := range tokenPricesUSD { - tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ + tokenPrices = append(tokenPrices, cciporm.TokenPrice{ TokenAddr: string(token), TokenPrice: assets.NewWei(price), }) @@ -350,7 +344,8 @@ func (p *priceService) writePricesToDB( }) eg.Go(func() error { - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices) + _, err1 := p.orm.UpsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices, p.updateInterval) + return err1 }) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index 872d933a944..245bda995cc 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -18,7 +18,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -42,13 +41,13 @@ func TestPriceService_priceWrite(t *testing.T) { "0x234": big.NewInt(3e18), } - expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ + expectedGasPriceUpdate := []cciporm.GasPrice{ { SourceChainSelector: sourceChainSelector, GasPrice: assets.NewWei(gasPrice), }, } - expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ + expectedTokenPriceUpdate := []cciporm.TokenPrice{ { TokenAddr: "0x123", TokenPrice: assets.NewWei(big.NewInt(2e18)), @@ -105,8 +104,10 @@ func TestPriceService_priceWrite(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() - mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() + mockOrm.On("UpsertGasPricesForDestChain", ctx, destChainSelector, expectedGasPriceUpdate). + Return(int64(len(expectedGasPriceUpdate)), gasPricesError).Once() + mockOrm.On("UpsertTokenPricesForDestChain", ctx, destChainSelector, expectedTokenPriceUpdate, priceUpdateInterval). + Return(int64(len(expectedTokenPriceUpdate)), tokenPricesError).Once() priceService := NewPriceService( lggr, @@ -555,7 +556,7 @@ func setupORM(t *testing.T) cciporm.ORM { t.Helper() db := pgtest.NewSqlxDB(t) - orm, err := cciporm.NewORM(db) + orm, err := cciporm.NewORM(db, logger.TestLogger(t)) require.NoError(t, err) @@ -577,7 +578,7 @@ func checkResultLen(t *testing.T, priceService PriceService, destChainSelector u return nil } -func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { +func TestPriceService_priceWriteInBackground(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) @@ -640,8 +641,6 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { // run write task every 2 second priceService.updateInterval = updateInterval - // expire all prices during every cleanup - priceService.priceExpireSec = 0 // initially, db is empty assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) @@ -654,23 +653,5 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { assert.NoError(t, err) assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens))) - // eventually prices will be cleaned - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 0, 0) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - - // then prices will be updated again - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens)) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - assert.NoError(t, priceService.Close()) - - // after stopping PriceService and runCleanup, no more updates are inserted - for i := 0; i < 5; i++ { - time.Sleep(time.Second) - assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) - } } diff --git a/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql index ebadf544684..686c9d10614 100644 --- a/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql +++ b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql @@ -10,7 +10,7 @@ CREATE TABLE ccip.observed_token_prices token_addr BYTEA NOT NULL, token_price NUMERIC(78, 0) NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - CONSTRAINT unique_token_chain_selector UNIQUE (token_addr, chain_selector) + PRIMARY KEY (chain_selector, token_addr) ); CREATE TABLE ccip.observed_gas_prices @@ -19,7 +19,7 @@ CREATE TABLE ccip.observed_gas_prices source_chain_selector NUMERIC(20, 0) NOT NULL, gas_price NUMERIC(78, 0) NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - CONSTRAINT unique_source_dest_chain UNIQUE (source_chain_selector, chain_selector) + PRIMARY KEY (chain_selector, source_chain_selector) ); -- +goose Down