Skip to content

Commit

Permalink
setup separate ws server and use its flags (#1215)
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov authored Sep 24, 2024
1 parent 9bdb248 commit 420f936
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 26 deletions.
74 changes: 68 additions & 6 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().Uint64Var(&cfg.MaxTraces, "trace.maxtraces", 200, "Sets a limit on traces that can be returned in trace_filter")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets - Same port as HTTP")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
rootCmd.PersistentFlags().StringVar(&cfg.WebSocketListenAddress, "ws.addr", nodecfg.DefaultHTTPHost, "Websocket server listening interface")
rootCmd.PersistentFlags().IntVar(&cfg.WebSocketPort, "ws.port", nodecfg.DefaultHTTPPort, "Websocket server listening port")
rootCmd.PersistentFlags().StringSliceVar(&cfg.WebsocketCORSDomain, "ws.corsdomain", []string{}, "Comma separated list of domains from which to accept cross origin requests (browser enforced)")
rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, utils.RpcAccessListFlag.Name, "", "Specify granular (method-by-method) API allowlist")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, utils.RpcBatchConcurrencyFlag.Name, 2, utils.RpcBatchConcurrencyFlag.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.RpcStreamingDisable, utils.RpcStreamingDisableFlag.Name, false, utils.RpcStreamingDisableFlag.Usage)
Expand Down Expand Up @@ -533,14 +536,10 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp
}

httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression)
var wsHandler http.Handler
if cfg.WebsocketEnabled {
wsHandler = srv.WebsocketHandler([]string{"*"}, nil, cfg.WebsocketCompression)
}

graphQLHandler := graphql.CreateHandler(defaultAPIList)

apiHandler, err := createHandler(cfg, defaultAPIList, httpHandler, wsHandler, graphQLHandler, nil)
apiHandler, err := createHandler(cfg, defaultAPIList, httpHandler, nil, graphQLHandler, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -609,6 +608,69 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp
log.Info("GRPC endpoint closed", "url", grpcEndpoint)
}
}()

if cfg.WebsocketEnabled {
wsSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable)

allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
return err
}

var wsApiFlags []string
for _, flag := range cfg.WebSocketApi {
if flag != "engine" {
wsApiFlags = append(wsApiFlags, flag)
}
}

if err := node.RegisterApisFromWhitelist(defaultAPIList, wsApiFlags, wsSrv, false); err != nil {
return fmt.Errorf("could not start register WS apis: %w", err)
}
wsSrv.SetAllowList(allowListForRPC)

wsSrv.SetBatchLimit(cfg.BatchLimit)

var defaultAPIList []rpc.API

for _, api := range rpcAPI {
if api.Namespace != "engine" {
defaultAPIList = append(defaultAPIList, api)
}
}

var apiFlags []string
for _, flag := range cfg.API {
if flag != "engine" {
apiFlags = append(apiFlags, flag)
}
}

if err := node.RegisterApisFromWhitelist(defaultAPIList, apiFlags, wsSrv, false); err != nil {
return fmt.Errorf("could not start register RPC apis: %w", err)
}

wsEndpoint := fmt.Sprintf("%s:%d", cfg.WebSocketListenAddress, cfg.WebSocketPort)

wsHttpHandler := wsSrv.WebsocketHandler(cfg.WebsocketCORSDomain, nil, cfg.WebsocketCompression)
wsHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wsHttpHandler.ServeHTTP(w, r)
})

wsListener, wsHttpAddr, err := node.StartHTTPEndpoint(wsEndpoint, cfg.HTTPTimeouts, wsHandler)
if err != nil {
return fmt.Errorf("could not start ws RPC api: %w", err)
}

defer func() {
wsSrv.Stop()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = wsListener.Shutdown(shutdownCtx)
log.Info("WS endpoint closed", "url", wsHttpAddr)
}()
}

<-ctx.Done()
log.Info("Exiting...")
return nil
Expand Down Expand Up @@ -685,7 +747,7 @@ func obtainJWTSecret(cfg httpcfg.HttpCfg) ([]byte, error) {
return jwtSecret, nil
}

func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler http.Handler, wsHandler http.Handler, graphQLHandler http.Handler, jwtSecret []byte) (http.Handler, error) {
func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler, wsHandler, graphQLHandler http.Handler, jwtSecret []byte) (http.Handler, error) {
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if cfg.GraphQLEnabled && graphql.ProcessGraphQLcheckIfNeeded(graphQLHandler, w, r) {
return
Expand Down
23 changes: 14 additions & 9 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,20 @@ type HttpCfg struct {
MaxTraces uint64
WebsocketEnabled bool
WebsocketCompression bool
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
StateCache kvcache.CoherentConfig
Snap ethconfig.Snapshot
Sync ethconfig.Sync
WebSocketListenAddress string
WebSocketPort int
WebsocketCORSDomain []string
WebSocketApi []string

RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
StateCache kvcache.CoherentConfig
Snap ethconfig.Snapshot
Sync ethconfig.Sync

// GRPC server
GRPCServerEnabled bool
Expand Down
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ var (
HTTPCORSDomainFlag = cli.StringFlag{
Name: "http.corsdomain",
Usage: "Comma separated list of domains from which to accept cross origin requests (browser enforced)",
Value: "",
Value: "*",
}
HTTPVirtualHostsFlag = cli.StringFlag{
Name: "http.vhosts",
Expand Down
3 changes: 3 additions & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ var DefaultFlags = []cli.Flag{
&utils.AuthRpcVirtualHostsFlag,
&utils.HTTPApiFlag,
&utils.WSEnabledFlag,
&utils.WSListenAddrFlag,
&utils.WSPortFlag,
&utils.WSApiFlag,
&utils.WsCompressionFlag,
&utils.HTTPTraceFlag,
&utils.StateCacheFlag,
Expand Down
29 changes: 19 additions & 10 deletions turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config) {
apis := ctx.String(utils.HTTPApiFlag.Name)
log.Info("starting HTTP APIs", "APIs", apis)

wsEnabled := ctx.IsSet(utils.WSEnabledFlag.Name)
wsApis := strings.Split(ctx.String(utils.WSApiFlag.Name), ",")
if wsEnabled {
log.Info("starting WS APIs", "APIs", wsApis)
}
c := &httpcfg.HttpCfg{
Enabled: ctx.Bool(utils.HTTPEnabledFlag.Name),
Dirs: cfg.Dirs,
Expand Down Expand Up @@ -387,16 +392,20 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config) {
},
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),

WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
WebsocketEnabled: wsEnabled,
WebSocketListenAddress: ctx.String(utils.WSListenAddrFlag.Name),
WebSocketPort: ctx.Int(utils.WSPortFlag.Name),
WebsocketCORSDomain: strings.Split(ctx.String(utils.WSAllowedOriginsFlag.Name), ","),
WebSocketApi: wsApis,
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),

TxPoolApiAddr: ctx.String(utils.TxpoolApiAddrFlag.Name),

Expand Down

0 comments on commit 420f936

Please sign in to comment.