Skip to content

Commit

Permalink
DS client - add TLS support (#1553) (#1554)
Browse files Browse the repository at this point in the history
Add zkevm.l2-datastreamer-use-tls flag
If set to true, a TLS connection will be used.
This also enables SNI (Server Name Indication) support and allows connecting to ds server behind virtual hosting/load balancer

Co-authored-by: Ivan Zubok <93441191+iszubok@users.noreply.github.com>
  • Loading branch information
revitteth and iszubok authored Dec 12, 2024
1 parent 7f914d3 commit 7bba5f6
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 15 deletions.
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ var (
Usage: "L2 datastreamer endpoint",
Value: "",
}
L2DataStreamerUseTLSFlag = cli.BoolFlag{
Name: "zkevm.l2-datastreamer-use-tls",
Usage: "Use TLS connection to L2 datastreamer endpoint",
Value: false,
}
L2DataStreamerTimeout = cli.StringFlag{
Name: "zkevm.l2-datastreamer-timeout",
Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)",
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien

// creates a datastream client with default parameters
func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient {
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
}

func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig *chain.Config) error {
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Zk struct {
L2ChainId uint64
L2RpcUrl string
L2DataStreamerUrl string
L2DataStreamerUseTLS bool
L2DataStreamerTimeout time.Duration
L2ShortCircuitToVerifiedBatch bool
L1SyncStartBlock uint64
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ var DefaultFlags = []cli.Flag{
&utils.L2ChainIdFlag,
&utils.L2RpcUrlFlag,
&utils.L2DataStreamerUrlFlag,
&utils.L2DataStreamerUseTLSFlag,
&utils.L2DataStreamerTimeout,
&utils.L2ShortCircuitToVerifiedBatchFlag,
&utils.L1SyncStartBlock,
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
L2ChainId: ctx.Uint64(utils.L2ChainIdFlag.Name),
L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name),
L2DataStreamerUrl: ctx.String(utils.L2DataStreamerUrlFlag.Name),
L2DataStreamerUseTLS: ctx.Bool(utils.L2DataStreamerUseTLSFlag.Name),
L2DataStreamerTimeout: l2DataStreamTimeout,
L2ShortCircuitToVerifiedBatch: l2ShortCircuitToVerifiedBatchVal,
L1SyncStartBlock: ctx.Uint64(utils.L1SyncStartBlock.Name),
Expand Down
25 changes: 21 additions & 4 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -10,10 +11,11 @@ import (
"sync/atomic"
"time"

"sync"

"github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/ledgerwatch/log/v3"
"sync"
)

type StreamType uint64
Expand Down Expand Up @@ -68,6 +70,9 @@ type StreamClient struct {

lastError error
started bool

useTLS bool
tlsConfig *tls.Config
}

const (
Expand All @@ -84,7 +89,7 @@ const (

// Creates a new client fo datastream
// server must be in format "url:port"
func NewClient(ctx context.Context, server string, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient {
func NewClient(ctx context.Context, server string, useTLS bool, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient {
c := &StreamClient{
ctx: ctx,
checkTimeout: checkTimeout,
Expand All @@ -94,8 +99,17 @@ func NewClient(ctx context.Context, server string, version int, checkTimeout tim
entryChan: make(chan interface{}, 100000),
currentFork: uint64(latestDownloadedForkId),
mtxStreaming: &sync.Mutex{},
useTLS: useTLS,
tlsConfig: &tls.Config{},
}

// Extract hostname from server address (removing port if present)
host, _, err := net.SplitHostPort(c.server)
if err != nil {
host = c.server // If no port was specified, use the full server string
}
c.tlsConfig.ServerName = host

return c
}

Expand Down Expand Up @@ -282,9 +296,12 @@ func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 {

// Opens a TCP connection to the server
func (c *StreamClient) Start() error {
// Connect to server
var err error
c.conn, err = net.Dial("tcp", c.server)
if c.useTLS {
c.conn, err = tls.Dial("tcp", c.server, c.tlsConfig)
} else {
c.conn, err = net.Dial("tcp", c.server)
}
if err != nil {
return fmt.Errorf("connecting to server %s: %w", c.server, err)
}
Expand Down
12 changes: 6 additions & 6 deletions zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestStreamClientReadHeaderEntry(t *testing.T) {
}

for _, testCase := range testCases {
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestStreamClientReadResultEntry(t *testing.T) {
}

for _, testCase := range testCases {
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestStreamClientReadFileEntry(t *testing.T) {
},
}
for _, testCase := range testCases {
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
server, conn := net.Pipe()
defer c.Stop()
defer server.Close()
Expand All @@ -215,7 +215,7 @@ func TestStreamClientReadFileEntry(t *testing.T) {
}

func TestStreamClientReadParsedProto(t *testing.T) {
c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
serverConn, clientConn := net.Pipe()
c.conn = clientConn
c.checkTimeout = 1 * time.Second
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestStreamClientGetLatestL2Block(t *testing.T) {
clientConn.Close()
}()

c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
c.conn = clientConn
c.checkTimeout = 1 * time.Second
c.allowStops = false
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestStreamClientGetL2BlockByNumber(t *testing.T) {
clientConn.Close()
}()

c := NewClient(context.Background(), "", 0, 500*time.Millisecond, 0)
c := NewClient(context.Background(), "", false, 0, 500*time.Millisecond, 0)
c.header = &types.HeaderEntry{
TotalEntries: 4,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func main() {
flag.StringVar(&stream2, "stream2", "", "the second stream to pull data from")
flag.Parse()

client1 := client.NewClient(ctx, stream1, 0, 0, 0)
client2 := client.NewClient(ctx, stream2, 0, 0, 0)
client1 := client.NewClient(ctx, stream1, false, 0, 0, 0)
client2 := client.NewClient(ctx, stream2, false, 0, 0, 0)

err := client1.Start()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion zk/debug_tools/datastream-correctness-check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {
}

// Create client
client := client.NewClient(ctx, cfg.Datastream, 3, 500, 0)
client := client.NewClient(ctx, cfg.Datastream, false, 3, 500, 0)

// Start client (connect to the server)
defer client.Stop()
Expand Down
2 changes: 1 addition & 1 deletion zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,5 +829,5 @@ func getHighestDSL2Block(ctx context.Context, batchCfg BatchesCfg, latestFork ui

func buildNewStreamClient(ctx context.Context, batchesCfg BatchesCfg, latestFork uint16) *client.StreamClient {
cfg := batchesCfg.zkCfg
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork)
return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.L2DataStreamerUseTLS, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestFork)
}

0 comments on commit 7bba5f6

Please sign in to comment.