diff --git a/integration-tests/testsetups/keeper_benchmark.go b/integration-tests/testsetups/keeper_benchmark.go index bb6c582c137..d9b389a9650 100644 --- a/integration-tests/testsetups/keeper_benchmark.go +++ b/integration-tests/testsetups/keeper_benchmark.go @@ -293,32 +293,54 @@ func (k *KeeperBenchmarkTest) Run() { require.NoError(k.t, err, "Error waiting for keeper subscriptions") // Collect logs for each registry to calculate test metrics - registryLogs := make([][]types.Log, len(k.keeperRegistries)) + // This test generates a LOT of logs, and we need to break up our reads, or risk getting rate-limited by the node + var ( + endBlock = big.NewInt(0).Add(k.startingBlock, big.NewInt(u.BlockRange)) + registryLogs = make([][]types.Log, len(k.keeperRegistries)) + blockBatchSize int64 = 100 + ) for rIndex := range k.keeperRegistries { + // Variables for the full registry var ( - logs []types.Log - timeout = 5 * time.Second - addr = k.keeperRegistries[rIndex].Address() - filterQuery = geth.FilterQuery{ + logs []types.Log + timeout = 5 * time.Second + addr = k.keeperRegistries[rIndex].Address() + queryStartBlock = big.NewInt(0).Set(k.startingBlock) + ) + + // Gather logs from the registry in 100 block chunks to avoid read limits + for queryStartBlock.Cmp(endBlock) < 0 { + filterQuery := geth.FilterQuery{ Addresses: []common.Address{common.HexToAddress(addr)}, - FromBlock: k.startingBlock, + FromBlock: queryStartBlock, + ToBlock: big.NewInt(0).Add(queryStartBlock, big.NewInt(blockBatchSize)), } + + // This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats. err = fmt.Errorf("initial error") // to ensure our for loop runs at least once - ) - for err != nil { // This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats. - ctx, cancel := context.WithTimeout(utils.TestContext(k.t), timeout) - logs, err = k.chainClient.FilterLogs(ctx, filterQuery) - cancel() - if err != nil { - k.log.Error().Err(err). - Interface("Filter Query", filterQuery). - Str("Timeout", timeout.String()). - Msg("Error getting logs from chain, trying again") - } else { - k.log.Info().Int("Log Count", len(logs)).Str("Registry Address", addr).Msg("Collected logs") + for err != nil { + ctx, cancel := context.WithTimeout(utils.TestContext(k.t), timeout) + logs, err = k.chainClient.FilterLogs(ctx, filterQuery) + cancel() + if err != nil { + k.log.Error(). + Err(err). + Interface("Filter Query", filterQuery). + Str("Timeout", timeout.String()). + Msg("Error getting logs from chain, trying again") + timeout = time.Duration(math.Min(float64(timeout)*2, float64(2*time.Minute))) + continue + } + k.log.Info(). + Uint64("From Block", queryStartBlock.Uint64()). + Uint64("To Block", filterQuery.ToBlock.Uint64()). + Int("Log Count", len(logs)). + Str("Registry Address", addr). + Msg("Collected logs") + queryStartBlock.Add(queryStartBlock, big.NewInt(blockBatchSize)) + registryLogs[rIndex] = append(registryLogs[rIndex], logs...) } } - registryLogs[rIndex] = logs } // Count reverts and stale upkeeps