diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 01d6a2aad47..b86ede5dbcb 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -679,9 +679,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks) - err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { - return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx)) - }) + err = lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithParentCtx(ctx)) if err != nil { lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to) return err @@ -750,21 +748,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren // the canonical set per read. Typically, if an application took action on a log // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. // Its also nicely analogous to reading from the chain itself. - err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { - // These deletes are bounded by reorg depth, so they are - // fast and should not slow down the log readers. - err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx)) - if err3 != nil { - lp.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err3) - return err3 - } - err3 = lp.orm.DeleteLogsAfter(blockAfterLCA.Number, pg.WithQueryer(tx)) - if err3 != nil { - lp.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err3) - return err3 - } - return nil - }) + err2 = lp.orm.DeleteLogsAndBlocksAfter(blockAfterLCA.Number, pg.WithParentCtx(ctx)) if err2 != nil { // If we error on db commit, we can't know if the tx went through or not. // We return an error here which will cause us to restart polling from lastBlockSaved + 1 @@ -849,20 +833,11 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int return } lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix()) - err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { - if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber, pg.WithQueryer(tx)); err2 != nil { - return err2 - } - if len(logs) == 0 { - return nil - } - return lp.orm.InsertLogs(convertLogs(logs, - []LogPollerBlock{{BlockNumber: currentBlockNumber, - BlockTimestamp: currentBlock.Timestamp}}, - lp.lggr, - lp.ec.ConfiguredChainID(), - ), pg.WithQueryer(tx)) - }) + block := NewLogPollerBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber) + err = lp.orm.InsertLogsWithBlock( + convertLogs(logs, []LogPollerBlock{block}, lp.lggr, lp.ec.ConfiguredChainID()), + block, + ) if err != nil { lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber) return diff --git a/core/chains/evm/logpoller/models.go b/core/chains/evm/logpoller/models.go index 9c55786777c..87ddd079a5b 100644 --- a/core/chains/evm/logpoller/models.go +++ b/core/chains/evm/logpoller/models.go @@ -56,3 +56,12 @@ func (l *Log) ToGethLog() types.Log { Index: uint(l.LogIndex), } } + +func NewLogPollerBlock(blockHash common.Hash, blockNumber int64, timestamp time.Time, finalizedBlockNumber int64) LogPollerBlock { + return LogPollerBlock{ + BlockHash: blockHash, + BlockNumber: blockNumber, + BlockTimestamp: timestamp, + FinalizedBlockNumber: finalizedBlockNumber, + } +} diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index c4b58b42a2e..03f4b77be25 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -68,19 +68,15 @@ func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QC } } -func (o *ObservedORM) Q() pg.Q { - return o.ORM.Q() -} - func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { return withObservedExec(o, "InsertLogs", func() error { return o.ORM.InsertLogs(logs, qopts...) }) } -func (o *ObservedORM) InsertBlock(hash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlock int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "InsertBlock", func() error { - return o.ORM.InsertBlock(hash, blockNumber, blockTimestamp, lastFinalizedBlock, qopts...) +func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error { + return withObservedExec(o, "InsertLogsWithBlock", func() error { + return o.ORM.InsertLogsWithBlock(logs, block, qopts...) }) } @@ -102,21 +98,15 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error { }) } -func (o *ObservedORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteBlocksAfter", func() error { - return o.ORM.DeleteBlocksAfter(start, qopts...) - }) -} - func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { return withObservedExec(o, "DeleteBlocksBefore", func() error { return o.ORM.DeleteBlocksBefore(end, qopts...) }) } -func (o *ObservedORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteLogsAfter", func() error { - return o.ORM.DeleteLogsAfter(start, qopts...) +func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error { + return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error { + return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...) }) } diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 0d3eadf47d7..ded3d7854dd 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -38,7 +38,7 @@ func TestMultipleMetricsArePublished(t *testing.T) { _, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(0, []common.Address{{}}, []common.Hash{{}}, 1, pg.WithParentCtx(ctx)) _, _ = orm.SelectIndexedLogsCreatedAfter(common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx)) _ = orm.InsertLogs([]Log{}, pg.WithParentCtx(ctx)) - _ = orm.InsertBlock(common.Hash{}, 1, time.Now(), 0, pg.WithParentCtx(ctx)) + _ = orm.InsertLogsWithBlock([]Log{}, NewLogPollerBlock(common.Hash{}, 1, time.Now(), 0), pg.WithParentCtx(ctx)) require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration)) require.Equal(t, 10, testutil.CollectAndCount(orm.datasetSize)) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index e044be2e0c9..a1b86d2cb2c 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -20,17 +20,15 @@ import ( // it exposes some of the database implementation details (e.g. pg.Q). Ideally it should be agnostic and could be applied to any persistence layer. // What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM. type ORM interface { - Q() pg.Q InsertLogs(logs []Log, qopts ...pg.QOpt) error - InsertBlock(blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlockNumber int64, qopts ...pg.QOpt) error + InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error InsertFilter(filter Filter, qopts ...pg.QOpt) error LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) DeleteFilter(name string, qopts ...pg.QOpt) error - DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error - DeleteLogsAfter(start int64, qopts ...pg.QOpt) error + DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error DeleteExpiredLogs(qopts ...pg.QOpt) error GetBlocksRange(start int64, end int64, qopts ...pg.QOpt) ([]LogPollerBlock, error) @@ -58,6 +56,7 @@ type ORM interface { type DbORM struct { chainID *big.Int q pg.Q + lggr logger.Logger } // NewORM creates a DbORM scoped to chainID. @@ -67,13 +66,10 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) * return &DbORM{ chainID: chainID, q: q, + lggr: lggr, } } -func (o *DbORM) Q() pg.Q { - return o.q -} - // InsertBlock is idempotent to support replays. func (o *DbORM) InsertBlock(blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64, qopts ...pg.QOpt) error { args, err := newQueryArgs(o.chainID). @@ -191,12 +187,6 @@ func (o *DbORM) SelectLatestLogByEventSigWithConfs(eventSig common.Hash, address return &l, nil } -// DeleteBlocksAfter delete all blocks after and including start. -func (o *DbORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error { - q := o.q.WithOpts(qopts...) - return q.ExecQ(`DELETE FROM evm.log_poller_blocks WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID)) -} - // DeleteBlocksBefore delete all blocks before and including end. func (o *DbORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) @@ -204,9 +194,31 @@ func (o *DbORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { return err } -func (o *DbORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error { - q := o.q.WithOpts(qopts...) - return q.ExecQ(`DELETE FROM evm.logs WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID)) +func (o *DbORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error { + // These deletes are bounded by reorg depth, so they are + // fast and should not slow down the log readers. + return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error { + args, err := newQueryArgs(o.chainID). + withStartBlock(start). + toArgs() + if err != nil { + o.lggr.Error("Cant build args for DeleteLogsAndBlocksAfter queries", "err", err) + return err + } + + _, err = tx.NamedExec(`DELETE FROM evm.log_poller_blocks WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args) + if err != nil { + o.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err) + return err + } + + _, err = tx.NamedExec(`DELETE FROM evm.logs WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args) + if err != nil { + o.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err) + return err + } + return nil + }) } type Exp struct { @@ -233,13 +245,35 @@ func (o *DbORM) DeleteExpiredLogs(qopts ...pg.QOpt) error { // InsertLogs is idempotent to support replays. func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { - for _, log := range logs { - if o.chainID.Cmp(log.EvmChainId.ToInt()) != 0 { - return errors.Errorf("invalid chainID in log got %v want %v", log.EvmChainId.ToInt(), o.chainID) - } + if err := o.validateLogs(logs); err != nil { + return err } - q := o.q.WithOpts(qopts...) + return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error { + return o.insertLogsWithinTx(logs, tx) + }) +} + +func (o *DbORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error { + // Optimization, don't open TX when there is only a block to be persisted + if len(logs) == 0 { + return o.InsertBlock(block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, qopts...) + } + + if err := o.validateLogs(logs); err != nil { + return err + } + + // Block and logs goes with the same TX to ensure atomicity + return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error { + if err := o.InsertBlock(block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, pg.WithQueryer(tx)); err != nil { + return err + } + return o.insertLogsWithinTx(logs, tx) + }) +} + +func (o *DbORM) insertLogsWithinTx(logs []Log, tx pg.Queryer) error { batchInsertSize := 4000 for i := 0; i < len(logs); i += batchInsertSize { start, end := i, i+batchInsertSize @@ -247,12 +281,14 @@ func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { end = len(logs) } - err := q.ExecQNamed(` - INSERT INTO evm.logs - (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at) + _, err := tx.NamedExec(` + INSERT INTO evm.logs + (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at) VALUES - (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) - ON CONFLICT DO NOTHING`, logs[start:end]) + (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) + ON CONFLICT DO NOTHING`, + logs[start:end], + ) if err != nil { if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 { @@ -267,6 +303,15 @@ func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { return nil } +func (o *DbORM) validateLogs(logs []Log) error { + for _, log := range logs { + if o.chainID.Cmp(log.EvmChainId.ToInt()) != 0 { + return errors.Errorf("invalid chainID in log got %v want %v", log.EvmChainId.ToInt(), o.chainID) + } + } + return nil +} + func (o *DbORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) { args, err := newQueryArgs(o.chainID). withStartBlock(start). diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 66e1afdc939..887984055ef 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -4,6 +4,7 @@ import ( "bytes" "database/sql" "fmt" + "math" "math/big" "testing" "time" @@ -15,7 +16,10 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -179,13 +183,13 @@ func TestORM(t *testing.T) { assert.Equal(t, int64(12), latest.BlockNumber) // Delete a block (only 10 on chain). - require.NoError(t, o1.DeleteBlocksAfter(10)) + require.NoError(t, o1.DeleteLogsAndBlocksAfter(10)) _, err = o1.SelectBlockByHash(common.HexToHash("0x1234")) require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) // Delete blocks from another chain. - require.NoError(t, o2.DeleteBlocksAfter(11)) + require.NoError(t, o2.DeleteLogsAndBlocksAfter(11)) _, err = o2.SelectBlockByHash(common.HexToHash("0x1234")) require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) @@ -318,7 +322,6 @@ func TestORM(t *testing.T) { require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) // With block 12, anything <=2 should work - require.NoError(t, o1.DeleteBlocksAfter(10)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1234"), 11, time.Now(), 0)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1235"), 12, time.Now(), 0)) _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) @@ -421,7 +424,7 @@ func TestORM(t *testing.T) { assert.Len(t, logs, 7) // Delete logs after should delete all logs. - err = o1.DeleteLogsAfter(1) + err = o1.DeleteLogsAndBlocksAfter(1) require.NoError(t, err) logs, err = o1.SelectLogsByBlockRange(1, latest.BlockNumber) require.NoError(t, err) @@ -1301,3 +1304,133 @@ func TestNestedLogPollerBlocksQuery(t *testing.T) { require.NoError(t, err) require.Len(t, logs, 0) } + +func TestInsertLogsWithBlock(t *testing.T) { + chainID := testutils.NewRandomEVMChainID() + event := utils.RandomBytes32() + address := utils.RandomAddress() + + // We need full db here, because we want to test transaction rollbacks. + // Using pgtest.NewSqlxDB(t) will run all tests in TXs which is not desired for this type of test + // (inner tx rollback will rollback outer tx, blocking rest of execution) + _, db := heavyweight.FullTestDBV2(t, nil) + o := logpoller.NewORM(chainID, db, logger.TestLogger(t), pgtest.NewQConfig(true)) + + correctLog := GenLog(chainID, 1, 1, utils.RandomAddress().String(), event[:], address) + invalidLog := GenLog(chainID, -10, -10, utils.RandomAddress().String(), event[:], address) + correctBlock := logpoller.NewLogPollerBlock(utils.RandomBytes32(), 20, time.Now(), 10) + invalidBlock := logpoller.NewLogPollerBlock(utils.RandomBytes32(), -10, time.Now(), -10) + + tests := []struct { + name string + logs []logpoller.Log + block logpoller.LogPollerBlock + shouldRollback bool + }{ + { + name: "properly persist all data", + logs: []logpoller.Log{correctLog}, + block: correctBlock, + shouldRollback: false, + }, + { + name: "rollbacks transaction when block is invalid", + logs: []logpoller.Log{correctLog}, + block: invalidBlock, + shouldRollback: true, + }, + { + name: "rollbacks transaction when log is invalid", + logs: []logpoller.Log{invalidLog}, + block: correctBlock, + shouldRollback: true, + }, + { + name: "rollback when only some logs are invalid", + logs: []logpoller.Log{correctLog, invalidLog}, + block: correctBlock, + shouldRollback: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // clean all logs and blocks between test cases + defer func() { _ = o.DeleteLogsAndBlocksAfter(0) }() + insertError := o.InsertLogsWithBlock(tt.logs, tt.block) + + logs, logsErr := o.SelectLogs(0, math.MaxInt, address, event) + block, blockErr := o.SelectLatestBlock() + + if tt.shouldRollback { + assert.Error(t, insertError) + + assert.NoError(t, logsErr) + assert.Len(t, logs, 0) + + assert.Error(t, blockErr) + } else { + assert.NoError(t, insertError) + + assert.NoError(t, logsErr) + assert.Len(t, logs, len(tt.logs)) + + assert.NoError(t, blockErr) + assert.Equal(t, block.BlockNumber, tt.block.BlockNumber) + } + }) + } +} + +func TestInsertLogsInTx(t *testing.T) { + chainID := testutils.NewRandomEVMChainID() + event := utils.RandomBytes32() + address := utils.RandomAddress() + maxLogsSize := 9000 + + // We need full db here, because we want to test transaction rollbacks. + _, db := heavyweight.FullTestDBV2(t, nil) + o := logpoller.NewORM(chainID, db, logger.TestLogger(t), pgtest.NewQConfig(true)) + + logs := make([]logpoller.Log, maxLogsSize, maxLogsSize+1) + for i := 0; i < maxLogsSize; i++ { + logs[i] = GenLog(chainID, int64(i+1), int64(i+1), utils.RandomAddress().String(), event[:], address) + } + invalidLog := GenLog(chainID, -10, -10, utils.RandomAddress().String(), event[:], address) + + tests := []struct { + name string + logs []logpoller.Log + shouldRollback bool + }{ + { + name: "all logs persisted", + logs: logs, + shouldRollback: false, + }, + { + name: "rollback when invalid log is passed", + logs: append(logs, invalidLog), + shouldRollback: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // clean all logs and blocks between test cases + defer func() { _ = o.DeleteLogsAndBlocksAfter(0) }() + + insertErr := o.InsertLogs(tt.logs) + logsFromDb, err := o.SelectLogs(0, math.MaxInt, address, event) + assert.NoError(t, err) + + if tt.shouldRollback { + assert.Error(t, insertErr) + assert.Len(t, logsFromDb, 0) + } else { + assert.NoError(t, insertErr) + assert.Len(t, logsFromDb, len(tt.logs)) + } + }) + } +}