Skip to content

Commit

Permalink
fix: do not disconnect on stage batches end
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 11, 2024
1 parent d779f99 commit bcf8a1a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
4 changes: 2 additions & 2 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ func (c *StreamClient) Stop() {
if err := c.sendStopCmd(); err != nil {
log.Warn(fmt.Sprintf("Failed to send the stop command to the data stream server: %s", err))
}
c.conn.Close()
c.conn = nil
// c.conn.Close()
// c.conn = nil
}

// Command header: Get status
Expand Down
31 changes: 14 additions & 17 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/zk/datastream/client"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -170,12 +169,13 @@ func SpawnStageBatches(
return err
}

dsQueryClient, err := newStreamClient(ctx, cfg, latestForkId)
dsQueryClient, stopDsClient, err := newStreamClient(ctx, cfg, latestForkId)
if err != nil {
log.Warn(fmt.Sprintf("[%s] %s", logPrefix, err))
return err
}
defer dsQueryClient.Stop()
defer stopDsClient()

var highestDSL2Block *types.FullL2Block
for {
select {
Expand Down Expand Up @@ -716,25 +716,22 @@ func getUnwindPoint(eriDb erigon_db.ReadOnlyErigonDb, hermezDb state.ReadOnlyHer
}

// newStreamClient instantiates new datastreamer client and starts it.
func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (DatastreamClient, error) {
var (
dsClient DatastreamClient
err error
)

func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (dsClient DatastreamClient, stopFn func(), err error) {
if cfg.dsQueryClientCreator != nil {
dsClient, err = cfg.dsQueryClientCreator(ctx, cfg.zkCfg, latestForkId)
if err != nil {
return nil, fmt.Errorf("failed to create a datastream client. Reason: %w", err)
return nil, nil, fmt.Errorf("failed to create a datastream client. Reason: %w", err)
}
if err := dsClient.Start(); err != nil {
return nil, nil, fmt.Errorf("failed to start a datastream client. Reason: %w", err)
}
stopFn = func() {
dsClient.Stop()
}
} else {
zkCfg := cfg.zkCfg
dsClient = client.NewClient(ctx, zkCfg.L2DataStreamerUrl, zkCfg.DatastreamVersion, zkCfg.L2DataStreamerTimeout, uint16(latestForkId))
}

if err := dsClient.Start(); err != nil {
return nil, fmt.Errorf("failed to start a datastream client. Reason: %w", err)
dsClient = cfg.dsClient
stopFn = func() {}
}

return dsClient, nil
return dsClient, stopFn, nil
}

0 comments on commit bcf8a1a

Please sign in to comment.