From 5ef07c836b9deb38ccfdca04cbf7fd3d79f18aef Mon Sep 17 00:00:00 2001 From: nogo <110664798+0xnogo@users.noreply.github.com> Date: Thu, 5 Dec 2024 15:45:48 +0400 Subject: [PATCH] Benchmark ccip_reader queries (#15363) * benchmarkCommitReports * adding more bench tests + lint * fix test * addressing comments * change with merge * refactor * addressing comments * add ContractNameToBind --- .../contracts/ccipreader_test.go | 695 ++++++++++++++---- 1 file changed, 545 insertions(+), 150 deletions(-) diff --git a/integration-tests/contracts/ccipreader_test.go b/integration-tests/contracts/ccipreader_test.go index cec9564a30d..3028f4707a4 100644 --- a/integration-tests/contracts/ccipreader_test.go +++ b/integration-tests/contracts/ccipreader_test.go @@ -2,6 +2,7 @@ package contracts import ( "context" + "fmt" "math/big" "sort" "testing" @@ -12,10 +13,12 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient/simulated" + "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" - "golang.org/x/exp/maps" + + "github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight" "github.com/smartcontractkit/chainlink-ccip/plugintypes" @@ -24,9 +27,11 @@ import ( "github.com/smartcontractkit/chainlink/integration-tests/utils/pgtest" readermocks "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/contractreader" + + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" - "github.com/smartcontractkit/chainlink-common/pkg/codec" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" @@ -45,6 +50,10 @@ import ( "github.com/smartcontractkit/chainlink-ccip/pkg/consts" "github.com/smartcontractkit/chainlink-ccip/pkg/contractreader" ccipreaderpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader" + + evmchaintypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/offramp" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/onramp" ) const ( @@ -58,31 +67,19 @@ var ( defaultGasPrice = assets.GWei(10) ) -func setupGetCommitGTETimestampTest(ctx context.Context, t *testing.T, finalityDepth int64) (*testSetupData, int64, common.Address) { - cfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - consts.ContractNameOffRamp: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{consts.EventNameCommitReportAccepted}, - }, - ContractABI: ccip_reader_tester.CCIPReaderTesterABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - consts.EventNameCommitReportAccepted: { - ChainSpecificName: consts.EventNameCommitReportAccepted, - ReadType: evmtypes.Event, - }, - }, - }, - }, - } +var ( + onrampABI = evmchaintypes.MustGetABI(onramp.OnRampABI) + offrampABI = evmchaintypes.MustGetABI(offramp.OffRampABI) +) +func setupGetCommitGTETimestampTest(ctx context.Context, t testing.TB, finalityDepth int64, useHeavyDB bool) (*testSetupData, int64, common.Address) { sb, auth := setupSimulatedBackendAndAuth(t) onRampAddress := utils.RandomAddress() s := testSetup(ctx, t, testSetupParams{ ReaderChain: chainD, DestChain: chainD, OnChainSeqNums: nil, - Cfg: cfg, + Cfg: evmconfig.DestReaderConfig, ToMockBindings: map[cciptypes.ChainSelector][]types.BoundContract{ chainS1: { { @@ -91,15 +88,52 @@ func setupGetCommitGTETimestampTest(ctx context.Context, t *testing.T, finalityD }, }, }, - BindTester: true, - SimulatedBackend: sb, - Auth: auth, - FinalityDepth: finalityDepth, + BindTester: true, + ContractNameToBind: consts.ContractNameOffRamp, + SimulatedBackend: sb, + Auth: auth, + FinalityDepth: finalityDepth, + UseHeavyDB: useHeavyDB, }) return s, finalityDepth, onRampAddress } +func setupExecutedMessageRangesTest(ctx context.Context, t testing.TB, useHeavyDB bool) *testSetupData { + sb, auth := setupSimulatedBackendAndAuth(t) + return testSetup(ctx, t, testSetupParams{ + ReaderChain: chainD, + DestChain: chainD, + OnChainSeqNums: nil, + Cfg: evmconfig.DestReaderConfig, + // Cfg: cfg, + ToBindContracts: nil, + ToMockBindings: nil, + BindTester: true, + ContractNameToBind: consts.ContractNameOffRamp, + SimulatedBackend: sb, + Auth: auth, + UseHeavyDB: useHeavyDB, + }) +} + +func setupMsgsBetweenSeqNumsTest(ctx context.Context, t testing.TB, useHeavyDB bool) *testSetupData { + sb, auth := setupSimulatedBackendAndAuth(t) + return testSetup(ctx, t, testSetupParams{ + ReaderChain: chainS1, + DestChain: chainD, + OnChainSeqNums: nil, + Cfg: evmconfig.SourceReaderConfig, + ToBindContracts: nil, + ToMockBindings: nil, + BindTester: true, + ContractNameToBind: consts.ContractNameOnRamp, + SimulatedBackend: sb, + Auth: auth, + UseHeavyDB: useHeavyDB, + }) +} + func emitCommitReports(ctx context.Context, t *testing.T, s *testSetupData, numReports int, tokenA common.Address, onRampAddress common.Address) uint64 { var firstReportTs uint64 for i := uint8(0); int(i) < numReports; i++ { @@ -138,7 +172,7 @@ func emitCommitReports(ctx context.Context, t *testing.T, s *testSetupData, numR }, }, }) - assert.NoError(t, err) + require.NoError(t, err) bh := s.sb.Commit() b, err := s.sb.Client().BlockByHash(ctx, bh) require.NoError(t, err) @@ -152,7 +186,7 @@ func emitCommitReports(ctx context.Context, t *testing.T, s *testSetupData, numR func TestCCIPReader_CommitReportsGTETimestamp(t *testing.T) { t.Parallel() ctx := tests.Context(t) - s, _, onRampAddress := setupGetCommitGTETimestampTest(ctx, t, 0) + s, _, onRampAddress := setupGetCommitGTETimestampTest(ctx, t, 0, false) tokenA := common.HexToAddress("123") const numReports = 5 @@ -196,7 +230,7 @@ func TestCCIPReader_CommitReportsGTETimestamp_RespectsFinality(t *testing.T) { t.Parallel() ctx := tests.Context(t) var finalityDepth int64 = 10 - s, _, onRampAddress := setupGetCommitGTETimestampTest(ctx, t, finalityDepth) + s, _, onRampAddress := setupGetCommitGTETimestampTest(ctx, t, finalityDepth, false) tokenA := common.HexToAddress("123") const numReports = 5 @@ -258,46 +292,7 @@ func TestCCIPReader_CommitReportsGTETimestamp_RespectsFinality(t *testing.T) { func TestCCIPReader_ExecutedMessageRanges(t *testing.T) { t.Parallel() ctx := tests.Context(t) - cfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - consts.ContractNameOffRamp: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{consts.EventNameExecutionStateChanged}, - }, - ContractABI: ccip_reader_tester.CCIPReaderTesterABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - consts.EventNameExecutionStateChanged: { - ChainSpecificName: consts.EventNameExecutionStateChanged, - ReadType: evmtypes.Event, - EventDefinitions: &evmtypes.EventDefinitions{ - GenericTopicNames: map[string]string{ - "sourceChainSelector": consts.EventAttributeSourceChain, - "sequenceNumber": consts.EventAttributeSequenceNumber, - }, - GenericDataWordDetails: map[string]evmtypes.DataWordDetail{ - consts.EventAttributeState: { - Name: "state", - }, - }, - }, - }, - }, - }, - }, - } - - sb, auth := setupSimulatedBackendAndAuth(t) - s := testSetup(ctx, t, testSetupParams{ - ReaderChain: chainD, - DestChain: chainD, - OnChainSeqNums: nil, - Cfg: cfg, - ToBindContracts: nil, - ToMockBindings: nil, - BindTester: true, - SimulatedBackend: sb, - Auth: auth, - }) + s := setupExecutedMessageRangesTest(ctx, t, false) _, err := s.contract.EmitExecutionStateChanged( s.auth, uint64(chainS1), @@ -308,7 +303,7 @@ func TestCCIPReader_ExecutedMessageRanges(t *testing.T) { []byte{1, 2, 3, 4}, big.NewInt(250_000), ) - assert.NoError(t, err) + require.NoError(t, err) s.sb.Commit() _, err = s.contract.EmitExecutionStateChanged( @@ -321,7 +316,7 @@ func TestCCIPReader_ExecutedMessageRanges(t *testing.T) { []byte{1, 2, 3, 4, 5}, big.NewInt(350_000), ) - assert.NoError(t, err) + require.NoError(t, err) s.sb.Commit() // Need to replay as sometimes the logs are not picked up by the log poller (?) @@ -351,50 +346,7 @@ func TestCCIPReader_MsgsBetweenSeqNums(t *testing.T) { t.Parallel() ctx := tests.Context(t) - cfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - consts.ContractNameOnRamp: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{consts.EventNameCCIPMessageSent}, - }, - ContractABI: ccip_reader_tester.CCIPReaderTesterABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - consts.EventNameCCIPMessageSent: { - ChainSpecificName: "CCIPMessageSent", - ReadType: evmtypes.Event, - EventDefinitions: &evmtypes.EventDefinitions{ - GenericDataWordDetails: map[string]evmtypes.DataWordDetail{ - consts.EventAttributeSourceChain: {Name: "message.header.sourceChainSelector"}, - consts.EventAttributeDestChain: {Name: "message.header.destChainSelector"}, - consts.EventAttributeSequenceNumber: {Name: "message.header.sequenceNumber"}, - }, - }, - OutputModifications: codec.ModifiersConfig{ - &codec.WrapperModifierConfig{Fields: map[string]string{ - "Message.FeeTokenAmount": "Int", - "Message.FeeValueJuels": "Int", - "Message.TokenAmounts.Amount": "Int", - }}, - }, - }, - }, - }, - }, - } - - sb, auth := setupSimulatedBackendAndAuth(t) - s := testSetup(ctx, t, testSetupParams{ - ReaderChain: chainS1, - DestChain: chainD, - OnChainSeqNums: nil, - Cfg: cfg, - ToBindContracts: nil, - ToMockBindings: nil, - BindTester: true, - SimulatedBackend: sb, - Auth: auth, - }) - + s := setupMsgsBetweenSeqNumsTest(ctx, t, false) _, err := s.contract.EmitCCIPMessageSent(s.auth, uint64(chainD), ccip_reader_tester.InternalEVM2AnyRampMessage{ Header: ccip_reader_tester.InternalRampMessageHeader{ MessageId: [32]byte{1, 0, 0, 0, 0}, @@ -411,7 +363,7 @@ func TestCCIPReader_MsgsBetweenSeqNums(t *testing.T) { FeeValueJuels: big.NewInt(2), TokenAmounts: []ccip_reader_tester.InternalEVM2AnyTokenTransfer{{Amount: big.NewInt(1)}, {Amount: big.NewInt(2)}}, }) - assert.NoError(t, err) + require.NoError(t, err) _, err = s.contract.EmitCCIPMessageSent(s.auth, uint64(chainD), ccip_reader_tester.InternalEVM2AnyRampMessage{ Header: ccip_reader_tester.InternalRampMessageHeader{ @@ -429,7 +381,7 @@ func TestCCIPReader_MsgsBetweenSeqNums(t *testing.T) { FeeValueJuels: big.NewInt(4), TokenAmounts: []ccip_reader_tester.InternalEVM2AnyTokenTransfer{{Amount: big.NewInt(3)}, {Amount: big.NewInt(4)}}, }) - assert.NoError(t, err) + require.NoError(t, err) s.sb.Commit() @@ -497,19 +449,20 @@ func TestCCIPReader_NextSeqNum(t *testing.T) { sb, auth := setupSimulatedBackendAndAuth(t) s := testSetup(ctx, t, testSetupParams{ - ReaderChain: chainD, - DestChain: chainD, - OnChainSeqNums: onChainSeqNums, - Cfg: cfg, - ToBindContracts: nil, - ToMockBindings: nil, - BindTester: true, - SimulatedBackend: sb, - Auth: auth, + ReaderChain: chainD, + DestChain: chainD, + OnChainSeqNums: onChainSeqNums, + Cfg: cfg, + ToBindContracts: nil, + ToMockBindings: nil, + BindTester: true, + ContractNameToBind: consts.ContractNameOffRamp, + SimulatedBackend: sb, + Auth: auth, }) seqNums, err := s.reader.NextSeqNum(ctx, []cciptypes.ChainSelector{chainS1, chainS2, chainS3}) - assert.NoError(t, err) + require.NoError(t, err) assert.Len(t, seqNums, 3) assert.Equal(t, cciptypes.SeqNum(10), seqNums[0]) assert.Equal(t, cciptypes.SeqNum(20), seqNums[1]) @@ -597,19 +550,20 @@ func TestCCIPReader_Nonces(t *testing.T) { sb, auth := setupSimulatedBackendAndAuth(t) s := testSetup(ctx, t, testSetupParams{ - ReaderChain: chainD, - DestChain: chainD, - Cfg: cfg, - BindTester: true, - SimulatedBackend: sb, - Auth: auth, + ReaderChain: chainD, + DestChain: chainD, + Cfg: cfg, + BindTester: true, + ContractNameToBind: consts.ContractNameNonceManager, + SimulatedBackend: sb, + Auth: auth, }) // Add some nonces. for chain, addrs := range nonces { for addr, nonce := range addrs { _, err := s.contract.SetInboundNonce(s.auth, uint64(chain), nonce, common.LeftPadBytes(addr.Bytes(), 32)) - assert.NoError(t, err) + require.NoError(t, err) } } s.sb.Commit() @@ -622,7 +576,7 @@ func TestCCIPReader_Nonces(t *testing.T) { addrQuery = append(addrQuery, utils.RandomAddress().String()) results, err := s.reader.Nonces(ctx, sourceChain, chainD, addrQuery) - assert.NoError(t, err) + require.NoError(t, err) assert.Len(t, results, len(addrQuery)) for addr, nonce := range addrs { assert.Equal(t, nonce, results[addr.String()]) @@ -836,7 +790,438 @@ func Test_GetWrappedNativeTokenPriceUSD(t *testing.T) { require.Equal(t, changeset.DefaultInitialPrices.WethPrice, prices[cciptypes.ChainSelector(chain1)].Int) } -func setupSimulatedBackendAndAuth(t *testing.T) (*simulated.Backend, *bind.TransactOpts) { +// Benchmark Results: +// Benchmark_CCIPReader_CommitReportsGTETimestamp/FirstLogs_0_MatchLogs_0-14 16948 67728 ns/op 30387 B/op 417 allocs/op +// Benchmark_CCIPReader_CommitReportsGTETimestamp/FirstLogs_1_MatchLogs_10-14 1650 741741 ns/op 528334 B/op 9929 allocs/op +// Benchmark_CCIPReader_CommitReportsGTETimestamp/FirstLogs_10_MatchLogs_100-14 195 6096328 ns/op 4739856 B/op 92345 allocs/op +// Benchmark_CCIPReader_CommitReportsGTETimestamp/FirstLogs_100_MatchLogs_10000-14 2 582712583 ns/op 454375304 B/op 8931990 allocs/op +func Benchmark_CCIPReader_CommitReportsGTETimestamp(b *testing.B) { + tests := []struct { + logsInsertedFirst int + logsInsertedMatching int + }{ + {0, 0}, + {1, 10}, + {10, 100}, + {100, 10_000}, + } + + for _, tt := range tests { + b.Run(fmt.Sprintf("FirstLogs_%d_MatchLogs_%d", tt.logsInsertedMatching, tt.logsInsertedFirst), func(b *testing.B) { + benchmarkCommitReports(b, tt.logsInsertedFirst, tt.logsInsertedMatching) + }) + } +} + +func benchmarkCommitReports(b *testing.B, logsInsertedFirst int, logsInsertedMatching int) { + // Initialize test setup + ctx := tests.Context(b) + s, _, _ := setupGetCommitGTETimestampTest(ctx, b, 0, true) + + if logsInsertedFirst > 0 { + populateDatabaseForCommitReportAccepted(ctx, b, s, chainD, chainS1, logsInsertedFirst, 0) + } + + queryTimestamp := time.Now() + + if logsInsertedMatching > 0 { + populateDatabaseForCommitReportAccepted(ctx, b, s, chainD, chainS1, logsInsertedMatching, logsInsertedFirst) + } + + // Reset timer to measure only the query time + b.ResetTimer() + + for i := 0; i < b.N; i++ { + reports, err := s.reader.CommitReportsGTETimestamp(ctx, chainD, queryTimestamp, logsInsertedFirst) + require.NoError(b, err) + require.Len(b, reports, logsInsertedFirst) + } +} + +func populateDatabaseForCommitReportAccepted( + ctx context.Context, + b *testing.B, + testEnv *testSetupData, + destChain cciptypes.ChainSelector, + sourceChain cciptypes.ChainSelector, + numOfReports int, + offset int, +) { + var logs []logpoller.Log + commitReportEvent, exists := offrampABI.Events[consts.EventNameCommitReportAccepted] + require.True(b, exists, "Event CommitReportAccepted not found in ABI") + + commitReportEventSig := commitReportEvent.ID + commitReportAddress := testEnv.contractAddr + + // Calculate timestamp based on whether these are the first logs or matching logs + var timestamp time.Time + if offset == 0 { + // For first set of logs, set timestamp to 1 hour ago + timestamp = time.Now().Add(-1 * time.Hour) + } else { + // For matching logs, use current time + timestamp = time.Now() + } + + for i := 0; i < numOfReports; i++ { + // Calculate unique BlockNumber and LogIndex + blockNumber := int64(offset + i + 1) // Offset ensures unique block numbers + logIndex := int64(offset + i + 1) // Offset ensures unique log indices + + // Simulate merkleRoots + merkleRoots := []offramp.InternalMerkleRoot{ + { + SourceChainSelector: uint64(sourceChain), + OnRampAddress: utils.RandomAddress().Bytes(), + // #nosec G115 + MinSeqNr: uint64(i * 100), + // #nosec G115 + MaxSeqNr: uint64(i*100 + 99), + MerkleRoot: utils.RandomBytes32(), + }, + } + + sourceToken := utils.RandomAddress() + + // Simulate priceUpdates + priceUpdates := offramp.InternalPriceUpdates{ + TokenPriceUpdates: []offramp.InternalTokenPriceUpdate{ + {SourceToken: sourceToken, UsdPerToken: big.NewInt(8)}, + }, + GasPriceUpdates: []offramp.InternalGasPriceUpdate{ + {DestChainSelector: uint64(1), UsdPerUnitGas: big.NewInt(10)}, + }, + } + + // Combine encoded data + encodedData, err := commitReportEvent.Inputs.Pack(merkleRoots, priceUpdates) + require.NoError(b, err) + + // Topics (first one is the event signature) + topics := [][]byte{ + commitReportEventSig[:], + } + + // Create log entry + logs = append(logs, logpoller.Log{ + EvmChainId: ubig.New(new(big.Int).SetUint64(uint64(destChain))), + LogIndex: logIndex, + BlockHash: utils.NewHash(), + BlockNumber: blockNumber, + BlockTimestamp: timestamp, + EventSig: commitReportEventSig, + Topics: topics, + Address: commitReportAddress, + TxHash: utils.NewHash(), + Data: encodedData, + CreatedAt: time.Now(), + }) + } + + // Insert logs into the database + require.NoError(b, testEnv.orm.InsertLogs(ctx, logs)) + require.NoError(b, testEnv.orm.InsertBlock(ctx, utils.RandomHash(), int64(offset+numOfReports), timestamp, int64(offset+numOfReports))) +} + +// Benchmark Results: +// Benchmark_CCIPReader_ExecutedMessageRanges/LogsInserted_0_StartSeq_0_EndSeq_10-14 13599 93414 ns/op 43389 B/op 654 allocs/op +// Benchmark_CCIPReader_ExecutedMessageRanges/LogsInserted_10_StartSeq_10_EndSeq_20-14 13471 88392 ns/op 43011 B/op 651 allocs/op +// Benchmark_CCIPReader_ExecutedMessageRanges/LogsInserted_10_StartSeq_0_EndSeq_9-14 2799 473396 ns/op 303737 B/op 4535 allocs/op +// Benchmark_CCIPReader_ExecutedMessageRanges/LogsInserted_100_StartSeq_0_EndSeq_100-14 438 2724414 ns/op 2477573 B/op 37468 allocs/op +// Benchmark_CCIPReader_ExecutedMessageRanges/LogsInserted_100000_StartSeq_99744_EndSeq_100000-14 40 29118796 ns/op 12607995 B/op 179396 allocs/op +func Benchmark_CCIPReader_ExecutedMessageRanges(b *testing.B) { + tests := []struct { + logsInserted int + startSeqNum cciptypes.SeqNum + endSeqNum cciptypes.SeqNum + }{ + {0, 0, 10}, // no logs + {10, 10, 20}, // out of bounds + {10, 0, 9}, // get all messages with 10 logs + {100, 0, 100}, // get all messages with 100 logs + {100_000, 100_000 - 256, 100_000}, // get the last 256 messages + } + + for _, tt := range tests { + b.Run(fmt.Sprintf("LogsInserted_%d_StartSeq_%d_EndSeq_%d", tt.logsInserted, tt.startSeqNum, tt.endSeqNum), func(b *testing.B) { + benchmarkExecutedMessageRanges(b, tt.logsInserted, tt.startSeqNum, tt.endSeqNum) + }) + } +} + +func benchmarkExecutedMessageRanges(b *testing.B, logsInsertedFirst int, startSeqNum, endSeqNum cciptypes.SeqNum) { + // Initialize test setup + ctx := tests.Context(b) + s := setupExecutedMessageRangesTest(ctx, b, true) + expectedRangeLen := calculateExpectedRangeLen(logsInsertedFirst, startSeqNum, endSeqNum) + + // Insert logs in two phases based on parameters + if logsInsertedFirst > 0 { + populateDatabaseForExecutionStateChanged(ctx, b, s, chainS1, chainD, logsInsertedFirst, 0) + } + + // Reset timer to measure only the query time + b.ResetTimer() + + for i := 0; i < b.N; i++ { + executedRanges, err := s.reader.ExecutedMessageRanges( + ctx, + chainS1, + chainD, + cciptypes.NewSeqNumRange(startSeqNum, endSeqNum), + ) + require.NoError(b, err) + require.Len(b, executedRanges, expectedRangeLen) + } +} + +func populateDatabaseForExecutionStateChanged( + ctx context.Context, + b *testing.B, + testEnv *testSetupData, + sourceChain cciptypes.ChainSelector, + destChain cciptypes.ChainSelector, + numOfEvents int, + offset int, +) { + var logs []logpoller.Log + executionStateEvent, exists := offrampABI.Events[consts.EventNameExecutionStateChanged] + require.True(b, exists, "Event ExecutionStateChanged not found in ABI") + + executionStateEventSig := executionStateEvent.ID + executionStateEventAddress := testEnv.contractAddr + + for i := 0; i < numOfEvents; i++ { + // Calculate unique BlockNumber and LogIndex + blockNumber := int64(offset + i + 1) // Offset ensures unique block numbers + logIndex := int64(offset + i + 1) // Offset ensures unique log indices + + // Populate fields for the event + sourceChainSelector := uint64(sourceChain) + // #nosec G115 + sequenceNumber := uint64(offset + i) + messageID := utils.NewHash() + messageHash := utils.NewHash() + state := uint8(1) + returnData := []byte{0x01, 0x02} + gasUsed := big.NewInt(int64(10000 + i)) + + // Encode the non indexed event data + encodedData, err := executionStateEvent.Inputs.NonIndexed().Pack( + messageHash, + state, + returnData, + gasUsed, + ) + require.NoError(b, err) + + // Topics (event signature and indexed fields) + topics := [][]byte{ + executionStateEventSig[:], // Event signature + logpoller.EvmWord(sourceChainSelector).Bytes(), // Indexed sourceChainSelector + logpoller.EvmWord(sequenceNumber).Bytes(), // Indexed sequenceNumber + messageID[:], // Indexed messageId + } + + // Create log entry + logs = append(logs, logpoller.Log{ + EvmChainId: ubig.New(big.NewInt(0).SetUint64(uint64(destChain))), + LogIndex: logIndex, + BlockHash: utils.NewHash(), + BlockNumber: blockNumber, + BlockTimestamp: time.Now(), + EventSig: executionStateEventSig, + Topics: topics, + Address: executionStateEventAddress, + TxHash: utils.NewHash(), + Data: encodedData, + CreatedAt: time.Now(), + }) + } + + // Insert logs into the database + require.NoError(b, testEnv.orm.InsertLogs(ctx, logs)) + require.NoError(b, testEnv.orm.InsertBlock(ctx, utils.RandomHash(), int64(offset+numOfEvents), time.Now(), int64(offset+numOfEvents))) +} + +// Benchmark Results: +// Benchmark_CCIPReader_MessageSentRanges/LogsInserted_0_StartSeq_0_EndSeq_10-14 13729 85838 ns/op 43473 B/op 647 allocs/op +// Benchmark_CCIPReader_MessageSentRanges/LogsInserted_10_StartSeq_0_EndSeq_9-14 870 1405208 ns/op 1156315 B/op 21102 allocs/op +// Benchmark_CCIPReader_MessageSentRanges/LogsInserted_100_StartSeq_0_EndSeq_100-14 90 12129488 ns/op 10833395 B/op 201076 allocs/op +// Benchmark_CCIPReader_MessageSentRanges/LogsInserted_100000_StartSeq_99744_EndSeq_100000-14 10 105741438 ns/op 49103282 B/op 796213 allocs/op +func Benchmark_CCIPReader_MessageSentRanges(b *testing.B) { + tests := []struct { + logsInserted int + startSeqNum cciptypes.SeqNum + endSeqNum cciptypes.SeqNum + }{ + {0, 0, 10}, // No logs + {10, 0, 9}, // Get all messages with 10 logs + {100, 0, 100}, // Get all messages with 100 logs + {100_000, 100_000 - 256, 100_000}, // Get the last 256 messages + } + + for _, tt := range tests { + b.Run(fmt.Sprintf("LogsInserted_%d_StartSeq_%d_EndSeq_%d", tt.logsInserted, tt.startSeqNum, tt.endSeqNum), func(b *testing.B) { + benchmarkMessageSentRanges(b, tt.logsInserted, tt.startSeqNum, tt.endSeqNum) + }) + } +} + +func benchmarkMessageSentRanges(b *testing.B, logsInserted int, startSeqNum, endSeqNum cciptypes.SeqNum) { + // Initialize test setup + ctx := tests.Context(b) + s := setupMsgsBetweenSeqNumsTest(ctx, b, true) + expectedRangeLen := calculateExpectedRangeLen(logsInserted, startSeqNum, endSeqNum) + + err := s.extendedCR.Bind(ctx, []types.BoundContract{ + { + Address: s.contractAddr.String(), + Name: consts.ContractNameOnRamp, + }, + }) + require.NoError(b, err) + + // Insert logs if needed + if logsInserted > 0 { + populateDatabaseForMessageSent(ctx, b, s, chainS1, chainD, logsInserted, 0) + } + + // Reset timer to measure only the query time + b.ResetTimer() + + for i := 0; i < b.N; i++ { + msgs, err := s.reader.MsgsBetweenSeqNums( + ctx, + chainS1, + cciptypes.NewSeqNumRange(startSeqNum, endSeqNum), + ) + require.NoError(b, err) + require.Len(b, msgs, expectedRangeLen) + } +} + +func populateDatabaseForMessageSent( + ctx context.Context, + b *testing.B, + testEnv *testSetupData, + sourceChain cciptypes.ChainSelector, + destChain cciptypes.ChainSelector, + numOfEvents int, + offset int, +) { + var logs []logpoller.Log + messageSentEvent, exists := onrampABI.Events[consts.EventNameCCIPMessageSent] + require.True(b, exists, "Event CCIPMessageSent not found in ABI") + + messageSentEventSig := messageSentEvent.ID + messageSentEventAddress := testEnv.contractAddr + + for i := 0; i < numOfEvents; i++ { + // Calculate unique BlockNumber and LogIndex + blockNumber := int64(offset + i + 1) // Offset ensures unique block numbers + logIndex := int64(offset + i + 1) // Offset ensures unique log indices + + // Populate fields for the event + destChainSelector := uint64(destChain) + // #nosec G115 + sequenceNumber := uint64(offset + i) + + // Create InternalRampMessageHeader struct + header := onramp.InternalRampMessageHeader{ + MessageId: utils.NewHash(), + SourceChainSelector: uint64(sourceChain), + DestChainSelector: destChainSelector, + SequenceNumber: sequenceNumber, + // #nosec G115 + Nonce: uint64(i), + } + + // Create InternalEVM2AnyTokenTransfer slice + tokenTransfers := []onramp.InternalEVM2AnyTokenTransfer{ + { + SourcePoolAddress: utils.RandomAddress(), + DestTokenAddress: []byte{0x01, 0x02}, + ExtraData: []byte{0x03}, + // #nosec G115 + Amount: big.NewInt(1000 + int64(i)), + DestExecData: []byte{}, + }, + } + + // Create InternalEVM2AnyRampMessage struct + message := onramp.InternalEVM2AnyRampMessage{ + Header: header, + Sender: utils.RandomAddress(), + Data: []byte{0x04, 0x05}, + Receiver: []byte{0x06, 0x07}, + ExtraArgs: []byte{0x08}, + FeeToken: utils.RandomAddress(), + // #nosec G115 + FeeTokenAmount: big.NewInt(2000 + int64(i)), + // #nosec G115 + + FeeValueJuels: big.NewInt(3000 + int64(i)), + TokenAmounts: tokenTransfers, + } + + // Encode the non-indexed event data + encodedData, err := messageSentEvent.Inputs.NonIndexed().Pack( + message, + ) + require.NoError(b, err) + + // Topics (event signature and indexed fields) + topics := [][]byte{ + messageSentEventSig[:], // Event signature + logpoller.EvmWord(destChainSelector).Bytes(), // Indexed destChainSelector + logpoller.EvmWord(sequenceNumber).Bytes(), // Indexed sequenceNumber + } + + // Create log entry + logs = append(logs, logpoller.Log{ + EvmChainId: ubig.New(big.NewInt(0).SetUint64(uint64(sourceChain))), + LogIndex: logIndex, + BlockHash: utils.NewHash(), + BlockNumber: blockNumber, + BlockTimestamp: time.Now(), + EventSig: messageSentEventSig, + Topics: topics, + Address: messageSentEventAddress, + TxHash: utils.NewHash(), + Data: encodedData, + CreatedAt: time.Now(), + }) + } + + // Insert logs into the database + require.NoError(b, testEnv.orm.InsertLogs(ctx, logs)) + require.NoError(b, testEnv.orm.InsertBlock(ctx, utils.RandomHash(), int64(offset+numOfEvents), time.Now(), int64(offset+numOfEvents))) +} + +func calculateExpectedRangeLen(logsInserted int, startSeq, endSeq cciptypes.SeqNum) int { + if logsInserted == 0 { + return 0 + } + start := uint64(startSeq) + end := uint64(endSeq) + // #nosec G115 + logs := uint64(logsInserted) + + if start >= logs { + return 0 + } + + if end >= logs { + end = logs - 1 + } + + // #nosec G115 + return int(end - start + 1) +} + +func setupSimulatedBackendAndAuth(t testing.TB) (*simulated.Backend, *bind.TransactOpts) { privateKey, err := crypto.GenerateKey() require.NoError(t, err) @@ -933,7 +1318,7 @@ func testSetupRealContracts( func testSetup( ctx context.Context, - t *testing.T, + t testing.TB, params testSetupParams, ) *testSetupData { address, _, _, err := ccip_reader_tester.DeployCCIPReaderTester(params.Auth, params.SimulatedBackend.Client()) @@ -946,7 +1331,13 @@ func testSetup( lggr := logger.TestLogger(t) lggr.SetLogLevel(zapcore.ErrorLevel) - db := pgtest.NewSqlxDB(t) + // Parameterize database selection + var db *sqlx.DB + if params.UseHeavyDB { + _, db = heavyweight.FullTestDBV2(t, nil) // Heavyweight database for benchmarks + } else { + db = pgtest.NewSqlxDB(t) // Simple in-memory DB for tests + } lpOpts := logpoller.Opts{ PollPeriod: time.Millisecond, FinalityDepth: params.FinalityDepth, @@ -956,7 +1347,9 @@ func testSetup( } cl := client.NewSimulatedBackendClient(t, params.SimulatedBackend, big.NewInt(0).SetUint64(uint64(params.ReaderChain))) headTracker := headtracker.NewSimulatedHeadTracker(cl, lpOpts.UseFinalityTag, lpOpts.FinalityDepth) - lp := logpoller.NewLogPoller(logpoller.NewORM(big.NewInt(0).SetUint64(uint64(params.ReaderChain)), db, lggr), + orm := logpoller.NewORM(big.NewInt(0).SetUint64(uint64(params.ReaderChain)), db, lggr) + lp := logpoller.NewLogPoller( + orm, cl, lggr, headTracker, @@ -977,8 +1370,6 @@ func testSetup( assert.Equal(t, seqNum, cciptypes.SeqNum(scc.MinSeqNr)) } - contractNames := maps.Keys(params.Cfg.Contracts) - cr, err := evm.NewChainReaderService(ctx, lggr, lp, headTracker, cl, params.Cfg) require.NoError(t, err) @@ -988,7 +1379,7 @@ func testSetup( err = extendedCr.Bind(ctx, []types.BoundContract{ { Address: address.String(), - Name: contractNames[0], + Name: params.ContractNameToBind, }, }) require.NoError(t, err) @@ -1048,6 +1439,7 @@ func testSetup( contract: contract, sb: params.SimulatedBackend, auth: params.Auth, + orm: orm, lp: lp, cl: cl, reader: reader, @@ -1056,16 +1448,18 @@ func testSetup( } type testSetupParams struct { - ReaderChain cciptypes.ChainSelector - DestChain cciptypes.ChainSelector - OnChainSeqNums map[cciptypes.ChainSelector]cciptypes.SeqNum - Cfg evmtypes.ChainReaderConfig - ToBindContracts map[cciptypes.ChainSelector][]types.BoundContract - ToMockBindings map[cciptypes.ChainSelector][]types.BoundContract - BindTester bool - SimulatedBackend *simulated.Backend - Auth *bind.TransactOpts - FinalityDepth int64 + ReaderChain cciptypes.ChainSelector + DestChain cciptypes.ChainSelector + OnChainSeqNums map[cciptypes.ChainSelector]cciptypes.SeqNum + Cfg evmtypes.ChainReaderConfig + ToBindContracts map[cciptypes.ChainSelector][]types.BoundContract + ToMockBindings map[cciptypes.ChainSelector][]types.BoundContract + BindTester bool + ContractNameToBind string + SimulatedBackend *simulated.Backend + Auth *bind.TransactOpts + FinalityDepth int64 + UseHeavyDB bool } type testSetupData struct { @@ -1073,6 +1467,7 @@ type testSetupData struct { contract *ccip_reader_tester.CCIPReaderTester sb *simulated.Backend auth *bind.TransactOpts + orm logpoller.ORM lp logpoller.LogPoller cl client.Client reader ccipreaderpkg.CCIPReader