CCIP-1277 LogPoller - Fixing leaky abstraction by removing Q() from the ORM interface (#11200)

* Fixing leaky abstraction by removing Q() from the ORM interface. Moving all the TX internals into the ORM implementation (see the sketch after these notes)

* Adding test

* Post review fix

* Post rebase fixes
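
The refactoring described in the first note is easiest to see as a before/after of who owns the database transaction. Below is a minimal Go sketch of that pattern using simplified, hypothetical types (plain database/sql in place of the project's pg.Q/pg.QOpt plumbing); it illustrates the idea, not the actual LogPoller code.

```go
// Sketch: before vs. after the Q() removal, with simplified, hypothetical types.
package ormsketch

import (
	"context"
	"database/sql"
)

// Before: the ORM leaks its query handle, so the caller (LogPoller) has to
// open the transaction itself and thread it back into every ORM call.
type LeakyORM interface {
	DB() *sql.DB // stands in for Q() pg.Q
	InsertBlock(ctx context.Context, tx *sql.Tx, blockNumber int64) error
	InsertLogs(ctx context.Context, tx *sql.Tx, logs []string) error
}

// After: the caller states intent once; the ORM owns the transaction.
type ORM interface {
	InsertLogsWithBlock(ctx context.Context, blockNumber int64, logs []string) error
}

type dbORM struct{ db *sql.DB }

// InsertLogsWithBlock persists the block and its logs atomically.
func (o *dbORM) InsertLogsWithBlock(ctx context.Context, blockNumber int64, logs []string) error {
	tx, err := o.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	if _, err := tx.ExecContext(ctx, `INSERT INTO blocks (number) VALUES ($1)`, blockNumber); err != nil {
		return err
	}
	for _, l := range logs {
		if _, err := tx.ExecContext(ctx, `INSERT INTO logs (data) VALUES ($1)`, l); err != nil {
			return err
		}
	}
	return tx.Commit()
}
```

With a composite method like this, callers and observability wrappers see one call per logical operation, which is what the changes below to log_poller.go, observability.go, and orm.go implement.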
mateusz-sekara authored Nov 13, 2023
1 parent b66b723 commit f7981f5
Showing 6 changed files with 232 additions and 80 deletions.
39 changes: 7 additions & 32 deletions core/chains/evm/logpoller/log_poller.go
@@ -679,9 +679,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
- err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
- return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx))
- })
+ err = lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithParentCtx(ctx))
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
@@ -750,21 +748,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
- err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
- // These deletes are bounded by reorg depth, so they are
- // fast and should not slow down the log readers.
- err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
- if err3 != nil {
- lp.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err3)
- return err3
- }
- err3 = lp.orm.DeleteLogsAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
- if err3 != nil {
- lp.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err3)
- return err3
- }
- return nil
- })
+ err2 = lp.orm.DeleteLogsAndBlocksAfter(blockAfterLCA.Number, pg.WithParentCtx(ctx))
if err2 != nil {
// If we error on db commit, we can't know if the tx went through or not.
// We return an error here which will cause us to restart polling from lastBlockSaved + 1
@@ -849,20 +833,11 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
return
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix())
- err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
- if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber, pg.WithQueryer(tx)); err2 != nil {
- return err2
- }
- if len(logs) == 0 {
- return nil
- }
- return lp.orm.InsertLogs(convertLogs(logs,
- []LogPollerBlock{{BlockNumber: currentBlockNumber,
- BlockTimestamp: currentBlock.Timestamp}},
- lp.lggr,
- lp.ec.ConfiguredChainID(),
- ), pg.WithQueryer(tx))
- })
+ block := NewLogPollerBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber)
+ err = lp.orm.InsertLogsWithBlock(
+ convertLogs(logs, []LogPollerBlock{block}, lp.lggr, lp.ec.ConfiguredChainID()),
+ block,
+ )
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
return
9 changes: 9 additions & 0 deletions core/chains/evm/logpoller/models.go
@@ -56,3 +56,12 @@ func (l *Log) ToGethLog() types.Log {
Index: uint(l.LogIndex),
}
}

+ func NewLogPollerBlock(blockHash common.Hash, blockNumber int64, timestamp time.Time, finalizedBlockNumber int64) LogPollerBlock {
+ return LogPollerBlock{
+ BlockHash: blockHash,
+ BlockNumber: blockNumber,
+ BlockTimestamp: timestamp,
+ FinalizedBlockNumber: finalizedBlockNumber,
+ }
+ }
22 changes: 6 additions & 16 deletions core/chains/evm/logpoller/observability.go
@@ -68,19 +68,15 @@ func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QC
}
}

- func (o *ObservedORM) Q() pg.Q {
- return o.ORM.Q()
- }

func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogs", func() error {
return o.ORM.InsertLogs(logs, qopts...)
})
}

- func (o *ObservedORM) InsertBlock(hash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlock int64, qopts ...pg.QOpt) error {
- return withObservedExec(o, "InsertBlock", func() error {
- return o.ORM.InsertBlock(hash, blockNumber, blockTimestamp, lastFinalizedBlock, qopts...)
+ func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error {
+ return withObservedExec(o, "InsertLogsWithBlock", func() error {
+ return o.ORM.InsertLogsWithBlock(logs, block, qopts...)
})
}

@@ -102,21 +98,15 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
})
}

- func (o *ObservedORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error {
- return withObservedExec(o, "DeleteBlocksAfter", func() error {
- return o.ORM.DeleteBlocksAfter(start, qopts...)
- })
- }

func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksBefore", func() error {
return o.ORM.DeleteBlocksBefore(end, qopts...)
})
}

- func (o *ObservedORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error {
- return withObservedExec(o, "DeleteLogsAfter", func() error {
- return o.ORM.DeleteLogsAfter(start, qopts...)
+ func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error {
+ return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error {
+ return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...)
})
}

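
Every method on ObservedORM above delegates through withObservedExec, whose definition is outside this diff. For context, here is a hypothetical sketch of what such a timing decorator usually looks like; the queryDuration histogram and its labels are assumptions inferred from the fields referenced in the test below, not the committed code.

```go
// Hypothetical sketch of a withObservedExec-style decorator (the real helper
// lives elsewhere in observability.go and is not shown in this diff).
package ormsketch

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

type observedORM struct {
	queryDuration *prometheus.HistogramVec // assumed: labelled by query name
}

// withObservedExec runs a single ORM call and records how long it took,
// regardless of whether it returned an error.
func withObservedExec(o *observedORM, query string, exec func() error) error {
	start := time.Now()
	err := exec()
	o.queryDuration.WithLabelValues(query).Observe(time.Since(start).Seconds())
	return err
}
```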
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability_test.go
@@ -38,7 +38,7 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(0, []common.Address{{}}, []common.Hash{{}}, 1, pg.WithParentCtx(ctx))
_, _ = orm.SelectIndexedLogsCreatedAfter(common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx))
_ = orm.InsertLogs([]Log{}, pg.WithParentCtx(ctx))
- _ = orm.InsertBlock(common.Hash{}, 1, time.Now(), 0, pg.WithParentCtx(ctx))
+ _ = orm.InsertLogsWithBlock([]Log{}, NewLogPollerBlock(common.Hash{}, 1, time.Now(), 0), pg.WithParentCtx(ctx))

require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
require.Equal(t, 10, testutil.CollectAndCount(orm.datasetSize))
99 changes: 72 additions & 27 deletions core/chains/evm/logpoller/orm.go
@@ -20,17 +20,15 @@ import (
// it exposes some of the database implementation details (e.g. pg.Q). Ideally it should be agnostic and could be applied to any persistence layer.
// What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM.
type ORM interface {
- Q() pg.Q
InsertLogs(logs []Log, qopts ...pg.QOpt) error
- InsertBlock(blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlockNumber int64, qopts ...pg.QOpt) error
+ InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error
InsertFilter(filter Filter, qopts ...pg.QOpt) error

LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error)
DeleteFilter(name string, qopts ...pg.QOpt) error

- DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error
DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error
- DeleteLogsAfter(start int64, qopts ...pg.QOpt) error
+ DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error
DeleteExpiredLogs(qopts ...pg.QOpt) error

GetBlocksRange(start int64, end int64, qopts ...pg.QOpt) ([]LogPollerBlock, error)
@@ -58,6 +56,7 @@ type ORM interface {
type DbORM struct {
chainID *big.Int
q pg.Q
+ lggr logger.Logger
}

// NewORM creates a DbORM scoped to chainID.
@@ -67,13 +66,10 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *
return &DbORM{
chainID: chainID,
q: q,
+ lggr: lggr,
}
}

- func (o *DbORM) Q() pg.Q {
- return o.q
- }

// InsertBlock is idempotent to support replays.
func (o *DbORM) InsertBlock(blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64, qopts ...pg.QOpt) error {
args, err := newQueryArgs(o.chainID).
@@ -191,22 +187,38 @@ func (o *DbORM) SelectLatestLogByEventSigWithConfs(eventSig common.Hash, address
return &l, nil
}

- // DeleteBlocksAfter delete all blocks after and including start.
- func (o *DbORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error {
- q := o.q.WithOpts(qopts...)
- return q.ExecQ(`DELETE FROM evm.log_poller_blocks WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID))
- }

// DeleteBlocksBefore delete all blocks before and including end.
func (o *DbORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)
_, err := q.Exec(`DELETE FROM evm.log_poller_blocks WHERE block_number <= $1 AND evm_chain_id = $2`, end, utils.NewBig(o.chainID))
return err
}

- func (o *DbORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error {
- q := o.q.WithOpts(qopts...)
- return q.ExecQ(`DELETE FROM evm.logs WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID))
+ func (o *DbORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error {
+ // These deletes are bounded by reorg depth, so they are
+ // fast and should not slow down the log readers.
+ return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error {
+ args, err := newQueryArgs(o.chainID).
+ withStartBlock(start).
+ toArgs()
+ if err != nil {
+ o.lggr.Error("Cant build args for DeleteLogsAndBlocksAfter queries", "err", err)
+ return err
+ }
+
+ _, err = tx.NamedExec(`DELETE FROM evm.log_poller_blocks WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args)
+ if err != nil {
+ o.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err)
+ return err
+ }
+
+ _, err = tx.NamedExec(`DELETE FROM evm.logs WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args)
+ if err != nil {
+ o.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err)
+ return err
+ }
+ return nil
+ })
}

type Exp struct {
@@ -233,26 +245,50 @@ func (o *DbORM) DeleteExpiredLogs(qopts ...pg.QOpt) error {

// InsertLogs is idempotent to support replays.
func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
- for _, log := range logs {
- if o.chainID.Cmp(log.EvmChainId.ToInt()) != 0 {
- return errors.Errorf("invalid chainID in log got %v want %v", log.EvmChainId.ToInt(), o.chainID)
- }
+ if err := o.validateLogs(logs); err != nil {
+ return err
}
- q := o.q.WithOpts(qopts...)

+ return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error {
+ return o.insertLogsWithinTx(logs, tx)
+ })
+ }

+ func (o *DbORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error {
+ // Optimization, don't open TX when there is only a block to be persisted
+ if len(logs) == 0 {
+ return o.InsertBlock(block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, qopts...)
+ }
+
+ if err := o.validateLogs(logs); err != nil {
+ return err
+ }
+
+ // Block and logs goes with the same TX to ensure atomicity
+ return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error {
+ if err := o.InsertBlock(block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, pg.WithQueryer(tx)); err != nil {
+ return err
+ }
+ return o.insertLogsWithinTx(logs, tx)
+ })
+ }
+
+ func (o *DbORM) insertLogsWithinTx(logs []Log, tx pg.Queryer) error {
batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(logs) {
end = len(logs)
}

- err := q.ExecQNamed(`
- INSERT INTO evm.logs
- (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at)
+ _, err := tx.NamedExec(`
+ INSERT INTO evm.logs
+ (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at)
VALUES
- (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
- ON CONFLICT DO NOTHING`, logs[start:end])
+ (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
+ ON CONFLICT DO NOTHING`,
+ logs[start:end],
+ )

if err != nil {
if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
@@ -267,6 +303,15 @@ func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
return nil
}

+ func (o *DbORM) validateLogs(logs []Log) error {
+ for _, log := range logs {
+ if o.chainID.Cmp(log.EvmChainId.ToInt()) != 0 {
+ return errors.Errorf("invalid chainID in log got %v want %v", log.EvmChainId.ToInt(), o.chainID)
+ }
+ }
+ return nil
+ }

func (o *DbORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) {
args, err := newQueryArgs(o.chainID).
withStartBlock(start).
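
The insertLogsWithinTx hunk above is collapsed right after the context.DeadlineExceeded check, so the shrink-and-retry branch is not visible here. The sketch below shows one plausible shape of that loop, assuming the batch size is halved and the same chunk is retried on a timeout; treat it as an illustration of the pattern rather than the committed implementation.

```go
// Hypothetical sketch of batched inserts that shrink the batch on DB timeouts.
// Names and the exact retry policy are assumptions, not the committed code.
package ormsketch

import (
	"context"
	"errors"
)

func insertInBatches(ctx context.Context, rows []string, insert func(context.Context, []string) error) error {
	batchSize := 4000
	i := 0
	for i < len(rows) {
		end := i + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		if err := insert(ctx, rows[i:end]); err != nil {
			// Assumed behaviour: smaller chunks are likelier to finish within
			// the statement deadline, so halve the batch and retry this chunk.
			if errors.Is(err, context.DeadlineExceeded) && batchSize > 500 {
				batchSize /= 2
				continue
			}
			return err
		}
		i = end
	}
	return nil
}
```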