Skip to content

Commit

Permalink
Simplify codebase and improve performance by switching to upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Aug 13, 2024
1 parent 5d4d996 commit e733b92
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 315 deletions.
138 changes: 20 additions & 118 deletions core/services/ccip/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 33 additions & 42 deletions core/services/ccip/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ type ORM interface {
GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error)
GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error)

InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error
InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error

ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error
InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error
}

type orm struct {
Expand All @@ -62,8 +59,7 @@ func NewORM(ds sqlutil.DataSource) (ORM, error) {
func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) {
var gasPrices []GasPrice
stmt := `
SELECT DISTINCT ON (source_chain_selector)
source_chain_selector, gas_price, created_at
SELECT source_chain_selector, gas_price, created_at
FROM ccip.observed_gas_prices
WHERE chain_selector = $1
ORDER BY source_chain_selector, created_at DESC;
Expand All @@ -79,11 +75,10 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin
func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) {
var tokenPrices []TokenPrice
stmt := `
SELECT DISTINCT ON (token_addr)
token_addr, token_price, created_at
SELECT token_addr, token_price, created_at
FROM ccip.observed_token_prices
WHERE chain_selector = $1
ORDER BY token_addr, created_at DESC;
ORDER BY token_addr;
`
err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector)
if err != nil {
Expand All @@ -93,68 +88,64 @@ func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector u
return tokenPrices, nil
}

func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error {
func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error {
if len(gasPrices) == 0 {
return nil
}

insertData := make([]map[string]interface{}, 0, len(gasPrices))
for _, price := range gasPrices {
uniqueGasUpdates := make(map[string]GasPriceUpdate)
for _, gasPrice := range gasPrices {
key := fmt.Sprintf("%d-%d", gasPrice.SourceChainSelector, destChainSelector)
uniqueGasUpdates[key] = gasPrice
}

insertData := make([]map[string]interface{}, 0, len(uniqueGasUpdates))
for _, price := range uniqueGasUpdates {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"source_chain_selector": price.SourceChainSelector,
"gas_price": price.GasPrice,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at)
VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());`
stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, source_chain_selector, gas_price, created_at)
VALUES (:chain_selector, :source_chain_selector, :gas_price, statement_timestamp())
ON CONFLICT (chain_selector, source_chain_selector)
DO UPDATE SET gas_price = EXCLUDED.gas_price, created_at = EXCLUDED.created_at;`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err)
err = fmt.Errorf("error inserting gas prices %w", err)
}

return err
}

func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error {
func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error {
if len(tokenPrices) == 0 {
return nil
}

insertData := make([]map[string]interface{}, 0, len(tokenPrices))
for _, price := range tokenPrices {
uniqueTokenPrices := make(map[string]TokenPriceUpdate)
for _, tokenPrice := range tokenPrices {
key := fmt.Sprintf("%s-%d", tokenPrice.TokenAddr, destChainSelector)
uniqueTokenPrices[key] = tokenPrice
}

insertData := make([]map[string]interface{}, 0, len(uniqueTokenPrices))
for _, price := range uniqueTokenPrices {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"token_addr": price.TokenAddr,
"token_price": price.TokenPrice,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at)
VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());`
stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, token_addr, token_price, created_at)
VALUES (:chain_selector, :token_addr, :token_price, statement_timestamp())
ON CONFLICT (chain_selector, token_addr)
DO UPDATE SET token_price = EXCLUDED.token_price, created_at = EXCLUDED.created_at;`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err)
err = fmt.Errorf("error inserting token prices %w", err)
}

return err
}

func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}

func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}
Loading

0 comments on commit e733b92

Please sign in to comment.