Skip to content

Commit

Permalink
refactor: extract stage batches logic into batches processor (#1251)
Browse files Browse the repository at this point in the history
* refactor: extract stage batches logic into batches processor

* refactor: rename batches_processor to stage_batches_processor

* fix(stage_batches): set new db tx upon progress save

* refactor: create datastream routine runner

* fix: bug with entry channe closing and hash check

* fix: typo
  • Loading branch information
V-Staykov authored Oct 7, 2024
1 parent 2f6f9dd commit 7177a0b
Show file tree
Hide file tree
Showing 10 changed files with 706 additions and 453 deletions.
40 changes: 30 additions & 10 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type EntityDefinition struct {
const (
versionProto = 2 // converted to proto
versionAddedBlockEnd = 3 // Added block end
entryChannelSize = 100000
)

var (
Expand All @@ -44,9 +45,10 @@ type StreamClient struct {
checkTimeout time.Duration // time to wait for data before reporting an error

// atomic
lastWrittenTime atomic.Int64
streaming atomic.Bool
progress atomic.Uint64
lastWrittenTime atomic.Int64
streaming atomic.Bool
progress atomic.Uint64
stopReadingToChannel atomic.Bool

// Channels
entryChan chan interface{}
Expand Down Expand Up @@ -88,8 +90,8 @@ func (c *StreamClient) IsVersion3() bool {
return c.version >= versionAddedBlockEnd
}

func (c *StreamClient) GetEntryChan() chan interface{} {
return c.entryChan
func (c *StreamClient) GetEntryChan() *chan interface{} {
return &c.entryChan
}

// GetL2BlockByNumber queries the data stream by sending the L2 block start bookmark for the certain block number
Expand Down Expand Up @@ -227,7 +229,7 @@ func (c *StreamClient) Stop() {
c.conn.Close()
c.conn = nil

close(c.entryChan)
c.clearEntryCHannel()
}

// Command header: Get status
Expand Down Expand Up @@ -323,12 +325,29 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu
return nil
}

func (c *StreamClient) clearEntryCHannel() {
select {
case <-c.entryChan:
close(c.entryChan)
for range c.entryChan {
}
default:
}
}

// close old entry chan and read all elements before opening a new one
func (c *StreamClient) renewEntryChannel() {
c.clearEntryCHannel()
c.entryChan = make(chan interface{}, entryChannelSize)
}

func (c *StreamClient) EnsureConnected() (bool, error) {
if c.conn == nil {
if err := c.tryReConnect(); err != nil {
return false, fmt.Errorf("failed to reconnect the datastream client: %w", err)
}
c.entryChan = make(chan interface{}, 100000)

c.renewEntryChannel()
}

return true, nil
Expand Down Expand Up @@ -368,9 +387,6 @@ func (c *StreamClient) ReadAllEntriesToChannel() error {
c.conn = nil
}

// reset the channels as there could be data ahead of the bookmark we want to track here.
// c.resetChannels()

return err2
}

Expand Down Expand Up @@ -474,6 +490,10 @@ func (c *StreamClient) tryReConnect() error {
return err
}

func (c *StreamClient) StopReadingToChannel() {
c.stopReadingToChannel.Store(true)
}

type FileEntryIterator interface {
NextFileEntry() (*types.FileEntry, error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func readFromClient(client *client.StreamClient, total int) ([]interface{}, erro

LOOP:
for {
entry := <-client.GetEntryChan()
entry := <-*client.GetEntryChan()

switch entry.(type) {
case types.FullL2Block:
Expand Down
4 changes: 4 additions & 0 deletions zk/erigon_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func NewErigonDb(tx kv.RwTx) *ErigonDb {
}
}

func (db *ErigonDb) SetNewTx(tx kv.RwTx) {
db.tx = tx
}

func (db ErigonDb) WriteHeader(
blockNo *big.Int,
blockHash common.Hash,
Expand Down
5 changes: 5 additions & 0 deletions zk/hermez_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func NewHermezDb(tx kv.RwTx) *HermezDb {
return db
}

func (db *HermezDb) SetNewTx(tx kv.RwTx) {
db.tx = tx
db.HermezDbReader.tx = tx
}

func CreateHermezBuckets(tx kv.RwTx) error {
for _, t := range HermezDbTables {
if err := tx.CreateBucket(t); err != nil {
Expand Down
Loading

0 comments on commit 7177a0b

Please sign in to comment.