Skip to content

Commit

Permalink
CCIP-1230 Exposing entire LogPollerBlock from LatestBlock in LogPoller (
Browse files Browse the repository at this point in the history
#11105)

* Exposing entire LogPollerBlock from LatestBlock function in the LogPoller's interface

* Exposing entire LogPollerBlock from LatestBlock function in the LogPoller's interface
  • Loading branch information
mateusz-sekara authored Oct 31, 2023
1 parent 44bf841 commit 2534e1a
Show file tree
Hide file tree
Showing 35 changed files with 123 additions and 104 deletions.
4 changes: 3 additions & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (disabled) UnregisterFilter(name string, qopts ...pg.QOpt) error { return E

func (disabled) HasFilter(name string) bool { return false }

func (disabled) LatestBlock(qopts ...pg.QOpt) (int64, error) { return -1, ErrDisabled }
func (disabled) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
return LogPollerBlock{}, ErrDisabled
}

func (disabled) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
return nil, ErrDisabled
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 {
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber)
latest, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(ctx))
return latest + 1
return latest.BlockNumber + 1
}

func (th *TestHarness) assertDontHave(t *testing.T, start, end int) {
Expand Down
8 changes: 4 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type LogPoller interface {
RegisterFilter(filter Filter, qopts ...pg.QOpt) error
UnregisterFilter(name string, qopts ...pg.QOpt) error
HasFilter(name string) bool
LatestBlock(qopts ...pg.QOpt) (int64, error)
LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error)
GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)

// General querying
Expand Down Expand Up @@ -1019,13 +1019,13 @@ func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.

// LatestBlock returns the latest block the log poller is on. It tracks blocks to be able
// to detect reorgs.
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) {
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
b, err := lp.orm.SelectLatestBlock(qopts...)
if err != nil {
return 0, err
return LogPollerBlock{}, err
}

return b.BlockNumber, nil
return *b, nil
}

func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) {
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestLogPoller_Replay(t *testing.T) {
lp.PollAndSaveLogs(tctx, 4)
latest, err := lp.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(4), latest)
require.Equal(t, int64(4), latest.BlockNumber)

t.Run("abort before replayStart received", func(t *testing.T) {
// Replay() should abort immediately if caller's context is cancelled before request signal is read
Expand Down
39 changes: 25 additions & 14 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func Test_BackupLogPoller(t *testing.T) {
body.Transactions = types.Transactions{} // number of tx's must match # of logs for GetLogs() to succeed
rawdb.WriteBody(th.EthDB, h.Hash(), h.Number.Uint64(), body)

currentBlock := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlock)
currentBlockNumber := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlockNumber)

// simulate logs becoming available
rawdb.WriteReceipts(th.EthDB, h.Hash(), h.Number.Uint64(), receipts)
Expand Down Expand Up @@ -342,12 +342,12 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 34)

// Run ordinary poller + backup poller at least once
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(37), currentBlock+1)
require.Equal(t, int64(37), currentBlock.BlockNumber+1)

// logs still shouldn't show up, because we don't want to backfill the last finalized log
// to help with reorg detection
Expand All @@ -359,11 +359,11 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 35)

// Run ordinary poller + backup poller at least once more
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(38), currentBlock+1)
require.Equal(t, int64(38), currentBlock.BlockNumber+1)

// all 3 logs in block 34 should show up now, thanks to backup logger
logs, err = th.LogPoller.Logs(30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
Expand Down Expand Up @@ -471,6 +471,13 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) {
// 1 -> 2 -> ...
th.PollAndSaveLogs(ctx, 1)

// Check that latest block has the same properties as the head
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
assert.Equal(t, latestBlock.BlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.FinalizedBlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.BlockHash, header.Hash())

// Register filter
err = th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "Test Emitter",
Expand Down Expand Up @@ -619,7 +626,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
require.Len(t, gethLogs, 2)

lb, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.PollAndSaveLogs(context.Background(), lb+1)
th.PollAndSaveLogs(context.Background(), lb.BlockNumber+1)
lg1, err := th.LogPoller.Logs(0, 20, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
Expand Down Expand Up @@ -667,9 +674,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1.
ec.Commit()
}
currentBlock := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
currentBlockNumber := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber)
currentBlock, err := lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
matchesGeth := func() bool {
// Check every block is identical
Expand Down Expand Up @@ -719,7 +726,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
require.NoError(t, err1)
t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
}
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
}
Expand Down Expand Up @@ -1245,7 +1252,7 @@ func TestGetReplayFromBlock(t *testing.T) {
require.NoError(t, err)
latest, err := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
assert.Equal(t, latest, fromBlock)
assert.Equal(t, latest.BlockNumber, fromBlock)

// Should take min(latest, requested) in this case requested.
requested = int64(7)
Expand Down Expand Up @@ -1551,6 +1558,10 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000)
// Should return error before the first poll and save
_, err := th.LogPoller.LatestBlock()
require.Error(t, err)

// Mark first block as finalized
h := th.Client.Blockchain().CurrentHeader()
th.Client.Blockchain().SetFinalized(h)
Expand All @@ -1562,7 +1573,7 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {

th.PollAndSaveLogs(ctx, 1)

latestBlock, err := th.ORM.SelectLatestBlock()
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(numberOfBlocks), latestBlock.BlockNumber)
require.Equal(t, tt.expectedFinalizedBlock, latestBlock.FinalizedBlockNumber)
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions core/services/blockhashstore/coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (v *V1Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E

logs, err := v.lp.LogsWithSigs(
int64(fromBlock),
int64(toBlock),
toBlock.BlockNumber,
[]common.Hash{
v1.VRFCoordinatorRandomnessRequestFulfilled{}.Topic(),
},
Expand Down Expand Up @@ -219,7 +219,7 @@ func (v *V2Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E

logs, err := v.lp.LogsWithSigs(
int64(fromBlock),
int64(toBlock),
toBlock.BlockNumber,
[]common.Hash{
v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic(),
},
Expand Down Expand Up @@ -310,7 +310,7 @@ func (v *V2PlusCoordinator) Fulfillments(ctx context.Context, fromBlock uint64)

logs, err := v.lp.LogsWithSigs(
int64(fromBlock),
int64(toBlock),
toBlock.BlockNumber,
[]common.Hash{
v2plus.IVRFCoordinatorV2PlusInternalRandomWordsFulfilled{}.Topic(),
},
Expand Down
2 changes: 1 addition & 1 deletion core/services/blockhashstore/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
if err != nil {
return 0, errors.Wrap(err, "getting chain head")
}
return uint64(head), nil
return uint64(head.BlockNumber), nil
})

return []job.ServiceCtx{&service{
Expand Down
3 changes: 2 additions & 1 deletion core/services/blockhashstore/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/chains/evm"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -58,7 +59,7 @@ func createTestDelegate(t *testing.T) (*blockhashstore.Delegate, *testData) {
sendingKey, _ := cltest.MustInsertRandomKey(t, kst)
lp := &mocklp.LogPoller{}
lp.On("RegisterFilter", mock.Anything).Return(nil)
lp.On("LatestBlock", mock.Anything, mock.Anything).Return(int64(0), nil)
lp.On("LatestBlock", mock.Anything, mock.Anything).Return(logpoller.LogPollerBlock{}, nil)

relayExtenders := evmtest.NewChainRelayExtenders(
t,
Expand Down
6 changes: 3 additions & 3 deletions core/services/blockhashstore/feeder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (test testCase) testFeederWithLogPollerVRFv1(t *testing.T) {

// Mock log poller.
lp.On("LatestBlock", mock.Anything).
Return(latest, nil)
Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil)
lp.On(
"LogsWithSigs",
fromBlock,
Expand Down Expand Up @@ -543,7 +543,7 @@ func (test testCase) testFeederWithLogPollerVRFv2(t *testing.T) {

// Mock log poller.
lp.On("LatestBlock", mock.Anything).
Return(latest, nil)
Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil)
lp.On(
"LogsWithSigs",
fromBlock,
Expand Down Expand Up @@ -641,7 +641,7 @@ func (test testCase) testFeederWithLogPollerVRFv2Plus(t *testing.T) {

// Mock log poller.
lp.On("LatestBlock", mock.Anything).
Return(latest, nil)
Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil)
lp.On(
"LogsWithSigs",
fromBlock,
Expand Down
24 changes: 12 additions & 12 deletions core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog
// always check the last lookback number of blocks and rebroadcast
// this allows the plugin to make decisions based on event confirmations
logs, err := c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryUpkeepPerformed{}.Topic(),
},
Expand All @@ -175,7 +175,7 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog
Key: UpkeepKeyHelper[uint32]{}.MakeUpkeepKey(p.CheckBlockNumber, p.Id),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(p.BlockNumber),
TransactionHash: p.TxHash.Hex(),
Confirmations: end - p.BlockNumber,
Confirmations: end.BlockNumber - p.BlockNumber,
}
vals = append(vals, l)
}
Expand All @@ -194,8 +194,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR

// ReorgedUpkeepReportLogs
logs, err := c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryReorgedUpkeepReport{}.Topic(),
},
Expand All @@ -212,8 +212,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR

// StaleUpkeepReportLogs
logs, err = c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryStaleUpkeepReport{}.Topic(),
},
Expand All @@ -230,8 +230,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR

// InsufficientFundsUpkeepReportLogs
logs, err = c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryInsufficientFundsUpkeepReport{}.Topic(),
},
Expand All @@ -258,7 +258,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR
Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber),
TransactionHash: r.TxHash.Hex(),
Confirmations: end - r.BlockNumber,
Confirmations: end.BlockNumber - r.BlockNumber,
}
vals = append(vals, l)
}
Expand All @@ -273,7 +273,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR
Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber),
TransactionHash: r.TxHash.Hex(),
Confirmations: end - r.BlockNumber,
Confirmations: end.BlockNumber - r.BlockNumber,
}
vals = append(vals, l)
}
Expand All @@ -288,7 +288,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR
Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber),
TransactionHash: r.TxHash.Hex(),
Confirmations: end - r.BlockNumber,
Confirmations: end.BlockNumber - r.BlockNumber,
}
vals = append(vals, l)
}
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocr2/plugins/ocr2keeper/evm20/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (r *EvmRegistry) initialize() error {

func (r *EvmRegistry) pollLogs() error {
var latest int64
var end int64
var end logpoller.LogPollerBlock
var err error

if end, err = r.poller.LatestBlock(pg.WithParentCtx(r.ctx)); err != nil {
Expand All @@ -356,20 +356,20 @@ func (r *EvmRegistry) pollLogs() error {

r.mu.Lock()
latest = r.lastPollBlock
r.lastPollBlock = end
r.lastPollBlock = end.BlockNumber
r.mu.Unlock()

// if start and end are the same, no polling needs to be done
if latest == 0 || latest == end {
if latest == 0 || latest == end.BlockNumber {
return nil
}

{
var logs []logpoller.Log

if logs, err = r.poller.LogsWithSigs(
end-logEventLookback,
end,
end.BlockNumber-logEventLookback,
end.BlockNumber,
upkeepStateEvents,
r.addr,
pg.WithParentCtx(r.ctx),
Expand Down
Loading

0 comments on commit 2534e1a

Please sign in to comment.