diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 61baf4a72de..a0e5424d6a0 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -365,6 +365,11 @@ var ( Usage: "L2 datastreamer endpoint", Value: "", } + L2DataStreamerUseTLSFlag = cli.BoolFlag{ + Name: "zkevm.l2-datastreamer-use-tls", + Usage: "Use TLS connection to L2 datastreamer endpoint", + Value: false, + } L2DataStreamerTimeout = cli.StringFlag{ Name: "zkevm.l2-datastreamer-timeout", Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)", diff --git a/eth/backend.go b/eth/backend.go index 61d2f5f6b73..7cc3390aa1e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1055,7 +1055,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien // creates a datastream client with default parameters func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient { - return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) + return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) } func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error { diff --git a/eth/ethconfig/config_zkevm.go b/eth/ethconfig/config_zkevm.go index c032e905e83..525248cfc48 100644 --- a/eth/ethconfig/config_zkevm.go +++ b/eth/ethconfig/config_zkevm.go @@ -11,6 +11,7 @@ type Zk struct { L2ChainId uint64 L2RpcUrl string L2DataStreamerUrl string + L2DataStreamerUseTLS bool L2DataStreamerTimeout time.Duration L1SyncStartBlock uint64 L1SyncStopBatch uint64 diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index bda4e9882d3..6a9c7a97280 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -168,6 +168,7 @@ var DefaultFlags = []cli.Flag{ &utils.L2ChainIdFlag, &utils.L2RpcUrlFlag, &utils.L2DataStreamerUrlFlag, + &utils.L2DataStreamerUseTLSFlag, &utils.L2DataStreamerTimeout, &utils.L1SyncStartBlock, &utils.L1SyncStopBatch, diff --git a/turbo/cli/flags_zkevm.go b/turbo/cli/flags_zkevm.go index 9aa1c0637d5..371658d48aa 100644 --- a/turbo/cli/flags_zkevm.go +++ b/turbo/cli/flags_zkevm.go @@ -125,6 +125,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) { L2ChainId: ctx.Uint64(utils.L2ChainIdFlag.Name), L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name), L2DataStreamerUrl: ctx.String(utils.L2DataStreamerUrlFlag.Name), + L2DataStreamerUseTLS: ctx.Bool(utils.L2DataStreamerUseTLSFlag.Name), L2DataStreamerTimeout: l2DataStreamTimeout, L1SyncStartBlock: ctx.Uint64(utils.L1SyncStartBlock.Name), L1SyncStopBatch: ctx.Uint64(utils.L1SyncStopBatch.Name), diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index f7362743284..66bfc22e2bb 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -2,6 +2,7 @@ package client import ( "context" + "crypto/tls" "encoding/binary" "errors" "fmt" @@ -49,6 +50,8 @@ type StreamClient struct { // keeps track of the latest fork from the stream to assign to l2 blocks currentFork uint64 + tlsConfig *tls.Config + useTLS bool } const ( @@ -64,7 +67,7 @@ const ( // Creates a new client fo datastream // server must be in format "url:port" -func NewClient(ctx context.Context, server string, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient { +func NewClient(ctx context.Context, server string, useTLS bool, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient { c := &StreamClient{ ctx: ctx, checkTimeout: checkTimeout, @@ -74,8 +77,16 @@ func NewClient(ctx context.Context, server string, version int, checkTimeout tim id: "", entryChan: make(chan interface{}, 100000), currentFork: uint64(latestDownloadedForkId), + tlsConfig: &tls.Config{}, } + // Extract hostname from server address (removing port if present) + host, _, err := net.SplitHostPort(c.server) + if err != nil { + host = c.server // If no port was specified, use the full server string + } + c.tlsConfig.ServerName = host + return c } @@ -98,9 +109,12 @@ func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 { // Opens a TCP connection to the server func (c *StreamClient) Start() error { - // Connect to server var err error - c.conn, err = net.Dial("tcp", c.server) + if c.useTLS { + c.conn, err = tls.Dial("tcp", c.server, c.tlsConfig) + } else { + c.conn, err = net.Dial("tcp", c.server) + } if err != nil { return fmt.Errorf("error connecting to server %s: %v", c.server, err) } diff --git a/zk/datastream/test/data_stream_compare/test_datastream_compare.go b/zk/datastream/test/data_stream_compare/test_datastream_compare.go index d5093a482c9..e3e79d55fb7 100644 --- a/zk/datastream/test/data_stream_compare/test_datastream_compare.go +++ b/zk/datastream/test/data_stream_compare/test_datastream_compare.go @@ -25,8 +25,8 @@ func main() { flag.StringVar(&stream2, "stream2", "", "the second stream to pull data from") flag.Parse() - client1 := client.NewClient(ctx, stream1, 0, 0, 0) - client2 := client.NewClient(ctx, stream2, 0, 0, 0) + client1 := client.NewClient(ctx, stream1, false, 0, 0, 0) + client2 := client.NewClient(ctx, stream2, false, 0, 0, 0) err := client1.Start() if err != nil { diff --git a/zk/debug_tools/datastream-correctness-check/main.go b/zk/debug_tools/datastream-correctness-check/main.go index e3dd18700c6..248704b4063 100644 --- a/zk/debug_tools/datastream-correctness-check/main.go +++ b/zk/debug_tools/datastream-correctness-check/main.go @@ -19,7 +19,7 @@ func main() { } // Create client - client := client.NewClient(ctx, cfg.Datastream, 3, 500, 0) + client := client.NewClient(ctx, cfg.Datastream, false, 3, 500, 0) // Start client (connect to the server) defer client.Stop() diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index a6c360c5fb8..72b483316d6 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -16,6 +16,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk" + "github.com/ledgerwatch/erigon/zk/datastream/client" "github.com/ledgerwatch/erigon/zk/datastream/types" "github.com/ledgerwatch/erigon/zk/erigon_db" "github.com/ledgerwatch/erigon/zk/hermez_db" @@ -862,3 +863,8 @@ func writeL2Block(eriDb ErigonDb, hermezDb HermezDb, l2Block *types.FullL2Block, return nil } + +func buildNewStreamClient(ctx context.Context, batchesCfg BatchesCfg, latestFork uint16) *client.StreamClient { + cfg := batchesCfg.zkCfg + return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork) +}