diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7e377c0f36d..619ccdb1996 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -415,6 +415,11 @@ var ( Usage: "L2 datastreamer endpoint", Value: "", } + L2DataStreamerUseTLSFlag = cli.BoolFlag{ + Name: "zkevm.l2-datastreamer-use-tls", + Usage: "Use TLS connection to L2 datastreamer endpoint", + Value: false, + } L2DataStreamerTimeout = cli.StringFlag{ Name: "zkevm.l2-datastreamer-timeout", Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)", diff --git a/eth/backend.go b/eth/backend.go index 7e8f3db3ef9..b2417b6a436 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1302,7 +1302,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien // creates a datastream client with default parameters func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient { - return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) + return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) } func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig *chain.Config) error { diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index 10df0c027a4..f6796a0c37a 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -12,6 +12,7 @@ type Zk struct { L2ChainId uint64 L2RpcUrl string L2DataStreamerUrl string + L2DataStreamerUseTLS bool L2DataStreamerTimeout time.Duration L2ShortCircuitToVerifiedBatch bool L1SyncStartBlock uint64 diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 184ee8bcb00..88768cd3d47 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -176,6 +176,7 @@ var DefaultFlags = []cli.Flag{ &utils.L2ChainIdFlag, &utils.L2RpcUrlFlag, &utils.L2DataStreamerUrlFlag, + &utils.L2DataStreamerUseTLSFlag, &utils.L2DataStreamerTimeout, &utils.L2ShortCircuitToVerifiedBatchFlag, &utils.L1SyncStartBlock, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index df4c081c21b..7bf8eb476ed 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -162,6 +162,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { L2ChainId: ctx.Uint64(utils.L2ChainIdFlag.Name), L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name), L2DataStreamerUrl: ctx.String(utils.L2DataStreamerUrlFlag.Name), + L2DataStreamerUseTLS: ctx.Bool(utils.L2DataStreamerUseTLSFlag.Name), L2DataStreamerTimeout: l2DataStreamTimeout, L2ShortCircuitToVerifiedBatch: l2ShortCircuitToVerifiedBatchVal, L1SyncStartBlock: ctx.Uint64(utils.L1SyncStartBlock.Name), diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 23987f7ea31..d54e562c8e7 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -2,6 +2,7 @@ package client import ( "context" + "crypto/tls" "encoding/binary" "errors" "fmt" @@ -10,10 +11,11 @@ import ( "sync/atomic" "time" + "sync" + "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" "github.com/ledgerwatch/log/v3" - "sync" ) type StreamType uint64 @@ -68,6 +70,9 @@ type StreamClient struct { lastError error started bool + + useTLS bool + tlsConfig *tls.Config } const ( @@ -84,7 +89,7 @@ const ( // Creates a new client fo datastream // server must be in format "url:port" -func NewClient(ctx context.Context, server string, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient { +func NewClient(ctx context.Context, server string, useTLS bool, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient { c := &StreamClient{ ctx: ctx, checkTimeout: checkTimeout, @@ -94,8 +99,17 @@ func NewClient(ctx context.Context, server string, version int, checkTimeout tim entryChan: make(chan interface{}, 100000), currentFork: uint64(latestDownloadedForkId), mtxStreaming: &sync.Mutex{}, + useTLS: useTLS, + tlsConfig: &tls.Config{}, } + // Extract hostname from server address (removing port if present) + host, _, err := net.SplitHostPort(c.server) + if err != nil { + host = c.server // If no port was specified, use the full server string + } + c.tlsConfig.ServerName = host + return c } @@ -282,9 +296,12 @@ func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 { // Opens a TCP connection to the server func (c *StreamClient) Start() error { - // Connect to server var err error - c.conn, err = net.Dial("tcp", c.server) + if c.useTLS { + c.conn, err = tls.Dial("tcp", c.server, c.tlsConfig) + } else { + c.conn, err = net.Dial("tcp", c.server) + } if err != nil { return fmt.Errorf("connecting to server %s: %w", c.server, err) } diff --git a/zk/datastream/client/stream_client_test.go b/zk/datastream/client/stream_client_test.go index db0f80e088a..9d3ec2338d5 100644 --- a/zk/datastream/client/stream_client_test.go +++ b/zk/datastream/client/stream_client_test.go @@ -50,7 +50,7 @@ func TestStreamClientReadHeaderEntry(t *testing.T) { } for _, testCase := range testCases { - c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) + c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0) server, conn := net.Pipe() defer server.Close() defer c.Stop() @@ -118,7 +118,7 @@ func TestStreamClientReadResultEntry(t *testing.T) { } for _, testCase := range testCases { - c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) + c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0) server, conn := net.Pipe() defer server.Close() defer c.Stop() @@ -191,7 +191,7 @@ func TestStreamClientReadFileEntry(t *testing.T) { }, } for _, testCase := range testCases { - c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) + c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0) server, conn := net.Pipe() defer c.Stop() defer server.Close() @@ -215,7 +215,7 @@ func TestStreamClientReadFileEntry(t *testing.T) { } func TestStreamClientReadParsedProto(t *testing.T) { - c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) + c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0) serverConn, clientConn := net.Pipe() c.conn = clientConn c.checkTimeout = 1 * time.Second @@ -287,7 +287,7 @@ func TestStreamClientGetLatestL2Block(t *testing.T) { clientConn.Close() }() - c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) + c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0) c.conn = clientConn c.checkTimeout = 1 * time.Second c.allowStops = false @@ -401,7 +401,7 @@ func TestStreamClientGetL2BlockByNumber(t *testing.T) { clientConn.Close() }() - c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0) + c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0) c.header = &types.HeaderEntry{ TotalEntries: 4, } diff --git a/zk/datastream/test/data_stream_compare/test_datastream_compare.go b/zk/datastream/test/data_stream_compare/test_datastream_compare.go index 9cc63c3d348..07ddb9379a3 100644 --- a/zk/datastream/test/data_stream_compare/test_datastream_compare.go +++ b/zk/datastream/test/data_stream_compare/test_datastream_compare.go @@ -25,8 +25,8 @@ func main() { flag.StringVar(&stream2, "stream2", "", "the second stream to pull data from") flag.Parse() - client1 := client.NewClient(ctx, stream1, 0, 0, 0) - client2 := client.NewClient(ctx, stream2, 0, 0, 0) + client1 := client.NewClient(ctx, stream1, false, 0, 0, 0) + client2 := client.NewClient(ctx, stream2, false, 0, 0, 0) err := client1.Start() if err != nil { diff --git a/zk/debug_tools/datastream-correctness-check/main.go b/zk/debug_tools/datastream-correctness-check/main.go index 9158994e8eb..ca533869eda 100644 --- a/zk/debug_tools/datastream-correctness-check/main.go +++ b/zk/debug_tools/datastream-correctness-check/main.go @@ -19,7 +19,7 @@ func main() { } // Create client - client := client.NewClient(ctx, cfg.Datastream, 3, 500, 0) + client := client.NewClient(ctx, cfg.Datastream, false, 3, 500, 0) // Start client (connect to the server) defer client.Stop() diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index 12b3f45aac7..2e0fece27de 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -848,5 +848,5 @@ func getHighestDSL2Block(ctx context.Context, batchCfg BatchesCfg, latestFork ui func buildNewStreamClient(ctx context.Context, batchesCfg BatchesCfg, latestFork uint16) *client.StreamClient { cfg := batchesCfg.zkCfg - return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork) + return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork) }