Skip to content

Commit

Permalink
refactor: create datastream routine runner
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 2, 2024
1 parent 3f07d5f commit cccf726
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 72 deletions.
36 changes: 26 additions & 10 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type EntityDefinition struct {
const (
versionProto = 2 // converted to proto
versionAddedBlockEnd = 3 // Added block end
entryChannelSize = 100000
)

var (
Expand All @@ -44,9 +45,10 @@ type StreamClient struct {
checkTimeout time.Duration // time to wait for data before reporting an error

// atomic
lastWrittenTime atomic.Int64
streaming atomic.Bool
progress atomic.Uint64
lastWrittenTime atomic.Int64
streaming atomic.Bool
progress atomic.Uint64
stopReadingToChannel atomic.Bool

// Channels
entryChan chan interface{}
Expand Down Expand Up @@ -88,8 +90,8 @@ func (c *StreamClient) IsVersion3() bool {
return c.version >= versionAddedBlockEnd
}

func (c *StreamClient) GetEntryChan() chan interface{} {
return c.entryChan
func (c *StreamClient) GetEntryChan() *chan interface{} {
return &c.entryChan
}

// GetL2BlockByNumber queries the data stream by sending the L2 block start bookmark for the certain block number
Expand Down Expand Up @@ -227,7 +229,7 @@ func (c *StreamClient) Stop() {
c.conn.Close()
c.conn = nil

close(c.entryChan)
c.clearEntryCHannel()
}

// Command header: Get status
Expand Down Expand Up @@ -323,12 +325,25 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu
return nil
}

// clearEntryCHannel closes the entry channel and then drains every entry
// still buffered in it, so nothing blocked on the channel stays stuck.
// NOTE(review): the capital H in the name looks like a typo, but renaming
// would touch every caller, so the name is kept as-is.
func (c *StreamClient) clearEntryCHannel() {
	close(c.entryChan)
	for {
		if _, ok := <-c.entryChan; !ok {
			// channel is closed and fully drained
			break
		}
	}
}

// renewEntryChannel replaces the current entry channel with a fresh, empty
// one. The old channel is closed and fully drained first, so stale entries
// (e.g. data ahead of a rewound bookmark) cannot leak into the new channel.
func (c *StreamClient) renewEntryChannel() {
	c.clearEntryCHannel()
	fresh := make(chan interface{}, entryChannelSize)
	c.entryChan = fresh
}

func (c *StreamClient) EnsureConnected() (bool, error) {
if c.conn == nil {
if err := c.tryReConnect(); err != nil {
return false, fmt.Errorf("failed to reconnect the datastream client: %w", err)
}
c.entryChan = make(chan interface{}, 100000)

c.renewEntryChannel()
}

return true, nil
Expand Down Expand Up @@ -368,9 +383,6 @@ func (c *StreamClient) ReadAllEntriesToChannel() error {
c.conn = nil
}

// reset the channels as there could be data ahead of the bookmark we want to track here.
// c.resetChannels()

return err2
}

Expand Down Expand Up @@ -474,6 +486,10 @@ func (c *StreamClient) tryReConnect() error {
return err
}

// StopReadingToChannel signals the entry-reading loop to stop pushing
// entries onto the entry channel. Only the flag is set here; the read
// loop is expected to poll stopReadingToChannel and exit on its own.
func (c *StreamClient) StopReadingToChannel() {
	c.stopReadingToChannel.Store(true)
}

// FileEntryIterator abstracts a source of datastream file entries,
// yielding them one at a time until the source is exhausted or errors.
type FileEntryIterator interface {
	NextFileEntry() (*types.FileEntry, error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func readFromClient(client *client.StreamClient, total int) ([]interface{}, erro

LOOP:
for {
entry := <-client.GetEntryChan()
entry := <-*client.GetEntryChan()

switch entry.(type) {
case types.FullL2Block:
Expand Down
78 changes: 26 additions & 52 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,22 @@ type HermezDb interface {

type DatastreamClient interface {
ReadAllEntriesToChannel() error
GetEntryChan() chan interface{}
GetEntryChan() *chan interface{}
GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, int, error)
GetLatestL2Block() (*types.FullL2Block, error)
GetLastWrittenTimeAtomic() *atomic.Int64
GetStreamingAtomic() *atomic.Bool
GetProgressAtomic() *atomic.Uint64
EnsureConnected() (bool, error)
Start() error
Stop()
}

// DatastreamReadRunner controls the background routine that downloads
// datastream entries into the client's entry channel.
// NOTE(review): the concrete *DatastreamClientRunner returns error from
// StartRead and RestartReadFromBlock, so it does not satisfy these
// signatures as written — confirm which shape is intended.
type DatastreamReadRunner interface {
	// StartRead launches the download routine.
	StartRead()
	// StopRead signals the download routine to stop.
	StopRead()
	// RestartReadFromBlock stops the routine, rewinds progress to
	// fromBlock and starts reading again from there.
	RestartReadFromBlock(fromBlock uint64)
}

type dsClientCreatorHandler func(context.Context, *ethconfig.Zk, uint64) (DatastreamClient, error)

type BatchesCfg struct {
Expand Down Expand Up @@ -180,9 +185,6 @@ func SpawnStageBatches(
dsClientProgress := cfg.dsClient.GetProgressAtomic()
dsClientProgress.Store(stageProgressBlockNo)

// start routine to download blocks and push them in a channel
connectDatastreamClient(&cfg, logPrefix, latestForkId, stageProgressBlockNo)

// start a routine to print blocks written progress
progressChan, stopProgressPrinter := zk.ProgressPrinterWithoutTotal(fmt.Sprintf("[%s] Downloaded blocks from datastream progress", logPrefix))
defer stopProgressPrinter()
Expand All @@ -205,13 +207,23 @@ func SpawnStageBatches(

log.Info(fmt.Sprintf("[%s] Reading blocks from the datastream.", logPrefix))

entryChan := cfg.dsClient.GetEntryChan()
unwindFn := func(unwindBlock uint64) error {
return rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u)
}

batchProcessor, err := NewBatchesProcessor(ctx, logPrefix, tx, hermezDb, eriDb, cfg.zkCfg.SyncLimit, cfg.zkCfg.DebugLimit, cfg.zkCfg.DebugStepAfter, cfg.zkCfg.DebugStep, stageProgressBlockNo, stageProgressBatchNo, dsQueryClient, progressChan)
batchProcessor, err := NewBatchesProcessor(ctx, logPrefix, tx, hermezDb, eriDb, cfg.zkCfg.SyncLimit, cfg.zkCfg.DebugLimit, cfg.zkCfg.DebugStepAfter, cfg.zkCfg.DebugStep, stageProgressBlockNo, stageProgressBatchNo, dsQueryClient, progressChan, unwindFn)
if err != nil {
return err
}
prevAmountBlocksWritten, unwindBlock := uint64(0), uint64(0)

// start routine to download blocks and push them in a channel
dsClientRunner := NewDatastreamClientRunner(cfg.dsClient, logPrefix)
dsClientRunner.StartRead()
defer dsClientRunner.StartRead()

entryChan := cfg.dsClient.GetEntryChan()

prevAmountBlocksWritten, restartDatastreamBlock := uint64(0), uint64(0)
endLoop := false

for {
Expand All @@ -221,16 +233,14 @@ func SpawnStageBatches(
// if download routine finished, should continue to read from channel until it's empty
// if both download routine stopped and channel empty - stop loop
select {
case entry := <-entryChan:
if unwindBlock, endLoop, err = batchProcessor.ProcessEntry(entry); err != nil {
case entry := <-*entryChan:
if restartDatastreamBlock, endLoop, err = batchProcessor.ProcessEntry(entry); err != nil {
return err
}
if unwindBlock > 0 {
if err := rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u); err != nil {
return err
}
cfg.dsClient.Stop()
return nil
dsClientProgress.Store(batchProcessor.LastBlockHeight())

if restartDatastreamBlock > 0 {
dsClientRunner.RestartReadFromBlock(restartDatastreamBlock)
}
case <-ctx.Done():
log.Warn(fmt.Sprintf("[%s] Context done", logPrefix))
Expand Down Expand Up @@ -310,42 +320,6 @@ func SpawnStageBatches(
return nil
}

func connectDatastreamClient(cfg *BatchesCfg, logPrefix string, latestForkId, stageProgressBlockNo uint64) {
// start routine to download blocks and push them in a channel
if !cfg.dsClient.GetStreamingAtomic().Load() {
log.Info(fmt.Sprintf("[%s] Starting stream", logPrefix), "startBlock", stageProgressBlockNo)
// this will download all blocks from datastream and push them in a channel
// if no error, break, else continue trying to get them
// Create bookmark

connected := false
var err error
for i := 0; i < 5; i++ {
connected, err = cfg.dsClient.EnsureConnected()
if err != nil {
log.Error(fmt.Sprintf("[%s] Error connecting to datastream", logPrefix), "error", err)
continue
}
if connected {
break
}
}

if !connected {
return
}

go func() {
log.Info(fmt.Sprintf("[%s] Started downloading L2Blocks routine", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished downloading L2Blocks routine", logPrefix))

if err := cfg.dsClient.ReadAllEntriesToChannel(); err != nil {
log.Error(fmt.Sprintf("[%s] Error downloading blocks from datastream", logPrefix), "error", err)
}
}()
}
}

func saveStageProgress(tx kv.RwTx, logPrefix string, highestHashableL2BlockNo, highestSeenBatchNo, lastBlockHeight, lastForkId uint64) error {
var err error
// store the highest hashable block number
Expand Down
108 changes: 108 additions & 0 deletions zk/stages/stage_batches_datastream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package stages

import (
"fmt"
"math/rand"
"sync/atomic"
"time"

"github.com/ledgerwatch/log/v3"
)

// DatastreamClientRunner owns the goroutine that continuously reads
// datastream entries into the client's entry channel, and allows that
// goroutine to be stopped and restarted from a given block.
type DatastreamClientRunner struct {
	dsClient DatastreamClient // client used to connect and stream entries
	logPrefix string // tag prepended to every log line emitted by the runner
	stopRunner atomic.Bool // set to ask the read goroutine to exit its loop
	isReading atomic.Bool // true while the read goroutine is running
}

// NewDatastreamClientRunner builds a runner around the given datastream
// client; log lines produced by the runner are tagged with logPrefix.
func NewDatastreamClientRunner(dsClient DatastreamClient, logPrefix string) *DatastreamClientRunner {
	runner := DatastreamClientRunner{
		dsClient:  dsClient,
		logPrefix: logPrefix,
	}
	return &runner
}

// StartRead launches the background routine that connects to the datastream
// and pushes downloaded entries into the client's entry channel. It returns
// an error if a read routine is already running.
//
// Fixes over the previous version:
//   - isReading is claimed with CompareAndSwap BEFORE the goroutine starts,
//     closing the race where two concurrent StartRead calls could both pass
//     the check and spawn two reader goroutines.
//   - stopRunner is reset here, so a runner stopped via StopRead (as done by
//     RestartReadFromBlock) can actually be started again; previously the
//     flag stayed set and the restarted goroutine exited immediately.
//   - the loop no longer hot-spins while the client is already streaming.
func (r *DatastreamClientRunner) StartRead() error {
	// atomically claim the single reader slot
	if !r.isReading.CompareAndSwap(false, true) {
		return fmt.Errorf("tried starting datastream client runner thread while another is running")
	}

	// a previous StopRead must not kill the routine we are about to start
	r.stopRunner.Store(false)

	go func() {
		// random id only used to correlate the start/end log lines
		routineId := rand.Intn(1000000)

		log.Info(fmt.Sprintf("[%s] Started downloading L2Blocks routine ID: %d", r.logPrefix, routineId))
		defer log.Info(fmt.Sprintf("[%s] Ended downloading L2Blocks routine ID: %d", r.logPrefix, routineId))

		defer r.isReading.Store(false)

		for {
			if r.stopRunner.Load() {
				log.Info(fmt.Sprintf("[%s] Downloading L2Blocks routine stopped intentionally", r.logPrefix))
				break
			}

			// only (re)start the stream when the client is not already streaming
			if !r.dsClient.GetStreamingAtomic().Load() {
				log.Info(fmt.Sprintf("[%s] Starting stream", r.logPrefix))

				// best-effort connect; ReadAllEntriesToChannel is still
				// attempted afterwards, matching the previous behaviour
				if err := r.connectDatastream(); err != nil {
					log.Error(fmt.Sprintf("[%s] Error connecting to datastream", r.logPrefix), "error", err)
				}

				// blocks until the stream ends or errors
				if err := r.dsClient.ReadAllEntriesToChannel(); err != nil {
					log.Error(fmt.Sprintf("[%s] Error downloading blocks from datastream", r.logPrefix), "error", err)
				}
			} else {
				// avoid a busy spin while the stream is already being consumed
				time.Sleep(50 * time.Millisecond)
			}
		}
	}()

	return nil
}

// StopRead asks the read routine to stop; it returns immediately without
// waiting for the goroutine to exit (the routine notices the flag at its
// next loop iteration). See RestartReadFromBlock for a waiting stop.
// NOTE(review): the flag is not cleared here — something must reset
// stopRunner before reads can resume.
func (r *DatastreamClientRunner) StopRead() {
	r.stopRunner.Store(true)
}

// RestartReadFromBlock stops the current read routine, rewinds the client's
// progress to fromBlock and starts a fresh read routine from there. It
// returns an error if the old routine does not stop within ~10 seconds.
//
// Fix over the previous version: stopRunner is cleared before restarting.
// Previously the flag stayed set after StopRead, so the freshly started
// goroutine saw it and exited immediately, silently breaking the restart.
func (r *DatastreamClientRunner) RestartReadFromBlock(fromBlock uint64) error {
	r.StopRead()

	// wait for the old routine to finish before continuing (bounded wait)
	const (
		pollInterval = 100 * time.Millisecond
		maxAttempts  = 100
	)
	stopped := false
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if !r.isReading.Load() {
			stopped = true
			break
		}
		time.Sleep(pollInterval)
	}
	if !stopped {
		return fmt.Errorf("failed to stop reader routine correctly")
	}

	// clear the stop flag so the new routine does not exit immediately
	r.stopRunner.Store(false)

	// rewind the client to the requested block
	r.dsClient.GetProgressAtomic().Store(fromBlock)

	log.Info(fmt.Sprintf("[%s] Restarting datastream from block %d", r.logPrefix, fromBlock))

	return r.StartRead()
}

// connectDatastream makes up to 5 attempts to establish the datastream
// connection, returning nil as soon as the client reports it is connected.
//
// Fix over the previous version: the final error now wraps the last
// underlying connection error (when there was one) instead of discarding it,
// so callers and logs can see why the connection failed.
func (r *DatastreamClientRunner) connectDatastream() error {
	var lastErr error
	for i := 0; i < 5; i++ {
		connected, err := r.dsClient.EnsureConnected()
		if err != nil {
			lastErr = err
			log.Error(fmt.Sprintf("[%s] Error connecting to datastream", r.logPrefix), "error", err)
			continue
		}
		if connected {
			return nil
		}
	}

	if lastErr != nil {
		return fmt.Errorf("failed to connect to datastream: %w", lastErr)
	}
	return fmt.Errorf("failed to connect to datastream")
}
Loading

0 comments on commit cccf726

Please sign in to comment.