diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index 759bd1b..5069d8c 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -27,10 +27,11 @@ const ( ) type rpcClient struct { - client *retryablehttp.Client - cfg Config - log *slog.Logger - bufPool *sync.Pool + client *retryablehttp.Client + cfg Config + log *slog.Logger + bufPool *sync.Pool + httpHeaders map[string]string } func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:disable-line:unexported-return @@ -61,6 +62,7 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis return new(bytes.Buffer) }, }, + httpHeaders: cfg.HTTPHeaders, } // lets validate RPC node is up & reachable _, err := rpc.LatestBlockNumber() @@ -112,6 +114,11 @@ func (c *rpcClient) getResponseBody( if err != nil { return err } + if c.httpHeaders != nil { + for k, v := range c.httpHeaders { + req.Header.Set(k, v) + } + } resp, err := c.client.Do(req) if err != nil { return fmt.Errorf("failed to send request for method %s: %w", method, err) diff --git a/client/jsonrpc/models.go b/client/jsonrpc/models.go index 357f693..55d69c5 100644 --- a/client/jsonrpc/models.go +++ b/client/jsonrpc/models.go @@ -5,4 +5,5 @@ import "time" type Config struct { URL string PollInterval time.Duration + HTTPHeaders map[string]string } diff --git a/cmd/main.go b/cmd/main.go index c1face2..b3d9a1e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "os/signal" + "strings" stdsync "sync" "syscall" "time" @@ -48,10 +49,20 @@ func main() { var wg stdsync.WaitGroup var rpcClient jsonrpc.BlockchainClient + rpcHTTPHeaders := make(map[string]string) + if cfg.RPCNode.ExtraHTTPHeader != "" { + pair := strings.Split(cfg.RPCNode.ExtraHTTPHeader, ",") + // We've validated this list has two elements + key := strings.Trim(pair[0], " ") + value := strings.Trim(pair[1], " ") + logger.Info("Adding extra HTTP header to RPC requests", "key", key, "value", value) + rpcHTTPHeaders[key] = value + } switch cfg.RPCStack { case models.OpStack: rpcClient, err = jsonrpc.NewOpStackClient(logger, jsonrpc.Config{ - URL: cfg.RPCNode.NodeURL, + URL: cfg.RPCNode.NodeURL, + HTTPHeaders: rpcHTTPHeaders, }) default: stdlog.Fatalf("unsupported RPC stack: %s", cfg.RPCStack) diff --git a/config/config.go b/config/config.go index 7e2dc43..6be9eac 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,8 @@ package config import ( "errors" + "fmt" + "strings" "time" "github.com/duneanalytics/blockchain-ingester/models" @@ -21,13 +23,20 @@ func (d DuneClient) HasError() error { } type RPCClient struct { - NodeURL string `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"` + NodeURL string `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"` + ExtraHTTPHeader string `long:"rpc-http-header" env:"RPC_HTTP_HEADER" description:"Extra HTTP header to send with RPC requests. On the form 'key,value'"` // nolint:lll } func (r RPCClient) HasError() error { if r.NodeURL == "" { return errors.New("RPC node URL is required") } + if r.ExtraHTTPHeader != "" { + header := strings.Split(r.ExtraHTTPHeader, ",") + if len(header) != 2 { + return fmt.Errorf("invalid rpc http header: expected 'key,value', got '%s'", r.ExtraHTTPHeader) + } + } return nil }