From d91324727f390ba3fd45e0d3e09421ad616ce04f Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Mon, 22 Jul 2024 23:09:28 -0700 Subject: [PATCH] refactor: implement a maximum upperbound on parallel http requests to the RPC node Currently we allow defining RPCConcurrency, but that is in reality the BLOCK CONCURRENCY, the number of blocks we fetch in parallel. for OP-Stack EVMs, we issue, for each block, in parallel, 3 RPC requests to the RPC Node. So, the real concurrency is something like: 3 * (RPCConcurrency + DLQRPCConcurrency) This patch enforces a maximum concurrency by using on rpcClient an internal workerPool. This uses the github.com/panjf2000/ants/v2 library Now the maximum total RPC concurrency is defined as 4 x RPCConcurrency --- client/jsonrpc/client.go | 19 ++++++++++++++++--- client/jsonrpc/models.go | 4 ++++ client/jsonrpc/opstack.go | 25 ++++++++++++++++--------- cmd/main.go | 7 +++++-- config/config.go | 11 ++++++----- go.mod | 1 + go.sum | 11 +++++++++++ 7 files changed, 59 insertions(+), 19 deletions(-) diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index ef2d1fd..c934d6c 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -13,6 +13,7 @@ import ( "github.com/duneanalytics/blockchain-ingester/lib/hexutils" "github.com/duneanalytics/blockchain-ingester/models" "github.com/hashicorp/go-retryablehttp" + "github.com/panjf2000/ants/v2" ) type BlockchainClient interface { @@ -22,8 +23,9 @@ type BlockchainClient interface { } const ( - MaxRetries = 10 - DefaultRequestTimeout = 30 * time.Second + MaxRetries = 10 + DefaultRequestTimeout = 30 * time.Second + DefaultMaxRPCConcurrency = 50 // safe default ) type rpcClient struct { @@ -32,6 +34,7 @@ type rpcClient struct { log *slog.Logger bufPool *sync.Pool httpHeaders map[string]string + wrkPool *ants.Pool } func NewClient(logger *slog.Logger, cfg Config) (BlockchainClient, error) { @@ -64,6 +67,14 @@ func newClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis client.Backoff = retryablehttp.LinearJitterBackoff client.HTTPClient.Timeout = DefaultRequestTimeout + if cfg.TotalRPCConcurrency == 0 { + cfg.TotalRPCConcurrency = DefaultMaxRPCConcurrency + } + wkrPool, err := ants.NewPool(cfg.TotalRPCConcurrency) + if err != nil { + return nil, fmt.Errorf("failed to create worker pool: %w", err) + } + rpc := &rpcClient{ client: client, cfg: cfg, @@ -74,9 +85,10 @@ func newClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis }, }, httpHeaders: cfg.HTTPHeaders, + wrkPool: wkrPool, } // Ensure RPC node is up & reachable - _, err := rpc.LatestBlockNumber() + _, err = rpc.LatestBlockNumber() if err != nil { return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err) } @@ -152,5 +164,6 @@ func (c *rpcClient) getResponseBody( } func (c *rpcClient) Close() error { + c.wrkPool.Release() return nil } diff --git a/client/jsonrpc/models.go b/client/jsonrpc/models.go index df8f878..8517ea5 100644 --- a/client/jsonrpc/models.go +++ b/client/jsonrpc/models.go @@ -11,4 +11,8 @@ type Config struct { PollInterval time.Duration HTTPHeaders map[string]string EVMStack models.EVMStack + // rpcClient is used in parallel by the ingester to fetch blocks + // but it also has internal request concurrency on handling each block + // to avoid spawning too many http requests to the RPC node we set here an upper limit + TotalRPCConcurrency int } diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go index 9f76655..399c395 100644 --- a/client/jsonrpc/opstack.go +++ b/client/jsonrpc/opstack.go @@ -57,17 +57,24 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m for i, method := range methods { results[i] = c.bufPool.Get().(*bytes.Buffer) defer c.bufPool.Put(results[i]) + results[i].Reset() group.Go(func() error { - results[i].Reset() - err := c.getResponseBody(ctx, method, methodArgs[method], results[i]) - if err != nil { - c.log.Error("Failed to get response for jsonRPC", - "method", method, - "error", err, - ) - } - return err + errCh := make(chan error, 1) + c.wrkPool.Submit(func() { + defer close(errCh) + err := c.getResponseBody(ctx, method, methodArgs[method], results[i]) + if err != nil { + c.log.Error("Failed to get response for jsonRPC", + "method", method, + "error", err, + ) + errCh <- err + } else { + errCh <- nil + } + }) + return <-errCh }) } diff --git a/cmd/main.go b/cmd/main.go index c648542..8f3268f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -93,6 +93,9 @@ func main() { URL: cfg.RPCNode.NodeURL, HTTPHeaders: rpcHTTPHeaders, EVMStack: cfg.RPCStack, + // real max request concurrency to RPP node + // each block requires multiple RPC requests + TotalRPCConcurrency: cfg.BlockConcurrency * 4, }) if err != nil { stdlog.Fatal(err) @@ -142,8 +145,8 @@ func main() { duneClient, duneClientDLQ, ingester.Config{ - MaxConcurrentRequests: cfg.RPCConcurrency, - MaxConcurrentRequestsDLQ: cfg.DLQConcurrency, + MaxConcurrentRequests: cfg.BlockConcurrency, + MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency, ReportProgressInterval: cfg.ReportProgressInterval, PollInterval: cfg.PollInterval, PollDLQInterval: cfg.PollDLQInterval, diff --git a/config/config.go b/config/config.go index e50cf04..8216d72 100644 --- a/config/config.go +++ b/config/config.go @@ -54,11 +54,12 @@ type Config struct { DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll RPCNode RPCClient - RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll - RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"` // nolint:lll - DLQConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent requests to the RPC node for DLQ processing" default:"2"` // nolint:lll - BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll - LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll + RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll + // kept the old cmdline arg names and env variables for backwards compatibility + BlockConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent block requests to the RPC node" default:"25"` // nolint:lll + DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll + BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll + LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll } func (c Config) HasError() error { diff --git a/go.mod b/go.mod index eed1f14..9f14f3b 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/jessevdk/go-flags v1.5.0 github.com/klauspost/compress v1.17.8 + github.com/panjf2000/ants/v2 v2.10.0 github.com/prometheus/client_golang v1.19.1 github.com/stretchr/testify v1.9.0 golang.org/x/sync v0.7.0 diff --git a/go.sum b/go.sum index d9a2477..e06ec98 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -31,6 +32,8 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8= +github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= @@ -43,8 +46,15 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -55,5 +65,6 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=