Skip to content

Commit

Permalink
refactor: implement a maximum upperbound on parallel http requests to…
Browse files Browse the repository at this point in the history
… the RPC node

Currently we allow defining RPCConcurrency, but that is in reality the
BLOCK CONCURRENCY, the number of blocks we fetch in parallel.

for OP-Stack EVMs, we issue, for each block, in parallel, 3 RPC requests
to the RPC Node.
So, the real concurrency is something like:
3 * (RPCConcurrency + DLQRPCConcurrency)

This patch enforces a maximum concurrency by using on rpcClient an
internal workerPool.
This uses the github.com/panjf2000/ants/v2 library

Now the maximum total RPC concurrency is defined as 4 x RPCConcurrency
  • Loading branch information
msf committed Jul 23, 2024
1 parent e491b73 commit 685f2b0
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 19 deletions.
19 changes: 16 additions & 3 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/duneanalytics/blockchain-ingester/lib/hexutils"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/hashicorp/go-retryablehttp"
"github.com/panjf2000/ants/v2"
)

type BlockchainClient interface {
Expand All @@ -22,8 +23,9 @@ type BlockchainClient interface {
}

const (
MaxRetries = 10
DefaultRequestTimeout = 30 * time.Second
MaxRetries = 10
DefaultRequestTimeout = 30 * time.Second
DefaultMaxRPCConcurrency = 50 // safe default
)

type rpcClient struct {
Expand All @@ -32,6 +34,7 @@ type rpcClient struct {
log *slog.Logger
bufPool *sync.Pool
httpHeaders map[string]string
wrkPool *ants.Pool
}

func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:disable-line:unexported-return
Expand All @@ -53,6 +56,14 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis
client.Backoff = retryablehttp.LinearJitterBackoff
client.HTTPClient.Timeout = DefaultRequestTimeout

if cfg.TotalRPCConcurrency == 0 {
cfg.TotalRPCConcurrency = DefaultMaxRPCConcurrency
}
wkrPool, err := ants.NewPool(cfg.TotalRPCConcurrency)
if err != nil {
return nil, fmt.Errorf("failed to create worker pool: %w", err)
}

rpc := &rpcClient{
client: client,
cfg: cfg,
Expand All @@ -63,9 +74,10 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis
},
},
httpHeaders: cfg.HTTPHeaders,
wrkPool: wkrPool,
}
// Ensure RPC node is up & reachable
_, err := rpc.LatestBlockNumber()
_, err = rpc.LatestBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err)
}
Expand Down Expand Up @@ -136,5 +148,6 @@ func (c *rpcClient) getResponseBody(
}

func (c *rpcClient) Close() error {
c.wrkPool.Release()
return nil
}
4 changes: 4 additions & 0 deletions client/jsonrpc/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ type Config struct {
URL string
PollInterval time.Duration
HTTPHeaders map[string]string
// rpcClient is used in parallel by the ingester to fetch blocks
// but it also has internal request concurrency on handling each block
// to avoid spawning too many http requests to the RPC node we set here an upper limit
TotalRPCConcurrency int
}
25 changes: 16 additions & 9 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,24 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
for i, method := range methods {
results[i] = c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(results[i])
results[i].Reset()

group.Go(func() error {
results[i].Reset()
err := c.getResponseBody(ctx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"method", method,
"error", err,
)
}
return err
errCh := make(chan error, 1)
c.wrkPool.Submit(func() {
defer close(errCh)
err := c.getResponseBody(ctx, method, methodArgs[method], results[i])
if err != nil {
c.log.Error("Failed to get response for jsonRPC",
"method", method,
"error", err,
)
errCh <- err
} else {
errCh <- nil
}
})
return <-errCh
})
}

Expand Down
7 changes: 5 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func main() {
rpcClient, err = jsonrpc.NewOpStackClient(logger, jsonrpc.Config{
URL: cfg.RPCNode.NodeURL,
HTTPHeaders: rpcHTTPHeaders,
// real max request concurrency to RPP node
// each block requires multiple RPC requests
TotalRPCConcurrency: cfg.BlockConcurrency * 4,
})
default:
stdlog.Fatalf("unsupported RPC stack: %s", cfg.RPCStack)
Expand Down Expand Up @@ -133,8 +136,8 @@ func main() {
duneClient,
duneClientDLQ,
ingester.Config{
MaxConcurrentRequests: cfg.RPCConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQConcurrency,
MaxConcurrentRequests: cfg.BlockConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Expand Down
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ type Config struct {
DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"` // nolint:lll
DLQConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
// kept the old cmdline arg names and env variables for backwards compatibility
BlockConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent block requests to the RPC node" default:"25"` // nolint:lll
DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
github.com/klauspost/compress v1.17.8
github.com/panjf2000/ants/v2 v2.10.0
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.7.0
)
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
Expand All @@ -20,16 +21,26 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8=
github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 comments on commit 685f2b0

Please sign in to comment.