Skip to content

Commit

Permalink
configurable wait time on the datastreamer client
Browse files Browse the repository at this point in the history
  • Loading branch information
hexoscott committed Mar 20, 2024
1 parent 98f5ab0 commit 05b0c9f
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 24 deletions.
3 changes: 3 additions & 0 deletions cmd/rpcdaemon/commands/zkevm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ func (api *ZkEvmAPIImpl) GetFullBlockByNumber(ctx context.Context, number rpc.Bl
if err != nil {
return types.Block{}, err
}
if baseBlock == nil {
return types.Block{}, errors.New("could not find block")
}

return api.populateBlockDetail(tx, ctx, baseBlock, fullTx)
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ var (
Usage: "L2 datastreamer endpoint",
Value: "",
}
L2DataStreamerTimeout = cli.StringFlag{
Name: "zkevm.l2-datastreamer-timeout",
Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)",
Value: "0s",
}
L1ChainIdFlag = cli.Uint64Flag{
Name: "zkevm.l1-chain-id",
Usage: "Ethereum L1 chain ID",
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ func initDataStreamClient(cfg *ethconfig.Zk) *client.StreamClient {
// Create client
log.Info("Starting datastream client...")
// retry connection
datastreamClient := client.NewClient(cfg.L2DataStreamerUrl, cfg.DatastreamVersion)
datastreamClient := client.NewClient(cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout)

for i := 0; i < 30; i++ {
// Start client (connect to the server)
Expand Down
6 changes: 5 additions & 1 deletion eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package ethconfig

import "github.com/ledgerwatch/erigon-lib/common"
import (
"github.com/ledgerwatch/erigon-lib/common"
"time"
)

type Zk struct {
L2ChainId uint64
L2RpcUrl string
L2DataStreamerUrl string
L2DataStreamerTimeout time.Duration
L1ChainId uint64
L1RpcUrl string
L1PolygonRollupManager common.Address
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ var DefaultFlags = []cli.Flag{
&utils.L2ChainIdFlag,
&utils.L2RpcUrlFlag,
&utils.L2DataStreamerUrlFlag,
&utils.L2DataStreamerTimeout,
&utils.L1ChainIdFlag,
&utils.L1RpcUrlFlag,
&utils.L1PolygonRollupManagerFlag,
Expand Down
9 changes: 9 additions & 0 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/zk/sequencer"
"github.com/urfave/cli/v2"
"time"
)

func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
Expand All @@ -26,10 +27,17 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
}
}

l2DataStreamTimeoutVal := ctx.String(utils.L2DataStreamerTimeout.Name)
l2DataStreamTimeout, err := time.ParseDuration(l2DataStreamTimeoutVal)
if err != nil {
panic(fmt.Sprintf("could not parse l2 datastreamer timeout value %s", l2DataStreamTimeoutVal))
}

cfg.Zk = &ethconfig.Zk{
L2ChainId: ctx.Uint64(utils.L2ChainIdFlag.Name),
L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name),
L2DataStreamerUrl: ctx.String(utils.L2DataStreamerUrlFlag.Name),
L2DataStreamerTimeout: l2DataStreamTimeout,
L1ChainId: ctx.Uint64(utils.L1ChainIdFlag.Name),
L1RpcUrl: ctx.String(utils.L1RpcUrlFlag.Name),
L1PolygonRollupManager: libcommon.HexToAddress(ctx.String(utils.L1PolygonRollupManagerFlag.Name)),
Expand All @@ -56,6 +64,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
if !sequencer.IsSequencer() {
checkFlag(utils.L2RpcUrlFlag.Name, cfg.Zk.L2RpcUrl)
checkFlag(utils.L2DataStreamerUrlFlag.Name, cfg.Zk.L2DataStreamerUrl)
checkFlag(utils.L2DataStreamerTimeout.Name, cfg.Zk.L2DataStreamerTimeout)
} else {
checkFlag(utils.SequencerInitialForkId.Name, cfg.Zk.SequencerInitialForkId)
checkFlag(utils.SequencerAddressFlag.Name, cfg.Zk.SequencerAddress)
Expand Down
28 changes: 16 additions & 12 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ type EntityDefinition struct {
}

type StreamClient struct {
server string // Server address to connect IP:port
version int
streamType StreamType
conn net.Conn
id string // Client id
Header types.HeaderEntry // Header info received (from Header command)
server string // Server address to connect IP:port
version int
streamType StreamType
conn net.Conn
id string // Client id
Header types.HeaderEntry // Header info received (from Header command)
checkTimeout time.Duration // time to wait for data before reporting an error

entriesDefinition map[types.EntryType]EntityDefinition

Expand Down Expand Up @@ -58,13 +59,14 @@ const (

// Creates a new client fo datastream
// server must be in format "url:port"
func NewClient(server string, version int) *StreamClient {
func NewClient(server string, version int, checkTimeout time.Duration) *StreamClient {
// Create the client data stream
c := &StreamClient{
server: server,
version: version,
streamType: StSequencer,
id: "",
checkTimeout: checkTimeout,
server: server,
version: version,
streamType: StSequencer,
id: "",
entriesDefinition: map[types.EntryType]EntityDefinition{
types.EntryTypeStartL2Block: {
Name: "StartL2Block",
Expand Down Expand Up @@ -283,7 +285,9 @@ func (c *StreamClient) afterStartCommand() error {
func (c *StreamClient) readAllFullL2BlocksToChannel() error {
var err error
for {
c.conn.SetReadDeadline(time.Now().Add(5 * time.Second))
if c.checkTimeout > 0 {
c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout))
}
fullBlock, gerUpdates, _, _, _, localErr := c.readFullBlock()
if localErr != nil {
err = localErr
Expand Down
10 changes: 5 additions & 5 deletions zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Test_readHeaderEntry(t *testing.T) {
}

for _, testCase := range testCases {
c := NewClient("", 0)
c := NewClient("", 0, 0)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
Expand Down Expand Up @@ -106,7 +106,7 @@ func Test_readResultEntry(t *testing.T) {
}

for _, testCase := range testCases {
c := NewClient("", 0)
c := NewClient("", 0, 0)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
Expand Down Expand Up @@ -175,7 +175,7 @@ func Test_readFileEntry(t *testing.T) {
},
}
for _, testCase := range testCases {
c := NewClient("", 0)
c := NewClient("", 0, 0)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
Expand Down Expand Up @@ -369,7 +369,7 @@ func Test_readFullL2Blocks(t *testing.T) {
},
}
for _, testCase := range testCases {
c := NewClient("", BigEndianVersion)
c := NewClient("", BigEndianVersion, 0)
c.Header.TotalEntries = 3
server, conn := net.Pipe()
defer server.Close()
Expand Down Expand Up @@ -525,7 +525,7 @@ func Test_readFullBlock(t *testing.T) {
},
}
for _, testCase := range testCases {
c := NewClient("", BigEndianVersion)
c := NewClient("", BigEndianVersion, 0)
c.Header.TotalEntries = 3
server, conn := net.Pipe()
defer server.Close()
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// Download a set amount of blocks from datastream server to channel
func DownloadL2Blocks(datastreamUrl string, fromBlock uint64, l2BlocksAmount int) (*[]types.FullL2Block, *[]types.GerUpdate, map[uint64][]byte, uint64, error) {
// Create client
c := client.NewClient(datastreamUrl, 0)
c := client.NewClient(datastreamUrl, 0, 0)

// Start client (connect to the server)
defer c.Stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func main() {
flag.StringVar(&stream2, "stream2", "", "the second stream to pull data from")
flag.Parse()

client1 := client.NewClient(stream1, 0)
client2 := client.NewClient(stream2, 0)
client1 := client.NewClient(stream1, 0, 0)
client2 := client.NewClient(stream2, 0, 0)

err := client1.Start()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const dataStreamUrl = "datastream.cardona.zkevm-rpc.com:6900"
// This code downloads headers and blocks from a datastream server.
func main() {
// Create client
c := client.NewClient(dataStreamUrl, 0)
c := client.NewClient(dataStreamUrl, 0, 0)

// Start client (connect to the server)
defer c.Stop()
Expand Down
1 change: 0 additions & 1 deletion zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ func SpawnSequencingStage(
Number: new(big.Int).SetUint64(nextBlockNum),
GasLimit: getGasLimit(uint16(forkId)),
Time: newBlockTimestamp,
BaseFee: big.NewInt(0),
}

stateReader := state.NewPlainStateReader(tx)
Expand Down

0 comments on commit 05b0c9f

Please sign in to comment.