From bcf8a1a6d324e381be8831c442ec062002075239 Mon Sep 17 00:00:00 2001 From: Valentin Staykov Date: Fri, 11 Oct 2024 09:00:23 +0000 Subject: [PATCH] fix: do not disconnect on stage batches end --- zk/datastream/client/stream_client.go | 4 ++-- zk/stages/stage_batches.go | 31 ++++++++++++--------------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 22853f0174e..4d5b61e12a9 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -261,8 +261,8 @@ func (c *StreamClient) Stop() { if err := c.sendStopCmd(); err != nil { log.Warn(fmt.Sprintf("Failed to send the stop command to the data stream server: %s", err)) } - c.conn.Close() - c.conn = nil + // c.conn.Close() + // c.conn = nil } // Command header: Get status diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index 0041e038d60..295a891d94c 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -26,7 +26,6 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/eth/ethconfig" - "github.com/ledgerwatch/erigon/zk/datastream/client" "github.com/ledgerwatch/log/v3" ) @@ -170,12 +169,13 @@ func SpawnStageBatches( return err } - dsQueryClient, err := newStreamClient(ctx, cfg, latestForkId) + dsQueryClient, stopDsClient, err := newStreamClient(ctx, cfg, latestForkId) if err != nil { log.Warn(fmt.Sprintf("[%s] %s", logPrefix, err)) return err } - defer dsQueryClient.Stop() + defer stopDsClient() + var highestDSL2Block *types.FullL2Block for { select { @@ -716,25 +716,22 @@ func getUnwindPoint(eriDb erigon_db.ReadOnlyErigonDb, hermezDb state.ReadOnlyHer } // newStreamClient instantiates new datastreamer client and starts it. -func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (DatastreamClient, error) { - var ( - dsClient DatastreamClient - err error - ) - +func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (dsClient DatastreamClient, stopFn func(), err error) { if cfg.dsQueryClientCreator != nil { dsClient, err = cfg.dsQueryClientCreator(ctx, cfg.zkCfg, latestForkId) if err != nil { - return nil, fmt.Errorf("failed to create a datastream client. Reason: %w", err) + return nil, nil, fmt.Errorf("failed to create a datastream client. Reason: %w", err) + } + if err := dsClient.Start(); err != nil { + return nil, nil, fmt.Errorf("failed to start a datastream client. Reason: %w", err) + } + stopFn = func() { + dsClient.Stop() } } else { - zkCfg := cfg.zkCfg - dsClient = client.NewClient(ctx, zkCfg.L2DataStreamerUrl, zkCfg.DatastreamVersion, zkCfg.L2DataStreamerTimeout, uint16(latestForkId)) - } - - if err := dsClient.Start(); err != nil { - return nil, fmt.Errorf("failed to start a datastream client. Reason: %w", err) + dsClient = cfg.dsClient + stopFn = func() {} } - return dsClient, nil + return dsClient, stopFn, nil }