From ad22c6ed0c4bed4053adc1585dca0ad55b481685 Mon Sep 17 00:00:00 2001
From: Amir Y <83904651+amirylm@users.noreply.github.com>
Date: Wed, 6 Sep 2023 21:17:23 +0300
Subject: [PATCH] Seed order to manage logs overflow (#10485)

* update ocr2keepers
* go mod tidy
* buffer: drop logs by seed-order
* comment out noisy log
* ensure order for provider.GetLatestPayloads()
* ensure order for recoverer.GetRecoveryProposals()
* use a normalized value of latestBlock
* set overall limit for recovery proposals (MaxProposals); value TBD, currently set to 50
* set max proposals to 20 (was 50)
* apply total limits when dequeuing for payloads; MaxPayloads was set to 100
* fix test
* fix max block logs
* protect against log spamming
* renaming
* lint
* set offset to 100
* added tests
* use maps when sorting
* temporarily added blockhash to log id
* lint
* remove todo from log id func
---
 .../ocr2keeper/evm21/logprovider/buffer.go    | 132 ++++-
 .../evm21/logprovider/buffer_test.go          | 510 +++++++++++++++++-
 .../evm21/logprovider/integration_test.go     |   6 +-
 .../ocr2keeper/evm21/logprovider/provider.go  |  11 +-
 .../ocr2keeper/evm21/logprovider/recoverer.go |  55 +-
 .../evm21/logprovider/recoverer_test.go       |   4 +-
 .../ocr2/plugins/ocr2keeper/evm21/registry.go |   1 +
 7 files changed, 677 insertions(+), 42 deletions(-)
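The "seed order" of the title means that when a block overflows its limits, every node must drop the *same* logs, otherwise the network cannot agree on which logs to process. A minimal, self-contained sketch of the idea follows, using a standard-library shuffle as a stand-in for the random.GetRandomKeySource / random.ShuffleString helpers from ocr2keepers that the patch actually uses; all names in it are illustrative.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// shuffleString deterministically permutes s with the given seed; every node
// that derives the same seed (here, from the block number) gets the same
// permutation. Illustrative stand-in for random.ShuffleString.
func shuffleString(s string, seed int64) string {
	r := rand.New(rand.NewSource(seed))
	b := []byte(s)
	r.Shuffle(len(b), func(i, j int) { b[i], b[j] = b[j], b[i] })
	return string(b)
}

func main() {
	blockNumber := int64(12345)
	logIDs := []string{"log-a", "log-b", "log-c", "log-d"}

	// Precompute the shuffled key for each ID (the patch does the same with a
	// map, per the "use maps when sorting" bullet) and sort by it: the result
	// is a pseudorandom but network-wide consistent order. Overflowing logs
	// are then dropped from that order, so all nodes drop the same ones.
	shuffled := make(map[string]string, len(logIDs))
	for _, id := range logIDs {
		shuffled[id] = shuffleString(id, blockNumber)
	}
	sort.SliceStable(logIDs, func(i, j int) bool { return shuffled[logIDs[i]] < shuffled[logIDs[j]] })
	fmt.Println(logIDs)
}
```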
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go
index ad0ae5e1024..1835ac69f09 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go
@@ -1,26 +1,45 @@
 package logprovider
 
 import (
+    "encoding/hex"
     "math/big"
     "sort"
     "sync"
     "sync/atomic"
 
+    "github.com/smartcontractkit/ocr2keepers/pkg/v3/random"
+    ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"
+
     "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
     "github.com/smartcontractkit/chainlink/v2/core/logger"
 )
 
 var (
-    // allowedLogsPerBlock is the maximum number of logs allowed per upkeep in a block.
-    allowedLogsPerBlock = 128
-    // bufferMaxBlockSize is the maximum number of blocks in the buffer.
-    bufferMaxBlockSize = 1024
+    // maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block.
+    maxLogsPerUpkeepInBlock = 32
+    // maxLogsPerBlock is the maximum number of logs allowed per block in the buffer.
+    maxLogsPerBlock = 1024
 )
 
 // fetchedLog holds the log and the ID of the upkeep
 type fetchedLog struct {
     upkeepID *big.Int
     log      logpoller.Log
+    // cachedLogID is the cached log identifier, used for sorting.
+    // It is calculated lazily and cached for performance.
+    cachedLogID string
+}
+
+func (l *fetchedLog) getLogID() string {
+    if len(l.cachedLogID) == 0 {
+        ext := ocr2keepers.LogTriggerExtension{
+            Index: uint32(l.log.LogIndex),
+        }
+        copy(ext.TxHash[:], l.log.TxHash[:])
+        copy(ext.BlockHash[:], l.log.BlockHash[:])
+        l.cachedLogID = hex.EncodeToString(ext.LogIdentifier())
+    }
+    return l.cachedLogID
 }
 
 // fetchedBlock holds the logs fetched for a block
@@ -33,9 +52,46 @@ type fetchedBlock struct {
     visited []fetchedLog
 }
 
+func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) {
+    has, upkeepLogs := b.has(fl.upkeepID, fl.log)
+    if has {
+        // Skipping known logs
+        return fetchedLog{}, false
+    }
+    // lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id)
+    b.logs = append(b.logs, fl)
+
+    // drop logs if we reached the limits.
+    if upkeepLogs+1 > maxUpkeepLogs {
+        // in case of a logs overflow for a particular upkeep, we drop a log of that upkeep,
+        // based on the shared, random (per block) order of the logs in the block.
+        b.Sort()
+        var dropped fetchedLog
+        currentLogs := make([]fetchedLog, 0, len(b.logs)-1)
+        for _, l := range b.logs {
+            if dropped.upkeepID == nil && l.upkeepID.Cmp(fl.upkeepID) == 0 {
+                dropped = l
+                continue
+            }
+            currentLogs = append(currentLogs, l)
+        }
+        b.logs = currentLogs
+        return dropped, true
+    } else if len(b.logs)+len(b.visited) > maxBlockLogs {
+        // in case of a logs overflow at the block level, we drop a log based on the
+        // shared, random (per block) order of the logs in the block.
+        b.Sort()
+        dropped := b.logs[0]
+        b.logs = b.logs[1:]
+        return dropped, true
+    }
+
+    return fetchedLog{}, true
+}
+
 // has returns true if the block has the log,
 // and the number of logs for that upkeep in the block.
-func (b fetchedBlock) Has(id *big.Int, log logpoller.Log) (bool, int) {
+func (b fetchedBlock) has(id *big.Int, log logpoller.Log) (bool, int) {
     allLogs := append(b.logs, b.visited...)
     upkeepLogs := 0
     for _, l := range allLogs {
@@ -62,6 +118,22 @@ func (b fetchedBlock) Clone() fetchedBlock {
     }
 }
 
+// Sort orders the logs by their identifiers, shuffled using a pseudorandom source
+// that is shared across all nodes for a given block.
+func (b *fetchedBlock) Sort() {
+    randSeed := random.GetRandomKeySource(nil, uint64(b.blockNumber))
+
+    shuffledLogIDs := make(map[string]string, len(b.logs))
+    for _, log := range b.logs {
+        logID := log.getLogID()
+        shuffledLogIDs[logID] = random.ShuffleString(logID, randSeed)
+    }
+
+    sort.SliceStable(b.logs, func(i, j int) bool {
+        return shuffledLogIDs[b.logs[i].getLogID()] < shuffledLogIDs[b.logs[j].getLogID()]
+    })
+}
+
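The sort above leans on getLogID, which caches the hex-encoded identifier because the comparator re-reads it O(n log n) times per sort. A self-contained sketch of that caching, assuming a simplified identifier layout (txhash | blockhash | index); the real layout is whatever ocr2keepers.LogTriggerExtension.LogIdentifier() produces, so treat the byte order here as hypothetical.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// fetchedLog mirrors the patch's struct shape; the ID layout below is an
// assumption for illustration, not the ocr2keepers wire format.
type fetchedLog struct {
	txHash      [32]byte
	blockHash   [32]byte
	logIndex    uint32
	cachedLogID string // computed lazily, then reused by every sort comparison
}

func (l *fetchedLog) getLogID() string {
	if l.cachedLogID == "" {
		buf := make([]byte, 0, 68)
		buf = append(buf, l.txHash[:]...)
		buf = append(buf, l.blockHash[:]...)
		var idx [4]byte
		binary.BigEndian.PutUint32(idx[:], l.logIndex)
		buf = append(buf, idx[:]...)
		l.cachedLogID = hex.EncodeToString(buf)
	}
	return l.cachedLogID
}

func main() {
	l := &fetchedLog{logIndex: 7}
	a, b := l.getLogID(), l.getLogID() // the second call hits the cache
	fmt.Println(a == b)
}
```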
 // logEventBuffer is a circular/ring buffer of fetched logs.
 // Each entry in the buffer represents a block,
 // and holds the logs fetched for that block.
@@ -97,6 +169,7 @@ func (b *logEventBuffer) bufferSize() int {
 }
 
 // enqueue adds logs (if not exist) to the buffer, returning the number of logs added
+// minus the number of logs dropped.
 func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
     b.lock.Lock()
     defer b.lock.Unlock()
@@ -107,7 +180,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
     maxUpkeepLogs := int(b.maxUpkeepLogsPerBlock)
 
     latestBlock := b.latestBlockSeen()
-    added := 0
+    added, dropped := 0, 0
+
     for _, log := range logs {
         if log.BlockNumber == 0 {
             // invalid log
             continue
         }
@@ -125,23 +199,20 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
             lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber)
             continue
         }
-        if len(currentBlock.logs)+1 > maxBlockLogs {
-            lggr.Debugw("Reached max logs number per block, dropping log", "blockNumber", log.BlockNumber,
-                "blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex)
+        droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs)
+        if !ok {
+            // Skipping known logs
             continue
         }
-        if has, upkeepLogs := currentBlock.Has(id, log); has {
-            // Skipping existing log
-            continue
-        } else if upkeepLogs+1 > maxUpkeepLogs {
-            lggr.Debugw("Reached max logs number per upkeep, dropping log", "blockNumber", log.BlockNumber,
-                "blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex)
-            continue
+        if droppedLog.upkeepID != nil {
+            dropped++
+            lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", droppedLog.log.BlockNumber,
+                "blockHash", droppedLog.log.BlockHash, "txHash", droppedLog.log.TxHash, "logIndex", droppedLog.log.LogIndex,
+                "upkeepID", droppedLog.upkeepID.String())
         }
-        // lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id)
-        currentBlock.logs = append(currentBlock.logs, fetchedLog{upkeepID: id, log: log})
-        b.blocks[i] = currentBlock
         added++
+        b.blocks[i] = currentBlock
+
         if log.BlockNumber > latestBlock {
             latestBlock = log.BlockNumber
         }
@@ -151,10 +222,10 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
         atomic.StoreInt64(&b.latestBlock, latestBlock)
     }
     if added > 0 {
-        lggr.Debugw("Added logs to buffer", "addedLogs", added, "latestBlock", latestBlock)
+        lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock)
     }
 
-    return added
+    return added - dropped
 }
 
 // peek returns the logs in range [latestBlock-blocks, latestBlock]
@@ -196,7 +267,7 @@ func (b *logEventBuffer) peekRange(start, end int64) []fetchedLog {
 }
 
 // dequeueRange returns the logs between start and end inclusive.
-func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetchedLog {
+func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit int) []fetchedLog {
     b.lock.Lock()
     defer b.lock.Unlock()
 
@@ -214,20 +285,33 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit
     })
 
     logsCount := map[string]int{}
+    totalCount := 0
     var results []fetchedLog
     for _, block := range fetchedBlocks {
-        // double checking that we don't have any gaps in the range
         if block.blockNumber < start || block.blockNumber > end {
+            // double checking that we don't have any gaps in the range
             continue
         }
+        if totalCount >= totalLimit {
+            // reached total limit, no need to process more blocks
+            break
+        }
+        // Sort the logs in a random order that is shared across all nodes.
+        // This ensures that nodes across the network will process the same logs.
+        block.Sort()
         var remainingLogs, blockResults []fetchedLog
         for _, log := range block.logs {
+            if totalCount >= totalLimit {
+                remainingLogs = append(remainingLogs, log)
+                continue
+            }
             if logsCount[log.upkeepID.String()] >= upkeepLimit {
                 remainingLogs = append(remainingLogs, log)
                 continue
             }
-            logsCount[log.upkeepID.String()]++
             blockResults = append(blockResults, log)
+            logsCount[log.upkeepID.String()]++
+            totalCount++
         }
         if len(blockResults) == 0 {
             continue
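dequeueRange now enforces two caps at once: upkeepLimit per upkeep and totalLimit per batch, with everything above the caps staying queued for the next call. A toy model of that bookkeeping over a flat slice (the real code walks the ring buffer block by block, in the shared sorted order; names here are made up):

```go
package main

import "fmt"

type log struct{ upkeepID string }

// dequeue takes at most upkeepLimit logs per upkeep and totalLimit logs
// overall; everything else is kept for a later dequeue.
func dequeue(logs []log, upkeepLimit, totalLimit int) (results, remaining []log) {
	perUpkeep := map[string]int{}
	for _, l := range logs {
		if len(results) >= totalLimit || perUpkeep[l.upkeepID] >= upkeepLimit {
			remaining = append(remaining, l) // stays queued
			continue
		}
		results = append(results, l)
		perUpkeep[l.upkeepID]++
	}
	return results, remaining
}

func main() {
	logs := []log{{"a"}, {"a"}, {"a"}, {"b"}, {"b"}, {"c"}}
	res, rem := dequeue(logs, 2, 4)
	fmt.Println(len(res), len(rem)) // 4 2: two "a", two "b"; third "a" and "c" remain
}
```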
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go
index 18eecb748a5..0f389d0d418 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go
@@ -1,15 +1,18 @@
 package logprovider
 
 import (
+    "encoding/hex"
     "fmt"
     "math/big"
     "testing"
 
     "github.com/ethereum/go-ethereum/common"
+    ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"
     "github.com/stretchr/testify/require"
 
     "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
     "github.com/smartcontractkit/chainlink/v2/core/logger"
+    "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core"
 )
 
 func TestLogEventBuffer_GetBlocksInRange(t *testing.T) {
@@ -236,7 +239,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
         results := buf.peekRange(int64(1), int64(2))
         require.Equal(t, 2, len(results))
         verifyBlockNumbers(t, results, 1, 2)
-        removed := buf.dequeueRange(int64(1), int64(2), 2)
+        removed := buf.dequeueRange(int64(1), int64(2), 2, 10)
         require.Equal(t, 2, len(removed))
         results = buf.peekRange(int64(1), int64(2))
         require.Equal(t, 0, len(results))
@@ -256,7 +259,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
         results := buf.peek(8)
         require.Equal(t, 4, len(results))
         verifyBlockNumbers(t, results, 1, 2, 3, 3)
-        removed := buf.dequeueRange(1, 3, 5)
+        removed := buf.dequeueRange(1, 3, 5, 5)
         require.Equal(t, 4, len(removed))
         buf.lock.Lock()
         require.Equal(t, 0, len(buf.blocks[0].logs))
@@ -313,10 +316,18 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
             logpoller.Log{BlockNumber: 5, TxHash: common.HexToHash("0x5"), LogIndex: 0},
         ), 5)
 
-        logs := buf.dequeueRange(1, 5, 2)
+        logs := buf.dequeueRange(1, 5, 2, 10)
         require.Equal(t, 2, len(logs))
         require.Equal(t, int64(5), logs[0].log.BlockNumber)
         require.Equal(t, int64(4), logs[1].log.BlockNumber)
+
+        require.Equal(t, buf.enqueue(big.NewInt(1),
+            logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 1},
+            logpoller.Log{BlockNumber: 5, TxHash: common.HexToHash("0x5"), LogIndex: 1},
+        ), 2)
+
+        logs = buf.dequeueRange(1, 5, 3, 2)
+        require.Equal(t, 2, len(logs))
     })
 
     t.Run("dequeue doesn't return same logs again", func(t *testing.T) {
@@ -327,19 +338,508 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
             logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3"), LogIndex: 0},
         ), 3)
 
-        logs := buf.dequeueRange(3, 3, 2)
+        logs := buf.dequeueRange(3, 3, 2, 10)
         fmt.Println(logs)
         require.Equal(t, 1, len(logs))
 
-        logs = buf.dequeueRange(3, 3, 2)
+        logs = buf.dequeueRange(3, 3, 2, 10)
         fmt.Println(logs)
         require.Equal(t, 0, len(logs))
     })
 }
 
+func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) {
+    type appendArgs struct {
+        fl                          fetchedLog
+        maxBlockLogs, maxUpkeepLogs int
+        added, dropped              bool
+    }
+
+    tests := []struct {
+        name        string
+        blockNumber int64
+        logs        []fetchedLog
+        visited     []fetchedLog
+        toAdd       []appendArgs
+        expected    []fetchedLog
+        added       bool
+    }{
+        {
+            name:        "empty block",
+            blockNumber: 1,
+            logs:        []fetchedLog{},
+            visited:     []fetchedLog{},
+            toAdd: []appendArgs{
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    0,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  10,
+                    maxUpkeepLogs: 2,
+                    added:         true,
+                },
+            },
+            expected: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    0,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+            },
+        },
+        {
+            name:        "existing log",
+            blockNumber: 1,
+            logs: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    0,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+            },
+            visited: []fetchedLog{},
+            toAdd: []appendArgs{
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    0,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  10,
+                    maxUpkeepLogs: 2,
+                    added:         false,
+                },
+            },
+            expected: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    0,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+            },
+        },
+        {
+            name:        "visited log",
+            blockNumber: 1,
+            logs:        []fetchedLog{},
+            visited: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    0,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+            },
+            toAdd: []appendArgs{
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    0,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  10,
+                    maxUpkeepLogs: 2,
+                    added:         false,
+                },
+            },
+            expected: []fetchedLog{},
+        },
+        {
+            name:        "upkeep log limits",
+            blockNumber: 1,
+            logs:        []fetchedLog{},
+            visited:     []fetchedLog{},
+            toAdd: []appendArgs{
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    0,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  10,
+                    maxUpkeepLogs: 2,
+                    added:         true,
+                },
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    1,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  10,
+                    maxUpkeepLogs: 2,
+                    added:         true,
+                },
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    2,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  10,
+                    maxUpkeepLogs: 2,
+                    added:         true,
+                    dropped:       true,
+                },
+            },
+            expected: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    1,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    2,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+            },
+        },
+        {
+            name:        "block log limits",
+            blockNumber: 1,
+            logs:        []fetchedLog{},
+            visited:     []fetchedLog{},
+            toAdd: []appendArgs{
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    0,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  2,
+                    maxUpkeepLogs: 4,
+                    added:         true,
+                },
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    1,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  2,
+                    maxUpkeepLogs: 4,
+                    added:         true,
+                },
+                {
+                    fl: fetchedLog{
+                        log: logpoller.Log{
+                            BlockNumber: 1,
+                            TxHash:      common.HexToHash("0x1"),
+                            LogIndex:    2,
+                        },
+                        upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                    },
+                    maxBlockLogs:  2,
+                    maxUpkeepLogs: 4,
+                    added:         true,
+                    dropped:       true,
+                },
+            },
+            expected: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    1,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    2,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+            },
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            lggr := logger.TestLogger(t)
+            b := fetchedBlock{
+                blockNumber: tc.blockNumber,
+                logs:        make([]fetchedLog, len(tc.logs)),
+                visited:     make([]fetchedLog, len(tc.visited)),
+            }
+            copy(b.logs, tc.logs)
+            copy(b.visited, tc.visited)
+
+            for _, args := range tc.toAdd {
+                dropped, added := b.Append(lggr, args.fl, args.maxBlockLogs, args.maxUpkeepLogs)
+                require.Equal(t, args.added, added)
+                if args.dropped {
+                    require.NotNil(t, dropped.upkeepID)
+                } else {
+                    require.Nil(t, dropped.upkeepID)
+                }
+            }
+            // clear cached logIDs
+            for i := range b.logs {
+                b.logs[i].cachedLogID = ""
+            }
+            require.Equal(t, tc.expected, b.logs)
+        })
+    }
+}
+
+func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) {
+    tests := []struct {
+        name        string
+        blockNumber int64
+        logs        []fetchedLog
+        beforeSort  []string
+        afterSort   []string
+        iterations  int
+    }{
+        {
+            name:        "no logs",
+            blockNumber: 10,
+            logs:        []fetchedLog{},
+            beforeSort:  []string{},
+            afterSort:   []string{},
+        },
+        {
+            name:        "single log",
+            blockNumber: 1,
+            logs: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockHash:   common.HexToHash("0x111"),
+                        BlockNumber: 1,
+                        TxHash:      common.HexToHash("0x1"),
+                        LogIndex:    0,
+                    },
+                },
+            },
+            beforeSort: []string{
+                "000000000000000000000000000000000000000000000000000000000000000100000000",
+            },
+            afterSort: []string{
+                "000000000000000000000000000000000000000000000000000000000000000100000000",
+            },
+        },
+        {
+            name:        "multiple logs with 10 iterations",
+            blockNumber: 1,
+            logs: []fetchedLog{
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xb711bd1103927611ee41152aa8ae27f3330"),
+                        LogIndex:    0,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"),
+                        LogIndex:    0,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "222").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"),
+                        LogIndex:    4,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"),
+                        LogIndex:    3,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "222").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"),
+                        LogIndex:    2,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"),
+                        LogIndex:    5,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"),
+                        LogIndex:    3,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+                {
+                    log: logpoller.Log{
+                        BlockNumber: 1,
+                        BlockHash:   common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"),
+                        TxHash:      common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"),
+                        LogIndex:    1,
+                    },
+                    upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+                },
+            },
+            beforeSort: []string{
+                "00000000000000000000000000000b711bd1103927611ee41152aa8ae27f333000000000",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000000",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000004",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000002",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000005",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000001",
+            },
+            afterSort: []string{
+                "00000000000000000000000000000b711bd1103927611ee41152aa8ae27f333000000000",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000000",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000001",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000002",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000004",
+                "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000005",
+            },
+            iterations: 10,
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            b := fetchedBlock{
+                blockNumber: tc.blockNumber,
+                logs:        make([]fetchedLog, len(tc.logs)),
+            }
+            if tc.iterations == 0 {
+                tc.iterations = 1
+            }
+            // performing the same sort multiple times should yield the same result;
+            // the default is one iteration
+            for i := 0; i < tc.iterations; i++ {
+                copy(b.logs, tc.logs)
+
+                logIDs := getLogIds(b)
+                require.Equal(t, len(tc.beforeSort), len(logIDs))
+                require.Equal(t, tc.beforeSort, logIDs)
+
+                b.Sort()
+
+                logIDsAfterSort := getLogIds(b)
+                require.Equal(t, len(tc.afterSort), len(logIDsAfterSort))
+                require.Equal(t, tc.afterSort, logIDsAfterSort)
+            }
+        })
+    }
+}
+
+func TestLogEventBuffer_FetchedBlock_Clone(t *testing.T) {
+    b1 := fetchedBlock{
+        blockNumber: 1,
+        logs: []fetchedLog{
+            {
+                log: logpoller.Log{
+                    BlockNumber: 1,
+                    TxHash:      common.HexToHash("0x1"),
+                    LogIndex:    0,
+                },
+                upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(),
+            },
+            {
+                log: logpoller.Log{
+                    BlockNumber: 1,
TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + } + + b2 := b1.Clone() + require.Equal(t, b1.blockNumber, b2.blockNumber) + require.Equal(t, len(b1.logs), len(b2.logs)) + require.Equal(t, b1.logs[0].log.BlockNumber, b2.logs[0].log.BlockNumber) + + b1.blockNumber = 2 + b1.logs[0].log.BlockNumber = 2 + require.NotEqual(t, b1.blockNumber, b2.blockNumber) + require.NotEqual(t, b1.logs[0].log.BlockNumber, b2.logs[0].log.BlockNumber) +} + func verifyBlockNumbers(t *testing.T, logs []fetchedLog, bns ...int64) { require.Equal(t, len(bns), len(logs), "expected length mismatch") for i, log := range logs { require.Equal(t, bns[i], log.log.BlockNumber, "wrong block number") } } + +func getLogIds(b fetchedBlock) []string { + logIDs := make([]string, len(b.logs)) + for i, l := range b.logs { + ext := ocr2keepers.LogTriggerExtension{ + TxHash: l.log.TxHash, + Index: uint32(l.log.LogIndex), + BlockHash: l.log.BlockHash, + } + logIDs[i] = hex.EncodeToString(ext.LogIdentifier()) + } + return logIDs +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go index 30994543eb6..b5f229f6015 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go @@ -478,10 +478,10 @@ func TestIntegration_LogRecoverer_Backfill(t *testing.T) { } lp, ethClient, utilsABI := setupDependencies(t, db, backend) filterStore := logprovider.NewUpkeepFilterStore() - origDefaultRecoveryInterval := logprovider.DefaultRecoveryInterval - logprovider.DefaultRecoveryInterval = time.Millisecond * 200 + origDefaultRecoveryInterval := logprovider.RecoveryInterval + logprovider.RecoveryInterval = time.Millisecond * 200 defer func() { - logprovider.DefaultRecoveryInterval = origDefaultRecoveryInterval + logprovider.RecoveryInterval = origDefaultRecoveryInterval }() provider, recoverer := setup(logger.TestLogger(t), lp, nil, utilsABI, &mockUpkeepStateStore{}, filterStore, opts) logProvider := provider.(logprovider.LogEventProviderTest) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index 8fbbb1e0a9d..6b89dfd0e72 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -30,6 +30,8 @@ var ( // AllowedLogsPerUpkeep is the maximum number of logs allowed per upkeep every single call. AllowedLogsPerUpkeep = 5 + // MaxPayloads is the maximum number of payloads to return per call. 
+    MaxPayloads = 100
 
     readJobQueueSize = 64
     readLogsTimeout  = 10 * time.Second
@@ -99,7 +101,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
     return &logEventProvider{
         packer:      packer,
         lggr:        lggr.Named("KeepersRegistry.LogEventProvider"),
-        buffer:      newLogEventBuffer(lggr, int(opts.LookbackBlocks), bufferMaxBlockSize, allowedLogsPerBlock),
+        buffer:      newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
         poller:      poller,
         opts:        opts,
         filterStore: filterStore,
@@ -177,7 +179,7 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers
     if start <= 0 {
         start = 1
     }
-    logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep)
+    logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep, MaxPayloads)
 
     // p.lggr.Debugw("got latest logs from buffer", "latest", latest, "diff", diff, "logs", len(logs))
 
@@ -318,7 +320,10 @@ func (p *logEventProvider) updateFiltersLastPoll(entries []upkeepFilter) {
     p.filterStore.UpdateFilters(func(orig, f upkeepFilter) upkeepFilter {
         if f.lastPollBlock > orig.lastPollBlock {
             orig.lastPollBlock = f.lastPollBlock
-            p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID)
+            if f.lastPollBlock%10 == 0 {
+                // print the log only occasionally to avoid spamming the logs
+                p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID)
+            }
         }
         return orig
     }, entries...)
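The lastPollBlock debug line above is now gated on every tenth block. The same throttle, isolated as a sketch (the interval of 10 is taken from the hunk; the helper name and logger are made up):

```go
package main

import "log"

// maybeLogLastPoll reports progress only for one block in ten, since emitting
// the line on every block during fast polling would spam the logs.
func maybeLogLastPoll(lastPollBlock int64, upkeepID string) {
	if lastPollBlock%10 == 0 {
		log.Printf("Updated lastPollBlock: lastPollBlock=%d upkeepID=%s", lastPollBlock, upkeepID)
	}
}

func main() {
	for b := int64(95); b <= 105; b++ {
		maybeLogLastPoll(b, "42") // prints only for block 100
	}
}
```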
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go
index 7a7dbbe46be..dbed9c591cd 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go
@@ -13,6 +13,7 @@ import (
     "time"
 
     "github.com/ethereum/go-ethereum/common"
+    "github.com/smartcontractkit/ocr2keepers/pkg/v3/random"
    ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"
 
     "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
@@ -24,11 +25,17 @@ import (
 )
 
 var (
-    DefaultRecoveryInterval = 5 * time.Second
-    RecoveryCacheTTL        = 10*time.Minute - time.Second
-    GCInterval              = RecoveryCacheTTL
-
-    recoveryBatchSize = 10
+    // RecoveryInterval is the interval at which the recovery scanning process is triggered
+    RecoveryInterval = 5 * time.Second
+    // RecoveryCacheTTL is the time to live for the recovery cache
+    RecoveryCacheTTL = 10 * time.Minute
+    // GCInterval is the interval at which the recovery cache is cleaned up
+    GCInterval = RecoveryCacheTTL - time.Second
+    // MaxProposals is the maximum number of proposals that can be returned by GetRecoveryProposals
+    MaxProposals = 20
+    // recoveryBatchSize is the number of filters to recover in a single batch
+    recoveryBatchSize = 10
+    // recoveryLogsBuffer is the number of blocks to be used as a safety buffer when reading logs
     recoveryLogsBuffer = int64(200)
     recoveryLogsBurst  = int64(500)
 )
@@ -244,6 +251,11 @@ func (r *logRecoverer) getLogTriggerCheckData(ctx context.Context, proposal ocr2
 }
 
 func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) {
+    latestBlock, err := r.poller.LatestBlock(pg.WithParentCtx(ctx))
+    if err != nil {
+        return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err)
+    }
+
     r.lock.Lock()
     defer r.lock.Unlock()
 
@@ -251,18 +263,29 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers.
     if len(r.pending) == 0 {
         return nil, nil
     }
 
+    allLogsCounter := 0
     logsCount := map[string]int{}
 
+    r.sortPending(uint64(latestBlock))
+
     var results, pending []ocr2keepers.UpkeepPayload
     for _, payload := range r.pending {
+        if allLogsCounter >= MaxProposals {
+            // we have enough proposals; the rest are pushed back to pending
+            pending = append(pending, payload)
+            continue
+        }
         uid := payload.UpkeepID.String()
         if logsCount[uid] >= AllowedLogsPerUpkeep {
+            // we have enough proposals for this upkeep; the rest are pushed back to pending
            pending = append(pending, payload)
            continue
        }
-        logsCount[uid]++
         results = append(results, payload)
+        logsCount[uid]++
+        allLogsCounter++
     }
+
     r.pending = pending
 
     r.lggr.Debugf("found %d pending payloads", len(pending))
@@ -603,3 +626,23 @@ func (r *logRecoverer) removePending(workID string) {
     }
     r.pending = updated
 }
+
+// sortPending sorts the pending list in a random order based on the normalized latest block number,
+// so the order is shared across the network. The block number is divided by 100 to ensure that
+// nodes with slightly different latest-block views won't end up with a different order.
+// NOTE: the lock must be held before calling this function.
+func (r *logRecoverer) sortPending(latestBlock uint64) {
+    normalized := latestBlock / 100
+    if normalized == 0 {
+        normalized = 1
+    }
+    randSeed := random.GetRandomKeySource(nil, normalized)
+
+    shuffledIDs := make(map[string]string, len(r.pending))
+    for _, p := range r.pending {
+        shuffledIDs[p.WorkID] = random.ShuffleString(p.WorkID, randSeed)
+    }
+
+    sort.SliceStable(r.pending, func(i, j int) bool {
+        return shuffledIDs[r.pending[i].WorkID] < shuffledIDs[r.pending[j].WorkID]
+    })
+}
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go
index 0a993831b7b..e7729924304 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go
@@ -31,7 +31,9 @@ import (
 func TestLogRecoverer_GetRecoverables(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200))
+    lp := &lpmocks.LogPoller{}
+    lp.On("LatestBlock", mock.Anything).Return(int64(100), nil)
+    r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200))
 
     tests := []struct {
         name string
diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
index 849463e53a2..1c54bf553d9 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
@@ -283,6 +283,7 @@ func (r *EvmRegistry) refreshActiveUpkeeps() error {
         switch core.GetUpkeepType(*uid) {
         case ocr2keepers.LogTrigger:
             logTriggerIDs = append(logTriggerIDs, id)
+        default:
         }
     }
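A final sketch of the normalization that seeds sortPending: dividing the latest block by 100 means nodes whose chain views differ by a few blocks still derive the same seed, and therefore the same pending order. Standard-library stand-ins again for random.GetRandomKeySource / random.ShuffleString; the helper names are illustrative.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// seedFor maps nearby block heights onto one seed: blocks 1200-1299 all
// normalize to 12, so small disagreements about the head don't change the order.
func seedFor(latestBlock uint64) uint64 {
	normalized := latestBlock / 100
	if normalized == 0 {
		normalized = 1 // avoid a degenerate zero seed near genesis
	}
	return normalized
}

// sortPending shuffles each work ID with the shared seed and sorts by the
// shuffled value, mirroring the map-based Schwartzian transform in the patch.
func sortPending(workIDs []string, latestBlock uint64) {
	seed := int64(seedFor(latestBlock))
	shuffled := make(map[string]string, len(workIDs))
	for _, id := range workIDs {
		r := rand.New(rand.NewSource(seed))
		b := []byte(id)
		r.Shuffle(len(b), func(i, j int) { b[i], b[j] = b[j], b[i] })
		shuffled[id] = string(b)
	}
	sort.SliceStable(workIDs, func(i, j int) bool { return shuffled[workIDs[i]] < shuffled[workIDs[j]] })
}

func main() {
	a := []string{"w3", "w1", "w2"}
	b := []string{"w3", "w1", "w2"}
	sortPending(a, 1234) // both heights normalize to the same seed (12)...
	sortPending(b, 1260)
	fmt.Println(a, b) // ...so both nodes produce the same order
}
```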