Skip to content

Commit

Permalink
fix: download entries till reaching the amount in header
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 10, 2024
1 parent e52fb2a commit eb6be5f
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 95 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ var (
L2DataStreamerTimeout = cli.StringFlag{
Name: "zkevm.l2-datastreamer-timeout",
Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)",
Value: "0s",
Value: "1s",
}
L1SyncStartBlock = cli.Uint64Flag{
Name: "zkevm.l1-sync-start-block",
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ replace github.com/ledgerwatch/erigon-lib => ./erigon-lib

require (
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.5
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6
github.com/99designs/gqlgen v0.17.40
github.com/Giulio2002/bls v0.0.0-20240315151443-652e18a3d188
github.com/Masterminds/sprig/v3 v3.2.3
Expand Down Expand Up @@ -174,6 +174,7 @@ require (
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c // indirect
github.com/go-delve/delve v1.21.2 // indirect
github.com/go-llsqlite/adapter v0.0.0-20230927005056-7f5ce7f0c916 // indirect
github.com/go-llsqlite/crawshaw v0.4.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s=
github.com/go-chi/chi/v5 v5.0.12/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-delve/delve v1.21.2 h1:eaS+ziJo+660mi3D2q/VP8RxW5GcF4Y1zyKSi82alsU=
github.com/go-delve/delve v1.21.2/go.mod h1:FgTAiRUe43RS5EexL06RPyMtP8AMZVL/t9Qqgy3qUe4=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down
78 changes: 57 additions & 21 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package client

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"reflect"
"sync/atomic"
Expand Down Expand Up @@ -44,6 +46,8 @@ type StreamClient struct {
id string // Client id
checkTimeout time.Duration // time to wait for data before reporting an error

header *types.HeaderEntry

// atomic
lastWrittenTime atomic.Int64
streaming atomic.Bool
Expand Down Expand Up @@ -94,11 +98,15 @@ func (c *StreamClient) GetEntryChan() *chan interface{} {
return &c.entryChan
}

func (c *StreamClient) GetEntryNumberLimit() uint64 {
return c.header.TotalEntries
}

// GetL2BlockByNumber queries the data stream by sending the L2 block start bookmark for the certain block number
// and streams the changes for that block (including the transactions).
// Note that this function is intended for on demand querying and it disposes the connection after it ends.
func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, int, error) {
if _, err := c.EnsureConnected(); err != nil {
if err := c.EnsureConnected(); err != nil {
return nil, -1, err
}

Expand Down Expand Up @@ -134,7 +142,7 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block,
default:
}

parsedEntry, err := ReadParsedProto(c)
parsedEntry, _, err := ReadParsedProto(c)
if err != nil {
return nil, -1, err
}
Expand All @@ -156,13 +164,13 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block,
// it retrieves the latest File entry that is of EntryTypeL2Block type.
// Note that this function is intended for on demand querying and it disposes the connection after it ends.
func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error) {
if _, err := c.EnsureConnected(); err != nil {
return nil, err
if err := c.EnsureConnected(); err != nil {
return nil, fmt.Errorf("failed to ensure connect: %w", err)
}

h, err := c.GetHeader()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get header: %w", err)
}

latestEntryNum := h.TotalEntries - 1
Expand All @@ -187,7 +195,7 @@ func (c *StreamClient) GetLatestL2Block() (l2Block *types.FullL2Block, err error
}

if latestEntryNum == 0 {
return nil, errors.New("failed to retrieve the latest block from the data stream")
return nil, errors.New("no block found")
}

return l2Block, nil
Expand Down Expand Up @@ -262,6 +270,8 @@ func (c *StreamClient) GetHeader() (*types.HeaderEntry, error) {
return nil, fmt.Errorf("%s read header entry error: %v", c.id, err)
}

c.header = h

return h, nil
}

Expand Down Expand Up @@ -341,16 +351,32 @@ func (c *StreamClient) RenewEntryChannel() {
c.entryChan = make(chan interface{}, entryChannelSize)
}

func (c *StreamClient) EnsureConnected() (bool, error) {
func (c *StreamClient) connClosed() bool {
if c.conn == nil {
return true
}

c.conn.SetReadDeadline(time.Now())
one := new(bytes.Buffer)
if _, err := io.CopyN(one, c.conn, 1); err == io.ErrClosedPipe {
c.conn = nil
return true
}

c.conn.SetReadDeadline(time.Now().Add(1 * c.checkTimeout))
return false
}

func (c *StreamClient) EnsureConnected() error {
if c.connClosed() {
if err := c.tryReConnect(); err != nil {
return false, fmt.Errorf("failed to reconnect the datastream client: %w", err)
return fmt.Errorf("failed to reconnect the datastream client: %w", err)
}

c.RenewEntryChannel()
}

return true, nil
return nil
}

// reads entries to the end of the stream
Expand All @@ -365,7 +391,7 @@ func (c *StreamClient) ReadAllEntriesToChannel() error {
if progress == 0 {
bookmark = types.NewBookmarkProto(0, datastream.BookmarkType_BOOKMARK_TYPE_BATCH)
} else {
bookmark = types.NewBookmarkProto(progress, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK)
bookmark = types.NewBookmarkProto(progress+1, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK)
}

protoBookmark, err := bookmark.Marshal()
Expand Down Expand Up @@ -424,10 +450,9 @@ func (c *StreamClient) afterStartCommand() (*types.ResultEntry, error) {

// reads all entries from the server and sends them to a channel
// sends the parsed FullL2Blocks with transactions to a channel
func (c *StreamClient) readAllFullL2BlocksToChannel() error {
var err error

func (c *StreamClient) readAllFullL2BlocksToChannel() (err error) {
readNewProto := true
entryNum := uint64(0)
parsedProto := interface{}(nil)
LOOP:
for {
Expand All @@ -447,8 +472,8 @@ LOOP:
}

if readNewProto {
if parsedProto, err = ReadParsedProto(c); err != nil {
break
if parsedProto, entryNum, err = ReadParsedProto(c); err != nil {
return err
}
readNewProto = false
}
Expand All @@ -466,18 +491,22 @@ LOOP:
parsedProto.ForkId = c.currentFork
log.Trace("writing block to channel", "blockNumber", parsedProto.L2BlockNumber, "batchNumber", parsedProto.BatchNumber)
default:
err = fmt.Errorf("unexpected entry type: %v", parsedProto)
break LOOP
return fmt.Errorf("unexpected entry type: %v", parsedProto)
}
select {
case c.entryChan <- parsedProto:
readNewProto = true
default:
time.Sleep(1 * time.Millisecond)
time.Sleep(10 * time.Microsecond)
}

if c.header.TotalEntries == entryNum+1 {
log.Trace("reached the end of the stream", "header_totalEntries", c.header.TotalEntries, "entryNum", entryNum)
break LOOP
}
}

return err
return nil
}

func (c *StreamClient) tryReConnect() error {
Expand Down Expand Up @@ -507,10 +536,12 @@ func (c *StreamClient) StopReadingToChannel() {

type FileEntryIterator interface {
NextFileEntry() (*types.FileEntry, error)
GetEntryNumberLimit() uint64
}

func ReadParsedProto(iterator FileEntryIterator) (
parsedEntry interface{},
entryNum uint64,
err error,
) {
file, err := iterator.NextFileEntry()
Expand All @@ -520,8 +551,9 @@ func ReadParsedProto(iterator FileEntryIterator) (
}

if file == nil {
return nil, nil
return
}
entryNum = file.EntryNum

switch file.EntryType {
case types.BookmarkEntryType:
Expand All @@ -547,7 +579,7 @@ func ReadParsedProto(iterator FileEntryIterator) (
if innerFile, err = iterator.NextFileEntry(); err != nil {
return
}

entryNum = innerFile.EntryNum
if innerFile.IsL2Tx() {
if l2Tx, err = types.UnmarshalTx(innerFile.Data); err != nil {
return
Expand Down Expand Up @@ -583,6 +615,9 @@ func ReadParsedProto(iterator FileEntryIterator) (
err = fmt.Errorf("unexpected entry type inside a block: %d", innerFile.EntryType)
return
}
if entryNum == iterator.GetEntryNumberLimit() {
break LOOP
}
}

l2Block.L2Txs = txs
Expand All @@ -596,6 +631,7 @@ func ReadParsedProto(iterator FileEntryIterator) (
default:
err = fmt.Errorf("unexpected entry type: %d", file.EntryType)
}

return
}

Expand Down
3 changes: 2 additions & 1 deletion zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,15 @@ func TestStreamClientReadParsedProto(t *testing.T) {
close(errCh)
}()

parsedEntry, err := ReadParsedProto(c)
parsedEntry, entryNum, err := ReadParsedProto(c)
require.NoError(t, err)
serverErr := <-errCh
require.NoError(t, serverErr)
expectedL2Tx := types.ConvertToL2TransactionProto(l2Tx)
expectedL2Block := types.ConvertToFullL2Block(l2Block)
expectedL2Block.L2Txs = append(expectedL2Block.L2Txs, *expectedL2Tx)
require.Equal(t, expectedL2Block, parsedEntry)
require.Equal(t, uint64(3), entryNum)
}

func TestStreamClientGetLatestL2Block(t *testing.T) {
Expand Down
9 changes: 3 additions & 6 deletions zk/datastream/client/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ func writeFullUint64ToConn(conn net.Conn, value uint64) error {
return errors.New("error nil connection")
}

_, err := conn.Write(buffer)
if err != nil {
if _, err := conn.Write(buffer); err != nil {
return fmt.Errorf("%s Error sending to server: %v", conn.RemoteAddr().String(), err)
}

Expand All @@ -31,8 +30,7 @@ func writeBytesToConn(conn net.Conn, value []byte) error {
return errors.New("error nil connection")
}

_, err := conn.Write(value)
if err != nil {
if _, err := conn.Write(value); err != nil {
return fmt.Errorf("%s Error sending to server: %v", conn.RemoteAddr().String(), err)
}

Expand All @@ -48,8 +46,7 @@ func writeFullUint32ToConn(conn net.Conn, value uint32) error {
return errors.New("error nil connection")
}

_, err := conn.Write(buffer)
if err != nil {
if _, err := conn.Write(buffer); err != nil {
return fmt.Errorf("%s Error sending to server: %v", conn.RemoteAddr().String(), err)
}

Expand Down
6 changes: 5 additions & 1 deletion zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ func newDataStreamServerIterator(stream *datastreamer.StreamServer, start uint64
}
}

func (it *dataStreamServerIterator) GetEntryNumberLimit() uint64 {
return it.header + 1
}

func (it *dataStreamServerIterator) NextFileEntry() (entry *types.FileEntry, err error) {
if it.curEntryNum > it.header {
return nil, nil
Expand Down Expand Up @@ -659,7 +663,7 @@ func ReadBatches(iterator client.FileEntryIterator, start uint64, end uint64) ([

LOOP_ENTRIES:
for {
parsedProto, err := client.ReadParsedProto(iterator)
parsedProto, _, err := client.ReadParsedProto(iterator)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit eb6be5f

Please sign in to comment.