From 1d9dd466e2933b7558949554b882f29f63d90b9f Mon Sep 17 00:00:00 2001
From: Domino Valdano
Date: Thu, 25 Apr 2024 18:39:18 -0700
Subject: [PATCH] Make LogPoller more robust against local finality violations
 (#12605)

* Reduce unnecessary code duplication

getCurrentBlockMaybeHandleReorg is called just before the for loop over
unfinalized blocks begins, and at the end of each iteration. Simplify by
moving both calls to the beginning of the for loop.

* Fix bugs in TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld

This fixes 2 bugs in this test on the develop branch, and removes some
unused commented code.

First Bug
=========

The first bug was causing a false positive PASS on the develop branch,
which was obscuring a (very minor) bug in BackupPoller that's been fixed
in this PR. The comment here about what the test was intended to test is
still correct:

// Only the 2nd batch + 1 log from a previous batch should be backfilled, because we perform backfill starting
// from one block behind the latest finalized block

Contrary to the comment, the code was returning 2 logs from the 1st batch
(Data=9 & Data=10), plus 9 of the 10 logs from the 2nd batch. This was
incorrect behavior, but the test was also checking for the same incorrect
behavior (looking for 11 logs, with the first one being Data=9) instead
of what's described in the comment.

The bug in the production code was that it started the Backup Poller at
Finalized - 1 instead of Finalized. This is a harmless "bug", just
unnecessarily starting a block too early, since there's no reason for the
backup logpoller to re-request the same finalized logs that have already
been processed.

Now, the code returns the last log from the 1st batch + all but one of
the logs from the 2nd batch, which is correct. (It can't return the last
log, because that goes beyond the last safe block.) So the test checks
that there are 10 logs, with the first one being Data=10 (the last log
from the first batch).

Second Bug
==========

The second bug was passing firstBatchBlock and secondBatchBlock directly
to markBlockAsFinalized() instead of passing firstBatchBlock - 1 and
secondBatchBlock - 1. This was only working because of a bug in the
version of geth we're currently using: when you request the pending block
from simulated geth, it gives you back the latest block (1 block prior)
instead of the pending block. (For example, in the first case, even
though we wanted block 11, the latest block, we requested block 12 and
got back block 11.) This has been fixed in the latest version of geth...
so presumably if we don't fix this here, the test would have started
failing as soon as we upgrade to the latest version of geth. It doesn't
change any behavior of the test for the present version of geth, just
makes it clearer that we want block 11, not 12.

* Check that all results from batchFetchBlocks() are finalized, aside from "latest"

batchFetchBlocks() will now fetch the "finalized" block along with the
rest of each batch, and validate that all of the block numbers returned
(aside from the special case when "latest" is requested) are <= the
finalized block number returned. A sketch of the idea follows this list.

Also, change backfill() to always save the last block of each batch of
logs requested, rather than the last block of the logs returned. This
only makes a difference if the last block requested has no logs matching
the filter, but this change is essential for being able to safely change
lastSafeBlockNumber from latestFinalizedBlock - 1 to latestFinalizedBlock.

* Update logpoller tests

* fix merge conflict

* reduce cognitive complexity

* Add validationReqType type definition

* Fix comments

* Add Test_FetchBlocks
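For illustration only — the following sketch is not part of the diff. It
mimics the validation idea under simplified assumptions: fakeBatchCall,
head, and fetchValidated are hypothetical stand-ins for the real
BatchCallContext, evmtypes.Head, and fetchBlocks. A "finalized" request
is piggybacked onto each batch, and every numbered block in the response
must be at or below the finalized block number. (The real code also
reuses a "finalized"/"latest" entry already present in the batch instead
of always appending one.)

// finality_sketch.go — illustrative sketch, not part of this patch.
package main

import (
	"fmt"
	"strconv"
)

type head struct{ Number int64 }

// fakeBatchCall resolves each requested tag to a head, pretending the
// chain tip is block 8 and the finalized tip is block 5.
func fakeBatchCall(reqs []string) ([]head, error) {
	heads := make([]head, 0, len(reqs))
	for _, req := range reqs {
		switch req {
		case "latest":
			heads = append(heads, head{Number: 8})
		case "finalized":
			heads = append(heads, head{Number: 5})
		default:
			n, err := strconv.ParseInt(req, 0, 64) // base 0 accepts "0x..." hex
			if err != nil {
				return nil, err
			}
			heads = append(heads, head{Number: n})
		}
	}
	return heads, nil
}

// fetchValidated piggybacks a "finalized" request onto the batch, then
// rejects any numbered block that came back above the finalized number.
func fetchValidated(blockNums []string) ([]head, error) {
	reqs := append(append([]string{}, blockNums...), "finalized")
	heads, err := fakeBatchCall(reqs)
	if err != nil {
		return nil, err
	}
	finalized := heads[len(heads)-1].Number
	for i, h := range heads[:len(heads)-1] {
		if blockNums[i] != "latest" && h.Number > finalized {
			return nil, fmt.Errorf("received unfinalized block %d (finalized = %d)", h.Number, finalized)
		}
	}
	return heads[:len(heads)-1], nil
}

func main() {
	if blocks, err := fetchValidated([]string{"0x3", "0x5"}); err == nil {
		fmt.Println("ok:", blocks) // both blocks are <= finalized (5)
	}
	if _, err := fetchValidated([]string{"0x8"}); err != nil {
		fmt.Println("caught:", err) // block 8 > finalized 5: finality violation
	}
}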
---
 .changeset/fresh-moles-explode.md             |   5 +
 core/chains/evm/logpoller/log_poller.go       | 189 +++++++++++++-----
 .../evm/logpoller/log_poller_internal_test.go | 136 ++++++++++++-
 core/chains/evm/logpoller/log_poller_test.go  |  49 ++---
 4 files changed, 290 insertions(+), 89 deletions(-)
 create mode 100644 .changeset/fresh-moles-explode.md

diff --git a/.changeset/fresh-moles-explode.md b/.changeset/fresh-moles-explode.md
new file mode 100644
index 00000000000..205002b40a0
--- /dev/null
+++ b/.changeset/fresh-moles-explode.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+core/chains/evm/logpoller: Stricter finality checks in LogPoller, to be more robust during rpc failover events #updated

diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index de2a182bbce..7592ec104c4 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -699,8 +699,8 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
 		}
 		return
 	}
-	// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay)
-	backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
+	// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-backupPollerBlockDelay)
+	backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
 	// (or at block 0 if whole blockchain is too short)
 	lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0)
 }
@@ -771,11 +771,16 @@ func convertTopics(topics []common.Hash) [][]byte {
 	return topicsForDB
 }

-func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log) (blocks []LogPollerBlock, err error) {
+// blocksFromLogs fetches all of the blocks associated with a given list of logs.
+// It will also unconditionally fetch endBlockNumber, whether or not there are
+// any logs in the list from that block.
+func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
 	var numbers []uint64
 	for _, log := range logs {
 		numbers = append(numbers, log.BlockNumber)
 	}
+	if numbers[len(numbers)-1] != endBlockNumber {
+		numbers = append(numbers, endBlockNumber)
+	}
 	return lp.GetBlocksRange(ctx, numbers)
 }

@@ -789,6 +794,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 	batchSize := lp.backfillBatchSize
 	for from := start; from <= end; from += batchSize {
 		to := mathutil.Min(from+batchSize-1, end)
+
 		gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil))
 		if err != nil {
 			var rpcErr client.JsonError
@@ -810,13 +816,19 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 		if len(gethLogs) == 0 {
 			continue
 		}
-		blocks, err := lp.blocksFromLogs(ctx, gethLogs)
+		blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to))
 		if err != nil {
 			return err
 		}
+		endblock := blocks[len(blocks)-1]
+		if gethLogs[len(gethLogs)-1].BlockNumber != uint64(to) {
+			// Pop endblock if there were no logs for it, so that the lengths of blocks &
+			// gethLogs stay the same when passed to convertLogs
+			blocks = blocks[:len(blocks)-1]
+		}
+
 		lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
-		err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), blocks[len(blocks)-1])
+		err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock)
 		if err != nil {
 			lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
 			return err
@@ -955,19 +967,18 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		currentBlockNumber = lastSafeBackfillBlock + 1
 	}

-	if currentBlockNumber > currentBlock.Number {
-		// If we successfully backfilled we have logs up to and including lastSafeBackfillBlock,
-		// now load the first unfinalized block.
-		currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
-		if err != nil {
-			// If there's an error handling the reorg, we can't be sure what state the db was left in.
-			// Resume from the latest block saved.
-			lp.lggr.Errorw("Unable to get current block", "err", err)
-			return
+	for {
+		if currentBlockNumber > currentBlock.Number {
+			currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
+			if err != nil {
+				// If there's an error handling the reorg, we can't be sure what state the db was left in.
+				// Resume from the latest block saved.
+				lp.lggr.Errorw("Unable to get current block", "err", err)
+				return
+			}
+			currentBlockNumber = currentBlock.Number
 		}
-	}
-	for {
 		h := currentBlock.Hash
 		var logs []types.Log
 		logs, err = lp.ec.FilterLogs(ctx, lp.Filter(nil, nil, &h))
@@ -992,14 +1003,6 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		if currentBlockNumber > latestBlockNumber {
 			break
 		}
-		currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
-		if err != nil {
-			// If there's an error handling the reorg, we can't be sure what state the db was left in.
-			// Resume from the latest block saved.
-			lp.lggr.Errorw("Unable to get current block", "err", err)
-			return
-		}
-		currentBlockNumber = currentBlock.Number
 	}
 }

@@ -1252,12 +1255,16 @@ func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]Lo
 	return blocks, nil
 }

+// fillRemainingBlocksFromRPC sends a batch request for each block in blocksRequested, and converts them from
+// geth blocks into LogPollerBlock structs. This is only intended to be used for requesting finalized blocks;
+// if any of the blocks coming back are not finalized, an error will be returned.
 func (lp *logPoller) fillRemainingBlocksFromRPC(
 	ctx context.Context, blocksRequested map[uint64]struct{}, blocksFound map[uint64]LogPollerBlock,
 ) (map[uint64]LogPollerBlock, error) {
 	var remainingBlocks []string
+
 	for num := range blocksRequested {
 		if _, ok := blocksFound[num]; !ok {
 			remainingBlocks = append(remainingBlocks, hexutil.EncodeBig(new(big.Int).SetUint64(num)))
@@ -1287,54 +1294,126 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
 	return logPollerBlocks, nil
 }

-func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
-	reqs := make([]rpc.BatchElem, 0, len(blocksRequested))
-	for _, num := range blocksRequested {
-		req := rpc.BatchElem{
-			Method: "eth_getBlockByNumber",
-			Args:   []interface{}{num, false},
-			Result: &evmtypes.Head{},
-		}
-		reqs = append(reqs, req)
+// newBlockReq constructs an eth_getBlockByNumber request for a particular block number
+func newBlockReq(num string) rpc.BatchElem {
+	return rpc.BatchElem{
+		Method: "eth_getBlockByNumber",
+		Args:   []interface{}{num, false},
+		Result: &evmtypes.Head{},
 	}
+}

-	for i := 0; i < len(reqs); i += int(batchSize) {
-		j := i + int(batchSize)
-		if j > len(reqs) {
-			j = len(reqs)
-		}
+type blockValidationType string

-		err := lp.ec.BatchCallContext(ctx, reqs[i:j])
-		if err != nil {
-			return nil, err
+var (
+	latestBlock    blockValidationType = blockValidationType(rpc.LatestBlockNumber.String())
+	finalizedBlock blockValidationType = blockValidationType(rpc.FinalizedBlockNumber.String())
+)
+
+// fetchBlocks fetches a list of blocks in a single batch. validationReq is the string to use for the
+// additional validation request (either the "finalized" or "latest" string defined in the rpc module), which
+// will be used to validate the finality of the other blocks.
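+// When validationReq is "latest" (i.e. when useFinalityTag=false), the finalized cutoff used for this
+// validation is approximated as the latest block number minus finalityDepth, floored at 0.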
+func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string, validationReq blockValidationType) (blocks []*evmtypes.Head, err error) {
+	n := len(blocksRequested)
+	blocks = make([]*evmtypes.Head, 0, n+1)
+	reqs := make([]rpc.BatchElem, 0, n+1)
+
+	validationBlockIndex := n
+	for k, num := range blocksRequested {
+		if num == string(validationReq) {
+			validationBlockIndex = k
 		}
+		reqs = append(reqs, newBlockReq(num))
+	}
+
+	if validationBlockIndex == n {
+		// Add validation req if it wasn't in there already
+		reqs = append(reqs, newBlockReq(string(validationReq)))
+	}
+
+	err = lp.ec.BatchCallContext(ctx, reqs)
+	if err != nil {
+		return nil, err
 	}

-	var blocks = make([]*evmtypes.Head, 0, len(reqs))
-	for _, r := range reqs {
-		if r.Error != nil {
-			return nil, r.Error
+	validationBlock, err := validateBlockResponse(reqs[validationBlockIndex])
+	if err != nil {
+		return nil, err
+	}
+	latestFinalizedBlockNumber := validationBlock.Number
+	if validationReq == latestBlock {
+		// subtract finalityDepth from "latest" to get finalized, when useFinalityTag = false
+		latestFinalizedBlockNumber = mathutil.Max(latestFinalizedBlockNumber-lp.finalityDepth, 0)
+	}
+	if len(reqs) == n+1 {
+		reqs = reqs[:n] // ignore last req if we added it explicitly for validation
+	}
+
+	for k, r := range reqs {
+		if k == validationBlockIndex {
+			// Already validated this one, just insert it in its proper place
+			blocks = append(blocks, validationBlock)
+			continue
 		}
-		block, is := r.Result.(*evmtypes.Head)
-		if !is {
-			return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result)
+		block, err2 := validateBlockResponse(r)
+		if err2 != nil {
+			return nil, err2
 		}
-		if block == nil {
-			return nil, pkgerrors.New("invariant violation: got nil block")
+
+		blockRequested := r.Args[0].(string)
+		if blockRequested != string(latestBlock) && block.Number > latestFinalizedBlockNumber {
+			return nil, fmt.Errorf(
+				"Received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)",
+				block.Number, latestFinalizedBlockNumber)
 		}
-		if block.Hash == (common.Hash{}) {
-			return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number)
+
+		blocks = append(blocks, block)
+	}
+	return blocks, nil
+}
+
+func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
+	var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1)
+
+	validationReq := finalizedBlock
+	if !lp.useFinalityTag {
+		validationReq = latestBlock
+	}
+
+	for i := 0; i < len(blocksRequested); i += int(batchSize) {
+		j := i + int(batchSize)
+		if j > len(blocksRequested) {
+			j = len(blocksRequested)
 		}
-		if block.Number < 0 {
-			return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number)
+		moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq)
+		if err != nil {
+			return nil, err
 		}
-		blocks = append(blocks, block)
+		blocks = append(blocks, moreBlocks...)
 	}
 	return blocks, nil
 }

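+// validateBlockResponse sanity-checks a single eth_getBlockByNumber response from a batch: the result
+// must be a non-nil *evmtypes.Head with a non-empty hash and a non-negative block number.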
+func validateBlockResponse(r rpc.BatchElem) (*evmtypes.Head, error) {
+	block, is := r.Result.(*evmtypes.Head)
+
+	if !is {
+		return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result)
+	}
+	if block == nil {
+		return nil, pkgerrors.New("invariant violation: got nil block")
+	}
+	if block.Hash == (common.Hash{}) {
+		return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number)
+	}
+	if block.Number < 0 {
+		return nil, pkgerrors.Errorf("expected block number to be >= 0, got %d", block.Number)
+	}
+	return block, nil
+}
+
 // IndexedLogsWithSigsExcluding returns the set difference(A-B) of logs with signature sigA and sigB, matching is done on the topics index
 //
 // For example, query to retrieve unfulfilled requests by querying request log events without matching fulfillment log events.
diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go
index b6af0f7de5c..4236f0b8ef1 100644
--- a/core/chains/evm/logpoller/log_poller_internal_test.go
+++ b/core/chains/evm/logpoller/log_poller_internal_test.go
@@ -2,6 +2,7 @@ package logpoller

 import (
 	"context"
+	"errors"
 	"fmt"
 	"math/big"
 	"reflect"
@@ -12,6 +13,7 @@ import (

 	"github.com/ethereum/go-ethereum/accounts/abi"
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/rpc"
 	pkgerrors "github.com/pkg/errors"
@@ -205,13 +207,14 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {
 	chainID := testutils.FixtureChainID
 	db := pgtest.NewSqlxDB(t)
 	orm := NewORM(chainID, db, lggr)
+	latestBlock := int64(4)

-	head := evmtypes.Head{Number: 3}
+	head := evmtypes.Head{Number: latestBlock}
 	events := []common.Hash{EmitterABI.Events["Log1"].ID}
 	log1 := types.Log{
 		Index:       0,
 		BlockHash:   common.Hash{},
-		BlockNumber: uint64(3),
+		BlockNumber: uint64(latestBlock),
 		Topics:      events,
 		Address:     addr,
 		TxHash:      common.HexToHash("0x1234"),
@@ -230,20 +233,44 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {
 		BackfillBatchSize:        3,
 		RpcBatchSize:             2,
 		KeepFinalizedBlocksDepth: 1000,
+		BackupPollerBlockDelay:   0,
 	}
 	lp := NewLogPoller(orm, ec, lggr, lpOpts)
 	lp.BackupPollAndSaveLogs(ctx)
 	assert.Equal(t, int64(0), lp.backupPollerNextBlock)
 	assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len())

-	lp.PollAndSaveLogs(ctx, 3)
+	lp.PollAndSaveLogs(ctx, latestBlock)

 	lastProcessed, err := lp.orm.SelectLatestBlock(ctx)
 	require.NoError(t, err)
-	require.Equal(t, int64(3), lastProcessed.BlockNumber)
+	require.Equal(t, latestBlock, lastProcessed.BlockNumber)

 	lp.BackupPollAndSaveLogs(ctx)
-	assert.Equal(t, int64(1), lp.backupPollerNextBlock) // Ensure non-negative!
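+	// The backup poller now starts at the finalized block itself rather than finalized - 1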
+	assert.Equal(t, int64(2), lp.backupPollerNextBlock)
+}
+
+func mockBatchCallContext(t *testing.T, ec *evmclimocks.Client) {
+	ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+		elems := args.Get(1).([]rpc.BatchElem)
+		for _, e := range elems {
+			var num int64
+			block := e.Args[0].(string)
+			switch block {
+			case "latest":
+				num = 8
+			case "finalized":
+				num = 5
+			default:
+				n, err := hexutil.DecodeUint64(block)
+				require.NoError(t, err)
+				num = int64(n)
+			}
+			result := e.Result.(*evmtypes.Head)
+			*result = evmtypes.Head{Number: num, Hash: utils.NewHash()}
+		}
+	})
 }

 func TestLogPoller_Replay(t *testing.T) {
@@ -269,16 +296,20 @@ func TestLogPoller_Replay(t *testing.T) {
 	}

 	ec := evmclimocks.NewClient(t)
-	ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
-	ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice()
+	ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(context.Context, *big.Int) (*evmtypes.Head, error) {
+		headCopy := head
+		return &headCopy, nil
+	})
+	ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
 	ec.On("ConfiguredChainID").Return(chainID, nil)
+
 	lpOpts := Opts{
-		PollPeriod:               time.Hour,
+		PollPeriod:               time.Second,
 		FinalityDepth:            3,
 		BackfillBatchSize:        3,
 		RpcBatchSize:             3,
 		KeepFinalizedBlocksDepth: 20,
-		BackupPollerBlockDelay:   100,
+		BackupPollerBlockDelay:   0,
 	}
 	lp := NewLogPoller(orm, ec, lggr, lpOpts)
@@ -308,6 +339,8 @@ func TestLogPoller_Replay(t *testing.T) {

 	// Replay() should return error code received from replayComplete
 	t.Run("returns error code on replay complete", func(t *testing.T) {
+		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
+		mockBatchCallContext(t, ec)
 		anyErr := pkgerrors.New("any error")
 		done := make(chan struct{})
 		go func() {
@@ -345,6 +378,7 @@ func TestLogPoller_Replay(t *testing.T) {
 		var wg sync.WaitGroup
 		defer func() { wg.Wait() }()
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
+			head = evmtypes.Head{Number: 4}
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
@@ -371,6 +405,7 @@ func TestLogPoller_Replay(t *testing.T) {

 		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms

+		head = evmtypes.Head{Number: 5}
 		t.Cleanup(lp.reset)
 		servicetest.Run(t, lp)

@@ -395,6 +430,8 @@ func TestLogPoller_Replay(t *testing.T) {
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
 			go func() {
 				defer close(done)
+
+				head = evmtypes.Head{Number: 4} // Restore latest block to 4, so this matches the fromBlock requested
 				select {
 				case lp.replayStart <- 4:
 				case <-ctx.Done():
@@ -405,9 +442,10 @@ func TestLogPoller_Replay(t *testing.T) {
 			lp.cancel()
 			close(pass)
 		})
-		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms
+		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)

 		t.Cleanup(lp.reset)
+		head = evmtypes.Head{Number: 5} // Latest block must be > lastProcessed in order for PollAndSaveLogs() to call FilterLogs()
 		servicetest.Run(t, lp)

 		select {
@@ -420,6 +458,9 @@ func TestLogPoller_Replay(t *testing.T) {
 	// ReplayAsync should return as soon as replayStart is received
 	t.Run("ReplayAsync success", func(t *testing.T) {
 		t.Cleanup(lp.reset)
+		head = evmtypes.Head{Number: 5}
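+		// Latest block (5) is now ahead of lastProcessed, so the poller will call FilterLogs and
+		// batch-fetch blocks for the stricter finality validation; stub both RPCs here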
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil) + mockBatchCallContext(t, ec) servicetest.Run(t, lp) lp.ReplayAsync(1) @@ -430,6 +471,7 @@ func TestLogPoller_Replay(t *testing.T) { t.Run("ReplayAsync error", func(t *testing.T) { t.Cleanup(lp.reset) servicetest.Run(t, lp) + head = evmtypes.Head{Number: 4} anyErr := pkgerrors.New("async error") observedLogs.TakeAll() @@ -462,6 +504,9 @@ func TestLogPoller_Replay(t *testing.T) { err = lp.orm.InsertBlock(ctx, head.Hash, head.Number, head.Timestamp, head.Number) require.NoError(t, err) + ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil) + mockBatchCallContext(t, ec) + err = lp.Replay(ctx, 1) require.NoError(t, err) }) @@ -554,6 +599,77 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { }) } +func Test_FetchBlocks(t *testing.T) { + lggr := logger.Test(t) + chainID := testutils.FixtureChainID + db := pgtest.NewSqlxDB(t) + orm := NewORM(chainID, db, lggr) + ctx := testutils.Context(t) + + lpOpts := Opts{ + PollPeriod: time.Hour, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 50, + FinalityDepth: 3, + } + + ec := evmclimocks.NewClient(t) + mockBatchCallContext(t, ec) // This will return 5 for "finalized" and 8 for "latest" + + cases := []struct { + name string + blocksRequested []string + expectedErr error + }{{ + "successful validation including finalized and latest", + []string{"0x3", "latest", "0x5", "finalized", "0x1"}, + nil, + }, { + "successful validation with all block numbers", + []string{"0x2", "0x5", "0x3", "0x4"}, + nil, + }, { + "finality violation including finalized and latest", + []string{"0x8", "0x2", "latest", "finalized"}, + errors.New("Received unfinalized block 8 while expecting finalized block (latestFinalizedBlockNumber = 5)"), + }, { + "finality violation with all block numbers", + []string{"0x9", "0x2", "finalized", "latest"}, + errors.New("Received unfinalized block 9 while expecting finalized block (latestFinalizedBlockNumber = 5)"), + }} + + lp := NewLogPoller(orm, ec, lggr, lpOpts) + for _, tc := range cases { + for _, lp.useFinalityTag = range []bool{false, true} { + blockValidationReq := latestBlock + if lp.useFinalityTag { + blockValidationReq = finalizedBlock + } + t.Run(fmt.Sprintf("%s where useFinalityTag=%t", tc.name, lp.useFinalityTag), func(t *testing.T) { + blocks, err := lp.fetchBlocks(ctx, tc.blocksRequested, blockValidationReq) + if tc.expectedErr != nil { + require.Equal(t, err.Error(), tc.expectedErr.Error()) + return // PASS + } + require.NoError(t, err) + for i, blockRequested := range tc.blocksRequested { + switch blockRequested { + case string(latestBlock): + assert.Equal(t, int64(8), blocks[i].Number) + case string(finalizedBlock): + assert.Equal(t, int64(5), blocks[i].Number) + default: + blockNum, err2 := hexutil.DecodeUint64(blockRequested) + require.NoError(t, err2) + assert.Equal(t, int64(blockNum), blocks[i].Number) + } + } + }) + } + } +} + func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) lpOpts := Opts{ diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 2096ccf3cf4..74ec41fa85a 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -545,8 +545,6 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) BackupPollerBlockDelay: 1, } th := SetupTH(t, lpOpts) - //header, err := th.Client.HeaderByNumber(ctx, nil) - 
-	//require.NoError(t, err)

 	// Emit some logs in blocks
 	for i := 1; i <= logsBatch; i++ {
@@ -559,7 +557,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T)
 	// 1 -> 2 -> ... -> firstBatchBlock
 	firstBatchBlock := th.PollAndSaveLogs(ctx, 1)
 	// Mark current tip of the chain as finalized (after emitting 10 logs)
-	markBlockAsFinalized(t, th, firstBatchBlock)
+	markBlockAsFinalized(t, th, firstBatchBlock-1)

 	// Emit 2nd batch of block
 	for i := 1; i <= logsBatch; i++ {
@@ -571,7 +569,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T)
 	// 1 -> 2 -> ... -> firstBatchBlock (finalized) -> .. -> firstBatchBlock + emitted logs
 	secondBatchBlock := th.PollAndSaveLogs(ctx, firstBatchBlock)
 	// Mark current tip of the block as finalized (after emitting 20 logs)
-	markBlockAsFinalized(t, th, secondBatchBlock)
+	markBlockAsFinalized(t, th, secondBatchBlock-1)

 	// Register filter
 	err := th.LogPoller.RegisterFilter(ctx, logpoller.Filter{
@@ -595,8 +593,8 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T)
 		th.EmitterAddress1,
 	)
 	require.NoError(t, err)
-	require.Len(t, logs, logsBatch+1)
-	require.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000009`), logs[0].Data)
+	require.Len(t, logs, logsBatch)
+	require.Equal(t, hexutil.MustDecode(`0x000000000000000000000000000000000000000000000000000000000000000a`), logs[0].Data)
 }

 func TestLogPoller_BlockTimestamps(t *testing.T) {
@@ -676,6 +674,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
 	require.NoError(t, err)

 	// Logs should have correct timestamps
+	require.NotZero(t, len(lg1))
 	b, _ := th.Client.BlockByHash(ctx, lg1[0].BlockHash)
 	t.Log(len(lg1), lg1[0].BlockTimestamp)
 	assert.Equal(t, int64(b.Time()), lg1[0].BlockTimestamp.UTC().Unix(), time1)
@@ -1181,6 +1180,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {
 		// Check that L1_1 has a proper data payload
 		lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2)
 		require.NoError(t, err)
+		require.NotZero(t, len(lgs))
 		assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000001`), lgs[0].Data)

 		// Single block reorg and log poller not working for a while, mine blocks and progress with finalization
@@ -1208,6 +1208,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {
 		// Expect L1_2 to be properly updated
 		lgs, err = th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2)
 		require.NoError(t, err)
+		require.NotZero(t, len(lgs))
 		assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000002`), lgs[0].Data)
 		th.assertHaveCanonical(t, 1, 1)
 		th.assertDontHave(t, 2, 3) // These blocks are backfilled
@@ -1301,6 +1302,7 @@ func TestLogPoller_LoadFilters(t *testing.T) {

 func TestLogPoller_GetBlocks_Range(t *testing.T) {
 	t.Parallel()
+
 	lpOpts := logpoller.Opts{
 		UseFinalityTag:           false,
 		FinalityDepth:            2,
@@ -1310,7 +1312,15 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
 	}
 	th := SetupTH(t, lpOpts)

-	err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{
+	_, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)})
+	require.NoError(t, err)
+	th.Client.Commit() // Commit block #2 with log in it
+
+	_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)})
+	require.NoError(t, err)
+	th.Client.Commit() // Commit block #3 with a different log
+
+	err = th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{
 		Name:      "GetBlocks Test",
Test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}, @@ -1330,16 +1340,13 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 1, len(blocks)) assert.Equal(t, 1, int(blocks[0].BlockNumber)) - // LP fails to retrieve block 2 because it's neither in DB nor returned by RPC + // LP fails to return block 2 because it hasn't been finalized yet blockNums = []uint64{2} _, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums) require.Error(t, err) - assert.Equal(t, "blocks were not found in db or RPC call: [2]", err.Error()) + assert.Equal(t, "Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error()) - // Emit a log and mine block #2 - _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)}) - require.NoError(t, err) - th.Client.Commit() + th.Client.Commit() // Commit block #4, so that block #2 is finalized // Assert block 2 is not yet in DB _, err = th.ORM.SelectBlockByNumber(testutils.Context(t), 2) @@ -1351,10 +1358,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 1, len(rpcBlocks)) assert.Equal(t, 2, int(rpcBlocks[0].BlockNumber)) - // Emit a log and mine block #3 - _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)}) - require.NoError(t, err) - th.Client.Commit() + th.Client.Commit() // commit block #5 so that #3 becomes finalized // Assert block 3 is not yet in DB _, err = th.ORM.SelectBlockByNumber(testutils.Context(t), 3) @@ -1368,12 +1372,9 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 1, int(rpcBlocks2[0].BlockNumber)) assert.Equal(t, 3, int(rpcBlocks2[1].BlockNumber)) - // after calling PollAndSaveLogs, block 2 & 3 are persisted in DB + // after calling PollAndSaveLogs, block 3 (latest finalized block) is persisted in DB th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1) - block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 2) - require.NoError(t, err) - assert.Equal(t, 2, int(block.BlockNumber)) - block, err = th.ORM.SelectBlockByNumber(testutils.Context(t), 3) + block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 3) require.NoError(t, err) assert.Equal(t, 3, int(block.BlockNumber)) @@ -1507,9 +1508,9 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { time.Sleep(100 * time.Millisecond) require.NoError(t, lp.Start(ctx)) - require.Eventually(t, func() bool { + testutils.AssertEventually(t, func() bool { return observedLogs.Len() >= 1 - }, 2*time.Second, 20*time.Millisecond) + }) err = lp.Close() require.NoError(t, err)