Skip to content

Commit

Permalink
DS client - add TLS support (#1553)
Browse files Browse the repository at this point in the history
Add zkevm.l2-datastreamer-use-tls flag
If set to true, a TLS connection will be used.
This also enables SNI (Server Name Indication) support and allows connecting to ds server behind virtual hosting/load balancer
  • Loading branch information
iszubok authored and revitteth committed Dec 12, 2024
1 parent 0792a9c commit 1ef03d8
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 7 deletions.
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ var (
Usage: "L2 datastreamer endpoint",
Value: "",
}
L2DataStreamerUseTLSFlag = cli.BoolFlag{
Name: "zkevm.l2-datastreamer-use-tls",
Usage: "Use TLS connection to L2 datastreamer endpoint",
Value: false,
}
L2DataStreamerTimeout = cli.StringFlag{
Name: "zkevm.l2-datastreamer-timeout",
Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)",
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien

// creates a datastream client with default parameters
func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient {
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
}

func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Zk struct {
L2ChainId uint64
L2RpcUrl string
L2DataStreamerUrl string
L2DataStreamerUseTLS bool
L2DataStreamerTimeout time.Duration
L1SyncStartBlock uint64
L1SyncStopBatch uint64
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ var DefaultFlags = []cli.Flag{
&utils.L2ChainIdFlag,
&utils.L2RpcUrlFlag,
&utils.L2DataStreamerUrlFlag,
&utils.L2DataStreamerUseTLSFlag,
&utils.L2DataStreamerTimeout,
&utils.L1SyncStartBlock,
&utils.L1SyncStopBatch,
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
L2ChainId: ctx.Uint64(utils.L2ChainIdFlag.Name),
L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name),
L2DataStreamerUrl: ctx.String(utils.L2DataStreamerUrlFlag.Name),
L2DataStreamerUseTLS: ctx.Bool(utils.L2DataStreamerUseTLSFlag.Name),
L2DataStreamerTimeout: l2DataStreamTimeout,
L1SyncStartBlock: ctx.Uint64(utils.L1SyncStartBlock.Name),
L1SyncStopBatch: ctx.Uint64(utils.L1SyncStopBatch.Name),
Expand Down
20 changes: 17 additions & 3 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -49,6 +50,8 @@ type StreamClient struct {

// keeps track of the latest fork from the stream to assign to l2 blocks
currentFork uint64
tlsConfig *tls.Config
useTLS bool
}

const (
Expand All @@ -64,7 +67,7 @@ const (

// Creates a new client fo datastream
// server must be in format "url:port"
func NewClient(ctx context.Context, server string, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient {
func NewClient(ctx context.Context, server string, useTLS bool, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient {
c := &StreamClient{
ctx: ctx,
checkTimeout: checkTimeout,
Expand All @@ -74,8 +77,16 @@ func NewClient(ctx context.Context, server string, version int, checkTimeout tim
id: "",
entryChan: make(chan interface{}, 100000),
currentFork: uint64(latestDownloadedForkId),
tlsConfig: &tls.Config{},
}

// Extract hostname from server address (removing port if present)
host, _, err := net.SplitHostPort(c.server)
if err != nil {
host = c.server // If no port was specified, use the full server string
}
c.tlsConfig.ServerName = host

return c
}

Expand All @@ -98,9 +109,12 @@ func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 {

// Opens a TCP connection to the server
func (c *StreamClient) Start() error {
// Connect to server
var err error
c.conn, err = net.Dial("tcp", c.server)
if c.useTLS {
c.conn, err = tls.Dial("tcp", c.server, c.tlsConfig)
} else {
c.conn, err = net.Dial("tcp", c.server)
}
if err != nil {
return fmt.Errorf("error connecting to server %s: %v", c.server, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func main() {
flag.StringVar(&stream2, "stream2", "", "the second stream to pull data from")
flag.Parse()

client1 := client.NewClient(ctx, stream1, 0, 0, 0)
client2 := client.NewClient(ctx, stream2, 0, 0, 0)
client1 := client.NewClient(ctx, stream1, false, 0, 0, 0)
client2 := client.NewClient(ctx, stream2, false, 0, 0, 0)

err := client1.Start()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion zk/debug_tools/datastream-correctness-check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {
}

// Create client
client := client.NewClient(ctx, cfg.Datastream, 3, 500, 0)
client := client.NewClient(ctx, cfg.Datastream, false, 3, 500, 0)

// Start client (connect to the server)
defer client.Stop()
Expand Down
6 changes: 6 additions & 0 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk"
"github.com/ledgerwatch/erigon/zk/datastream/client"
"github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/ledgerwatch/erigon/zk/erigon_db"
"github.com/ledgerwatch/erigon/zk/hermez_db"
Expand Down Expand Up @@ -862,3 +863,8 @@ func writeL2Block(eriDb ErigonDb, hermezDb HermezDb, l2Block *types.FullL2Block,

return nil
}

func buildNewStreamClient(ctx context.Context, batchesCfg BatchesCfg, latestFork uint16) *client.StreamClient {
cfg := batchesCfg.zkCfg
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork)
}

0 comments on commit 1ef03d8

Please sign in to comment.