Skip to content

Commit

Permalink
[TT-688] Batch Keeper Benchmark Read Requests (#11294)
Browse files Browse the repository at this point in the history
* Batch Keeper Benchmark Read Requests

* Actually add number
  • Loading branch information
kalverra authored Nov 16, 2023
1 parent 26e5e37 commit 6fd7be8
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions integration-tests/testsetups/keeper_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,32 +293,54 @@ func (k *KeeperBenchmarkTest) Run() {
require.NoError(k.t, err, "Error waiting for keeper subscriptions")

// Collect logs for each registry to calculate test metrics
registryLogs := make([][]types.Log, len(k.keeperRegistries))
// This test generates a LOT of logs, and we need to break up our reads, or risk getting rate-limited by the node
var (
endBlock = big.NewInt(0).Add(k.startingBlock, big.NewInt(u.BlockRange))
registryLogs = make([][]types.Log, len(k.keeperRegistries))
blockBatchSize int64 = 100
)
for rIndex := range k.keeperRegistries {
// Variables for the full registry
var (
logs []types.Log
timeout = 5 * time.Second
addr = k.keeperRegistries[rIndex].Address()
filterQuery = geth.FilterQuery{
logs []types.Log
timeout = 5 * time.Second
addr = k.keeperRegistries[rIndex].Address()
queryStartBlock = big.NewInt(0).Set(k.startingBlock)
)

// Gather logs from the registry in 100 block chunks to avoid read limits
for queryStartBlock.Cmp(endBlock) < 0 {
filterQuery := geth.FilterQuery{
Addresses: []common.Address{common.HexToAddress(addr)},
FromBlock: k.startingBlock,
FromBlock: queryStartBlock,
ToBlock: big.NewInt(0).Add(queryStartBlock, big.NewInt(blockBatchSize)),
}

// This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats.
err = fmt.Errorf("initial error") // to ensure our for loop runs at least once
)
for err != nil { // This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats.
ctx, cancel := context.WithTimeout(utils.TestContext(k.t), timeout)
logs, err = k.chainClient.FilterLogs(ctx, filterQuery)
cancel()
if err != nil {
k.log.Error().Err(err).
Interface("Filter Query", filterQuery).
Str("Timeout", timeout.String()).
Msg("Error getting logs from chain, trying again")
} else {
k.log.Info().Int("Log Count", len(logs)).Str("Registry Address", addr).Msg("Collected logs")
for err != nil {
ctx, cancel := context.WithTimeout(utils.TestContext(k.t), timeout)
logs, err = k.chainClient.FilterLogs(ctx, filterQuery)
cancel()
if err != nil {
k.log.Error().
Err(err).
Interface("Filter Query", filterQuery).
Str("Timeout", timeout.String()).
Msg("Error getting logs from chain, trying again")
timeout = time.Duration(math.Min(float64(timeout)*2, float64(2*time.Minute)))
continue
}
k.log.Info().
Uint64("From Block", queryStartBlock.Uint64()).
Uint64("To Block", filterQuery.ToBlock.Uint64()).
Int("Log Count", len(logs)).
Str("Registry Address", addr).
Msg("Collected logs")
queryStartBlock.Add(queryStartBlock, big.NewInt(blockBatchSize))
registryLogs[rIndex] = append(registryLogs[rIndex], logs...)
}
}
registryLogs[rIndex] = logs
}

// Count reverts and stale upkeeps
Expand Down

0 comments on commit 6fd7be8

Please sign in to comment.