diff --git a/.changeset/fresh-moles-explode.md b/.changeset/fresh-moles-explode.md new file mode 100644 index 00000000000..205002b40a0 --- /dev/null +++ b/.changeset/fresh-moles-explode.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +core/chains/evm/logpoller: Stricter finality checks in LogPoller, to be more robust during rpc failover events #updated diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index de2a182bbce..7592ec104c4 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -699,8 +699,8 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) { } return } - // If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay) - backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay) + // If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-backupPollerBlockDelay) + backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-lp.backupPollerBlockDelay) // (or at block 0 if whole blockchain is too short) lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0) } @@ -771,11 +771,16 @@ func convertTopics(topics []common.Hash) [][]byte { return topicsForDB } -func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log) (blocks []LogPollerBlock, err error) { +// blocksFromLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber, +// whether or not there are any logs in the list from that block +func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) { var numbers []uint64 for _, log := range logs { numbers = append(numbers, log.BlockNumber) } + if numbers[len(numbers)-1] != endBlockNumber { + numbers = append(numbers, endBlockNumber) + } return lp.GetBlocksRange(ctx, numbers) } @@ -789,6 +794,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { batchSize := lp.backfillBatchSize for from := start; from <= end; from += batchSize { to := mathutil.Min(from+batchSize-1, end) + gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil)) if err != nil { var rpcErr client.JsonError @@ -810,13 +816,19 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { if len(gethLogs) == 0 { continue } - blocks, err := lp.blocksFromLogs(ctx, gethLogs) + blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to)) if err != nil { return err } + endblock := blocks[len(blocks)-1] + if gethLogs[len(gethLogs)-1].BlockNumber != uint64(to) { + // Pop endblock if there were no logs for it, so that length of blocks & gethLogs are the same to pass to convertLogs + blocks = blocks[:len(blocks)-1] + } + lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks) - err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), blocks[len(blocks)-1]) + err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock) if err != nil { lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to) return err @@ -955,19 +967,18 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int currentBlockNumber = lastSafeBackfillBlock + 1 } - if currentBlockNumber > currentBlock.Number { - // If we successfully backfilled we have logs up to and including lastSafeBackfillBlock, - // now load the first unfinalized block. - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil) - if err != nil { - // If there's an error handling the reorg, we can't be sure what state the db was left in. - // Resume from the latest block saved. - lp.lggr.Errorw("Unable to get current block", "err", err) - return + for { + if currentBlockNumber > currentBlock.Number { + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil) + if err != nil { + // If there's an error handling the reorg, we can't be sure what state the db was left in. + // Resume from the latest block saved. + lp.lggr.Errorw("Unable to get current block", "err", err) + return + } + currentBlockNumber = currentBlock.Number } - } - for { h := currentBlock.Hash var logs []types.Log logs, err = lp.ec.FilterLogs(ctx, lp.Filter(nil, nil, &h)) @@ -992,14 +1003,6 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int if currentBlockNumber > latestBlockNumber { break } - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil) - if err != nil { - // If there's an error handling the reorg, we can't be sure what state the db was left in. - // Resume from the latest block saved. - lp.lggr.Errorw("Unable to get current block", "err", err) - return - } - currentBlockNumber = currentBlock.Number } } @@ -1252,12 +1255,16 @@ func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]Lo return blocks, nil } +// fillRemainingBlocksFromRPC sends a batch request for each block in blocksRequested, and converts them from +// geth blocks into LogPollerBlock structs. This is only intended to be used for requesting finalized blocks, +// if any of the blocks coming back are not finalized, an error will be returned func (lp *logPoller) fillRemainingBlocksFromRPC( ctx context.Context, blocksRequested map[uint64]struct{}, blocksFound map[uint64]LogPollerBlock, ) (map[uint64]LogPollerBlock, error) { var remainingBlocks []string + for num := range blocksRequested { if _, ok := blocksFound[num]; !ok { remainingBlocks = append(remainingBlocks, hexutil.EncodeBig(new(big.Int).SetUint64(num))) @@ -1287,54 +1294,126 @@ func (lp *logPoller) fillRemainingBlocksFromRPC( return logPollerBlocks, nil } -func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) { - reqs := make([]rpc.BatchElem, 0, len(blocksRequested)) - for _, num := range blocksRequested { - req := rpc.BatchElem{ - Method: "eth_getBlockByNumber", - Args: []interface{}{num, false}, - Result: &evmtypes.Head{}, - } - reqs = append(reqs, req) +// newBlockReq constructs an eth_getBlockByNumber request for particular block number +func newBlockReq(num string) rpc.BatchElem { + return rpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{num, false}, + Result: &evmtypes.Head{}, } +} - for i := 0; i < len(reqs); i += int(batchSize) { - j := i + int(batchSize) - if j > len(reqs) { - j = len(reqs) - } +type blockValidationType string - err := lp.ec.BatchCallContext(ctx, reqs[i:j]) - if err != nil { - return nil, err +var ( + latestBlock blockValidationType = blockValidationType(rpc.LatestBlockNumber.String()) + finalizedBlock blockValidationType = blockValidationType(rpc.FinalizedBlockNumber.String()) +) + +// fetchBlocks fetches a list of blocks in a single batch. validationReq is the string to use for the +// additional validation request (either the "finalized" or "latest" string defined in rpc module), which +// will be used to validate the finality of the other blocks. +func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string, validationReq blockValidationType) (blocks []*evmtypes.Head, err error) { + n := len(blocksRequested) + blocks = make([]*evmtypes.Head, 0, n+1) + reqs := make([]rpc.BatchElem, 0, n+1) + + validationBlockIndex := n + for k, num := range blocksRequested { + if num == string(validationReq) { + validationBlockIndex = k } + reqs = append(reqs, newBlockReq(num)) + } + + if validationBlockIndex == n { + // Add validation req if it wasn't in there already + reqs = append(reqs, newBlockReq(string(validationReq))) + } + + err = lp.ec.BatchCallContext(ctx, reqs) + if err != nil { + return nil, err } - var blocks = make([]*evmtypes.Head, 0, len(reqs)) - for _, r := range reqs { - if r.Error != nil { - return nil, r.Error + validationBlock, err := validateBlockResponse(reqs[validationBlockIndex]) + if err != nil { + return nil, err + } + latestFinalizedBlockNumber := validationBlock.Number + if validationReq == latestBlock { + // subtract finalityDepth from "latest" to get finalized, when useFinalityTags = false + latestFinalizedBlockNumber = mathutil.Max(latestFinalizedBlockNumber-lp.finalityDepth, 0) + } + if len(reqs) == n+1 { + reqs = reqs[:n] // ignore last req if we added it explicitly for validation + } + + for k, r := range reqs { + if k == validationBlockIndex { + // Already validated this one, just insert it in proper place + blocks = append(blocks, validationBlock) + continue } - block, is := r.Result.(*evmtypes.Head) - if !is { - return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result) + block, err2 := validateBlockResponse(r) + if err2 != nil { + return nil, err2 } - if block == nil { - return nil, pkgerrors.New("invariant violation: got nil block") + + blockRequested := r.Args[0].(string) + if blockRequested != string(latestBlock) && block.Number > latestFinalizedBlockNumber { + return nil, fmt.Errorf( + "Received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)", + block.Number, latestFinalizedBlockNumber) } - if block.Hash == (common.Hash{}) { - return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number) + + blocks = append(blocks, block) + } + return blocks, nil +} + +func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) { + var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1) + + validationReq := finalizedBlock + if !lp.useFinalityTag { + validationReq = latestBlock + } + + for i := 0; i < len(blocksRequested); i += int(batchSize) { + j := i + int(batchSize) + if j > len(blocksRequested) { + j = len(blocksRequested) } - if block.Number < 0 { - return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number) + moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq) + if err != nil { + return nil, err } - blocks = append(blocks, block) + blocks = append(blocks, moreBlocks...) } return blocks, nil } +func validateBlockResponse(r rpc.BatchElem) (*evmtypes.Head, error) { + block, is := r.Result.(*evmtypes.Head) + + if !is { + return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result) + } + if block == nil { + return nil, pkgerrors.New("invariant violation: got nil block") + } + if block.Hash == (common.Hash{}) { + return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number) + } + if block.Number < 0 { + return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number) + } + return block, nil +} + // IndexedLogsWithSigsExcluding returns the set difference(A-B) of logs with signature sigA and sigB, matching is done on the topics index // // For example, query to retrieve unfulfilled requests by querying request log events without matching fulfillment log events. diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index b6af0f7de5c..4236f0b8ef1 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -2,6 +2,7 @@ package logpoller import ( "context" + "errors" "fmt" "math/big" "reflect" @@ -12,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" pkgerrors "github.com/pkg/errors" @@ -205,13 +207,14 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) { chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, db, lggr) + latestBlock := int64(4) - head := evmtypes.Head{Number: 3} + head := evmtypes.Head{Number: latestBlock} events := []common.Hash{EmitterABI.Events["Log1"].ID} log1 := types.Log{ Index: 0, BlockHash: common.Hash{}, - BlockNumber: uint64(3), + BlockNumber: uint64(latestBlock), Topics: events, Address: addr, TxHash: common.HexToHash("0x1234"), @@ -230,20 +233,44 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, } lp := NewLogPoller(orm, ec, lggr, lpOpts) lp.BackupPollAndSaveLogs(ctx) assert.Equal(t, int64(0), lp.backupPollerNextBlock) assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len()) - lp.PollAndSaveLogs(ctx, 3) + lp.PollAndSaveLogs(ctx, latestBlock) lastProcessed, err := lp.orm.SelectLatestBlock(ctx) require.NoError(t, err) - require.Equal(t, int64(3), lastProcessed.BlockNumber) + require.Equal(t, latestBlock, lastProcessed.BlockNumber) lp.BackupPollAndSaveLogs(ctx) - assert.Equal(t, int64(1), lp.backupPollerNextBlock) // Ensure non-negative! + assert.Equal(t, int64(2), lp.backupPollerNextBlock) +} + +func mockBatchCallContext(t *testing.T, ec *evmclimocks.Client) { + ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + elems := args.Get(1).([]rpc.BatchElem) + for _, e := range elems { + var num int64 + block := e.Args[0].(string) + switch block { + case "latest": + num = 8 + case "finalized": + num = 5 + default: + n, err := hexutil.DecodeUint64(block) + require.NoError(t, err) + num = int64(n) + } + result := e.Result.(*evmtypes.Head) + *result = evmtypes.Head{Number: num, Hash: utils.NewHash()} + + } + }) } func TestLogPoller_Replay(t *testing.T) { @@ -269,16 +296,20 @@ func TestLogPoller_Replay(t *testing.T) { } ec := evmclimocks.NewClient(t) - ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) - ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice() + ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(context.Context, *big.Int) (*evmtypes.Head, error) { + headCopy := head + return &headCopy, nil + }) + ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once() ec.On("ConfiguredChainID").Return(chainID, nil) + lpOpts := Opts{ - PollPeriod: time.Hour, + PollPeriod: time.Second, FinalityDepth: 3, BackfillBatchSize: 3, RpcBatchSize: 3, KeepFinalizedBlocksDepth: 20, - BackupPollerBlockDelay: 100, + BackupPollerBlockDelay: 0, } lp := NewLogPoller(orm, ec, lggr, lpOpts) @@ -308,6 +339,8 @@ func TestLogPoller_Replay(t *testing.T) { // Replay() should return error code received from replayComplete t.Run("returns error code on replay complete", func(t *testing.T) { + ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once() + mockBatchCallContext(t, ec) anyErr := pkgerrors.New("any error") done := make(chan struct{}) go func() { @@ -345,6 +378,7 @@ func TestLogPoller_Replay(t *testing.T) { var wg sync.WaitGroup defer func() { wg.Wait() }() ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) { + head = evmtypes.Head{Number: 4} wg.Add(1) go func() { defer wg.Done() @@ -371,6 +405,7 @@ func TestLogPoller_Replay(t *testing.T) { ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms + head = evmtypes.Head{Number: 5} t.Cleanup(lp.reset) servicetest.Run(t, lp) @@ -395,6 +430,8 @@ func TestLogPoller_Replay(t *testing.T) { ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) { go func() { defer close(done) + + head = evmtypes.Head{Number: 4} // Restore latest block to 4, so this matches the fromBlock requested select { case lp.replayStart <- 4: case <-ctx.Done(): @@ -405,9 +442,10 @@ func TestLogPoller_Replay(t *testing.T) { lp.cancel() close(pass) }) - ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms + ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil) t.Cleanup(lp.reset) + head = evmtypes.Head{Number: 5} // Latest block must be > lastProcessed in order for SaveAndPollLogs() to call FilterLogs() servicetest.Run(t, lp) select { @@ -420,6 +458,9 @@ func TestLogPoller_Replay(t *testing.T) { // ReplayAsync should return as soon as replayStart is received t.Run("ReplayAsync success", func(t *testing.T) { t.Cleanup(lp.reset) + head = evmtypes.Head{Number: 5} + ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil) + mockBatchCallContext(t, ec) servicetest.Run(t, lp) lp.ReplayAsync(1) @@ -430,6 +471,7 @@ func TestLogPoller_Replay(t *testing.T) { t.Run("ReplayAsync error", func(t *testing.T) { t.Cleanup(lp.reset) servicetest.Run(t, lp) + head = evmtypes.Head{Number: 4} anyErr := pkgerrors.New("async error") observedLogs.TakeAll() @@ -462,6 +504,9 @@ func TestLogPoller_Replay(t *testing.T) { err = lp.orm.InsertBlock(ctx, head.Hash, head.Number, head.Timestamp, head.Number) require.NoError(t, err) + ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil) + mockBatchCallContext(t, ec) + err = lp.Replay(ctx, 1) require.NoError(t, err) }) @@ -554,6 +599,77 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { }) } +func Test_FetchBlocks(t *testing.T) { + lggr := logger.Test(t) + chainID := testutils.FixtureChainID + db := pgtest.NewSqlxDB(t) + orm := NewORM(chainID, db, lggr) + ctx := testutils.Context(t) + + lpOpts := Opts{ + PollPeriod: time.Hour, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 50, + FinalityDepth: 3, + } + + ec := evmclimocks.NewClient(t) + mockBatchCallContext(t, ec) // This will return 5 for "finalized" and 8 for "latest" + + cases := []struct { + name string + blocksRequested []string + expectedErr error + }{{ + "successful validation including finalized and latest", + []string{"0x3", "latest", "0x5", "finalized", "0x1"}, + nil, + }, { + "successful validation with all block numbers", + []string{"0x2", "0x5", "0x3", "0x4"}, + nil, + }, { + "finality violation including finalized and latest", + []string{"0x8", "0x2", "latest", "finalized"}, + errors.New("Received unfinalized block 8 while expecting finalized block (latestFinalizedBlockNumber = 5)"), + }, { + "finality violation with all block numbers", + []string{"0x9", "0x2", "finalized", "latest"}, + errors.New("Received unfinalized block 9 while expecting finalized block (latestFinalizedBlockNumber = 5)"), + }} + + lp := NewLogPoller(orm, ec, lggr, lpOpts) + for _, tc := range cases { + for _, lp.useFinalityTag = range []bool{false, true} { + blockValidationReq := latestBlock + if lp.useFinalityTag { + blockValidationReq = finalizedBlock + } + t.Run(fmt.Sprintf("%s where useFinalityTag=%t", tc.name, lp.useFinalityTag), func(t *testing.T) { + blocks, err := lp.fetchBlocks(ctx, tc.blocksRequested, blockValidationReq) + if tc.expectedErr != nil { + require.Equal(t, err.Error(), tc.expectedErr.Error()) + return // PASS + } + require.NoError(t, err) + for i, blockRequested := range tc.blocksRequested { + switch blockRequested { + case string(latestBlock): + assert.Equal(t, int64(8), blocks[i].Number) + case string(finalizedBlock): + assert.Equal(t, int64(5), blocks[i].Number) + default: + blockNum, err2 := hexutil.DecodeUint64(blockRequested) + require.NoError(t, err2) + assert.Equal(t, int64(blockNum), blocks[i].Number) + } + } + }) + } + } +} + func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) lpOpts := Opts{ diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 2096ccf3cf4..74ec41fa85a 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -545,8 +545,6 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) BackupPollerBlockDelay: 1, } th := SetupTH(t, lpOpts) - //header, err := th.Client.HeaderByNumber(ctx, nil) - //require.NoError(t, err) // Emit some logs in blocks for i := 1; i <= logsBatch; i++ { @@ -559,7 +557,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) // 1 -> 2 -> ... -> firstBatchBlock firstBatchBlock := th.PollAndSaveLogs(ctx, 1) // Mark current tip of the chain as finalized (after emitting 10 logs) - markBlockAsFinalized(t, th, firstBatchBlock) + markBlockAsFinalized(t, th, firstBatchBlock-1) // Emit 2nd batch of block for i := 1; i <= logsBatch; i++ { @@ -571,7 +569,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) // 1 -> 2 -> ... -> firstBatchBlock (finalized) -> .. -> firstBatchBlock + emitted logs secondBatchBlock := th.PollAndSaveLogs(ctx, firstBatchBlock) // Mark current tip of the block as finalized (after emitting 20 logs) - markBlockAsFinalized(t, th, secondBatchBlock) + markBlockAsFinalized(t, th, secondBatchBlock-1) // Register filter err := th.LogPoller.RegisterFilter(ctx, logpoller.Filter{ @@ -595,8 +593,8 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) th.EmitterAddress1, ) require.NoError(t, err) - require.Len(t, logs, logsBatch+1) - require.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000009`), logs[0].Data) + require.Len(t, logs, logsBatch) + require.Equal(t, hexutil.MustDecode(`0x000000000000000000000000000000000000000000000000000000000000000a`), logs[0].Data) } func TestLogPoller_BlockTimestamps(t *testing.T) { @@ -676,6 +674,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) { require.NoError(t, err) // Logs should have correct timestamps + require.NotZero(t, len(lg1)) b, _ := th.Client.BlockByHash(ctx, lg1[0].BlockHash) t.Log(len(lg1), lg1[0].BlockTimestamp) assert.Equal(t, int64(b.Time()), lg1[0].BlockTimestamp.UTC().Unix(), time1) @@ -1181,6 +1180,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { // Check that L1_1 has a proper data payload lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2) require.NoError(t, err) + require.NotZero(t, len(lgs)) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000001`), lgs[0].Data) // Single block reorg and log poller not working for a while, mine blocks and progress with finalization @@ -1208,6 +1208,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { // Expect L1_2 to be properly updated lgs, err = th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2) require.NoError(t, err) + require.NotZero(t, len(lgs)) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000002`), lgs[0].Data) th.assertHaveCanonical(t, 1, 1) th.assertDontHave(t, 2, 3) // These blocks are backfilled @@ -1301,6 +1302,7 @@ func TestLogPoller_LoadFilters(t *testing.T) { func TestLogPoller_GetBlocks_Range(t *testing.T) { t.Parallel() + lpOpts := logpoller.Opts{ UseFinalityTag: false, FinalityDepth: 2, @@ -1310,7 +1312,15 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { } th := SetupTH(t, lpOpts) - err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{ + _, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)}) + require.NoError(t, err) + th.Client.Commit() // Commit block #2 with log in it + + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)}) + require.NoError(t, err) + th.Client.Commit() // Commit block #3 with a different log + + err = th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{ Name: "GetBlocks Test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{th.EmitterAddress1, th.EmitterAddress2}, @@ -1330,16 +1340,13 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 1, len(blocks)) assert.Equal(t, 1, int(blocks[0].BlockNumber)) - // LP fails to retrieve block 2 because it's neither in DB nor returned by RPC + // LP fails to return block 2 because it hasn't been finalized yet blockNums = []uint64{2} _, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums) require.Error(t, err) - assert.Equal(t, "blocks were not found in db or RPC call: [2]", err.Error()) + assert.Equal(t, "Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error()) - // Emit a log and mine block #2 - _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)}) - require.NoError(t, err) - th.Client.Commit() + th.Client.Commit() // Commit block #4, so that block #2 is finalized // Assert block 2 is not yet in DB _, err = th.ORM.SelectBlockByNumber(testutils.Context(t), 2) @@ -1351,10 +1358,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 1, len(rpcBlocks)) assert.Equal(t, 2, int(rpcBlocks[0].BlockNumber)) - // Emit a log and mine block #3 - _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)}) - require.NoError(t, err) - th.Client.Commit() + th.Client.Commit() // commit block #5 so that #3 becomes finalized // Assert block 3 is not yet in DB _, err = th.ORM.SelectBlockByNumber(testutils.Context(t), 3) @@ -1368,12 +1372,9 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 1, int(rpcBlocks2[0].BlockNumber)) assert.Equal(t, 3, int(rpcBlocks2[1].BlockNumber)) - // after calling PollAndSaveLogs, block 2 & 3 are persisted in DB + // after calling PollAndSaveLogs, block 3 (latest finalized block) is persisted in DB th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1) - block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 2) - require.NoError(t, err) - assert.Equal(t, 2, int(block.BlockNumber)) - block, err = th.ORM.SelectBlockByNumber(testutils.Context(t), 3) + block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 3) require.NoError(t, err) assert.Equal(t, 3, int(block.BlockNumber)) @@ -1507,9 +1508,9 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { time.Sleep(100 * time.Millisecond) require.NoError(t, lp.Start(ctx)) - require.Eventually(t, func() bool { + testutils.AssertEventually(t, func() bool { return observedLogs.Len() >= 1 - }, 2*time.Second, 20*time.Millisecond) + }) err = lp.Close() require.NoError(t, err)