diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index 2ffc1f1..86d05cf 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -13,6 +13,7 @@ import ( "github.com/duneanalytics/blockchain-ingester/lib/hexutils" "github.com/duneanalytics/blockchain-ingester/models" "github.com/hashicorp/go-retryablehttp" + "github.com/panjf2000/ants/v2" ) type BlockchainClient interface { @@ -22,8 +23,9 @@ type BlockchainClient interface { } const ( - MaxRetries = 10 - DefaultRequestTimeout = 30 * time.Second + MaxRetries = 10 + DefaultRequestTimeout = 30 * time.Second + DefaultMaxRPCConcurrency = 50 // safe default ) type rpcClient struct { @@ -32,6 +34,7 @@ type rpcClient struct { log *slog.Logger bufPool *sync.Pool httpHeaders map[string]string + wrkPool *ants.Pool } func NewClient(logger *slog.Logger, cfg Config) (BlockchainClient, error) { @@ -64,6 +67,14 @@ func newClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis client.Backoff = retryablehttp.LinearJitterBackoff client.HTTPClient.Timeout = DefaultRequestTimeout + if cfg.TotalRPCConcurrency == 0 { + cfg.TotalRPCConcurrency = DefaultMaxRPCConcurrency + } + wkrPool, err := ants.NewPool(cfg.TotalRPCConcurrency) + if err != nil { + return nil, fmt.Errorf("failed to create worker pool: %w", err) + } + rpc := &rpcClient{ client: client, cfg: cfg, @@ -74,9 +85,10 @@ func newClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis }, }, httpHeaders: cfg.HTTPHeaders, + wrkPool: wkrPool, } // Ensure RPC node is up & reachable - _, err := rpc.LatestBlockNumber() + _, err = rpc.LatestBlockNumber() if err != nil { return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err) } @@ -147,5 +159,6 @@ func (c *rpcClient) getResponseBody( } func (c *rpcClient) Close() error { + c.wrkPool.Release() return nil } diff --git a/client/jsonrpc/models.go b/client/jsonrpc/models.go index df8f878..8517ea5 100644 --- a/client/jsonrpc/models.go +++ b/client/jsonrpc/models.go @@ -11,4 +11,8 @@ type Config struct { PollInterval time.Duration HTTPHeaders map[string]string EVMStack models.EVMStack + // rpcClient is used in parallel by the ingester to fetch blocks + // but it also has internal request concurrency on handling each block + // to avoid spawning too many http requests to the RPC node we set here an upper limit + TotalRPCConcurrency int } diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go index 9f76655..399c395 100644 --- a/client/jsonrpc/opstack.go +++ b/client/jsonrpc/opstack.go @@ -57,17 +57,24 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m for i, method := range methods { results[i] = c.bufPool.Get().(*bytes.Buffer) defer c.bufPool.Put(results[i]) + results[i].Reset() group.Go(func() error { - results[i].Reset() - err := c.getResponseBody(ctx, method, methodArgs[method], results[i]) - if err != nil { - c.log.Error("Failed to get response for jsonRPC", - "method", method, - "error", err, - ) - } - return err + errCh := make(chan error, 1) + c.wrkPool.Submit(func() { + defer close(errCh) + err := c.getResponseBody(ctx, method, methodArgs[method], results[i]) + if err != nil { + c.log.Error("Failed to get response for jsonRPC", + "method", method, + "error", err, + ) + errCh <- err + } else { + errCh <- nil + } + }) + return <-errCh }) } diff --git a/cmd/main.go b/cmd/main.go index 88a20db..e56a899 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -89,6 +89,9 @@ func main() { URL: cfg.RPCNode.NodeURL, HTTPHeaders: rpcHTTPHeaders, EVMStack: cfg.RPCStack, + // real max request concurrency to RPP node + // each block requires multiple RPC requests + TotalRPCConcurrency: cfg.BlockConcurrency * 4, }) if err != nil { stdlog.Fatal(err) @@ -129,8 +132,8 @@ func main() { duneClient, duneClientDLQ, ingester.Config{ - MaxConcurrentRequests: cfg.RPCConcurrency, - MaxConcurrentRequestsDLQ: cfg.DLQConcurrency, + MaxConcurrentRequests: cfg.BlockConcurrency, + MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency, ReportProgressInterval: cfg.ReportProgressInterval, PollInterval: cfg.PollInterval, PollDLQInterval: cfg.PollDLQInterval, diff --git a/config/config.go b/config/config.go index 94d2f9d..e571867 100644 --- a/config/config.go +++ b/config/config.go @@ -53,11 +53,12 @@ type Config struct { DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll RPCNode RPCClient - RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll - RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"` // nolint:lll - DLQConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent requests to the RPC node for DLQ processing" default:"2"` // nolint:lll - BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll - LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll + RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll + // kept the old cmdline arg names and env variables for backwards compatibility + BlockConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent block requests to the RPC node" default:"25"` // nolint:lll + DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll + BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll + LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll } func (c Config) HasError() error { diff --git a/go.mod b/go.mod index 26973a4..c1078f6 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/jessevdk/go-flags v1.5.0 github.com/klauspost/compress v1.17.8 + github.com/panjf2000/ants/v2 v2.10.0 github.com/stretchr/testify v1.9.0 golang.org/x/sync v0.7.0 ) diff --git a/go.sum b/go.sum index 1835ce7..246f676 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -20,10 +21,19 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8= +github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -31,5 +41,6 @@ golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=