Skip to content

Commit

Permalink
fix(stage_batches): rpc resequence stop stage on unwind (#1297)
Browse files Browse the repository at this point in the history
* fix(stage_batches): rpc resequence stop stage on unwind

* fix: tests

* fix: datastream channel write blocking

* fix: datastream blocking test

* fix: add wait on the datastream connect loop

* fix: merge problems

* fix: blockhash comparison in stage batches processor

* fix: download entries till reaching the amount in header

* fix: add go sum package

* feat: internal reconnect on each method in datastream client

* fix: do not disconnect on stage batches end

* feat: add ctx close in datastream reconnections

* fix: send stop command after normal stop of reading

* feat: retry a fixed number of times in stage batches

* fix: return error on ctx done

* fix: reverse datastream server version

* feat: print ds found block

* feat: added more logs in stage batches

* fix: check for sync limit in stage batches

* fix: sync limit in stage  batches

* refactor: make unwind test errors a bit more readable

* refactor: make unwind tests errors more readable

* refactor(ds_client): wrap connection read and write to set timeout

* fix: add timeout to test clients

* fix: stage batches limit

* feat: up datastream server version

* fix: up datastream server version

* fix: go sum

* fix: add error handling for set timeouts in datastream client

* fix: handle zero checkTimeout value

* fix: remove flag setting for datastream timeout

* fix: ci config

* fix: resequence test timeout

* fix: remove timeout from pre-london ci config

* refactor: error handling

* fix: stop stage on unwind

* fix: missing id in client

* fix: tests

* fix: tests

* fix: finish processing blocks on last entry reached

* feat: send stop command at start of new cycle to not get timedout by server

* fix: remove accidental commit folder

* fix: remove unneeded commit

* fix: tests

* fix: remove unneeded return

* fix: get correct parent block hash

* fix: read correct blockhash

* fix: unwind on ds block unwind

* refactor: error handling in datastream and stage batches

* fix: remove unneeded sleep

* fix: add a small sleep interval in the entry loop

* fix: stop streaming on querying new stuff from ds client

* fix: buffer clear before new reads

* fix: sleep more in resequence test

* fix: cast call

* fix: remove wrong flag on cast

* fix: cast json flags in test

* feat: added wait time for block to be available on sync node

* fix: resequence block check test

* Fix 'client already started' error on finding common ancestor

* Add timeout

---------

Co-authored-by: Scott Fairclough <70711990+hexoscott@users.noreply.github.com>
Co-authored-by: Jerry <jerrycgh@gmail.com>
  • Loading branch information
3 people authored Nov 7, 2024
1 parent 050949a commit 6e8775f
Show file tree
Hide file tree
Showing 19 changed files with 815 additions and 554 deletions.
37 changes: 37 additions & 0 deletions .github/scripts/test_resequence.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ wait_for_l1_batch() {
fi

if [ "$batch_type" = "virtual" ]; then

current_batch=$(cast logs --rpc-url "$(kurtosis port print cdk-v1 el-1-geth-lighthouse rpc)" --address 0x1Fe038B54aeBf558638CA51C91bC8cCa06609e91 --from-block 0 --json | jq -r '.[] | select(.topics[0] == "0x3e54d0825ed78523037d00a81759237eb436ce774bd546993ee67a1b67b6e766") | .topics[1]' | tail -n 1 | sed 's/^0x//')
current_batch=$((16#$current_batch))
elif [ "$batch_type" = "verified" ]; then
Expand All @@ -70,6 +71,33 @@ wait_for_l1_batch() {
done
}


# wait_for_l2_block_number BLOCK_NUMBER NODE_URL
#
# Polls the node at NODE_URL until its latest block number is at least
# BLOCK_NUMBER.
#
# Arguments:
#   $1 - block number that must be available on the node
#   $2 - RPC URL of the node to query
# Returns:
#   0 once the node reports a latest block >= BLOCK_NUMBER
#   1 if the block number cannot be fetched/parsed, or if the block is still
#     missing after the initial query plus 5 retries (60 seconds apart)
wait_for_l2_block_number() {
    local block_number=$1
    local node_url=$2
    local latest_block=0
    local tries=0
    local block_output

    while [ "$latest_block" -lt "$block_number" ]; do
        # Run cast on its own so its exit status is what gets checked; inside
        # a pipeline `$?` would only reflect the last stage (awk) and a failed
        # RPC call would go unnoticed.
        if ! block_output=$(cast block latest --rpc-url "$node_url"); then
            echo "Error: Failed to get latest block number" >&2
            return 1
        fi

        latest_block=$(printf '%s\n' "$block_output" | grep "number" | awk '{print $2}')

        # Guard against empty or non-numeric output: it would make the numeric
        # comparisons below error out and end the loop as if it had succeeded.
        if ! [[ "$latest_block" =~ ^[0-9]+$ ]]; then
            echo "Error: Failed to get latest block number" >&2
            return 1
        fi

        # Stop as soon as the block is available instead of sleeping first or
        # failing on the last attempt even though the block was found.
        if [ "$latest_block" -ge "$block_number" ]; then
            break
        fi

        # Give up after more than 5 attempts.
        if [ "$tries" -ge 5 ]; then
            echo "Error: Failed to get block number $block_number" >&2
            return 1
        fi
        tries=$((tries + 1))

        echo "Current block number on $node_url: $latest_block, needed: $block_number. Waiting to try again."
        sleep 60
    done

    return 0
}

stop_cdk_erigon_sequencer() {
echo "Stopping cdk-erigon"
kurtosis service exec cdk-v1 cdk-erigon-sequencer-001 "pkill -SIGTRAP proc-runner.sh" || true
Expand Down Expand Up @@ -139,9 +167,18 @@ echo "Calculating comparison block number"
comparison_block=$((latest_block - 10))
echo "Block number to compare (10 blocks behind): $comparison_block"

echo "Waiting some time for the syncer to catch up"
sleep 30

echo "Getting block hash from sequencer"
sequencer_hash=$(cast block $comparison_block --rpc-url "$(kurtosis port print cdk-v1 cdk-erigon-sequencer-001 rpc)" | grep "hash" | awk '{print $2}')

# Wait for the comparison block to be available on the sync node before
# reading its hash; otherwise the hash lookup below can race the syncer.
if ! wait_for_l2_block_number "$comparison_block" "$(kurtosis port print cdk-v1 cdk-erigon-node-001 rpc)"; then
    echo "Failed to wait for block $comparison_block on the sync node"
    exit 1
fi

echo "Getting block hash from node"
node_hash=$(cast block $comparison_block --rpc-url "$(kurtosis port print cdk-v1 cdk-erigon-node-001 rpc)" | grep "hash" | awk '{print $2}')

Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/ci_zkevm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ jobs:
sed -i '/zkevm.sequencer-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.sequencer-non-empty-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/sentry.drop-useless-peers:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.l2-datastreamer-timeout:/d' templates/cdk-erigon/config.yml
- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
run: |
/usr/local/bin/yq -i '.args.data_availability_mode = "${{ matrix.da-mode }}"' params.yml
/usr/local/bin/yq -i '.args.cdk_erigon_node_image = "cdk-erigon:local"' params.yml
- name: Deploy Kurtosis CDK package
working-directory: ./kurtosis-cdk
Expand Down Expand Up @@ -224,6 +230,8 @@ jobs:
sed -i '/sentry.drop-useless-peers:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm\.pool-manager-url/d' ./templates/cdk-erigon/config.yml
sed -i '$a\zkevm.disable-virtual-counters: true' ./templates/cdk-erigon/config.yml
sed -i '/zkevm.l2-datastreamer-timeout:/d' templates/cdk-erigon/config.yml
- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-resequence.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
sed -i '/zkevm.sequencer-non-empty-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/sentry.drop-useless-peers:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.pool-manager-url/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.l2-datastreamer-timeout:/d' templates/cdk-erigon/config.yml
- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
run: |
Expand Down
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ var (
L2DataStreamerTimeout = cli.StringFlag{
Name: "zkevm.l2-datastreamer-timeout",
Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)",
Value: "0s",
Value: "3s",
}
L1SyncStartBlock = cli.Uint64Flag{
Name: "zkevm.l1-sync-start-block",
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ replace github.com/ledgerwatch/erigon-lib => ./erigon-lib

require (
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.5
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7
github.com/99designs/gqlgen v0.17.40
github.com/Giulio2002/bls v0.0.0-20240315151443-652e18a3d188
github.com/Masterminds/sprig/v3 v3.2.3
Expand Down Expand Up @@ -62,7 +62,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/hashicorp/golang-lru/arc/v2 v2.0.6
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.2.4
github.com/holiman/uint256 v1.3.1
github.com/huandu/xstrings v1.4.0
github.com/huin/goupnp v1.2.0
github.com/iden3/go-iden3-crypto v0.0.15
Expand Down Expand Up @@ -110,11 +110,11 @@ require (
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
golang.org/x/net v0.24.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.19.0
golang.org/x/sys v0.20.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.63.2
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0
google.golang.org/protobuf v1.33.0
google.golang.org/protobuf v1.34.2
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -174,6 +174,7 @@ require (
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c // indirect
github.com/go-delve/delve v1.21.2 // indirect
github.com/go-llsqlite/adapter v0.0.0-20230927005056-7f5ce7f0c916 // indirect
github.com/go-llsqlite/crawshaw v0.4.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c/go.mod h1:WvSX4JsCRB
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.5 h1:p0epAhai44c34G+nzX0CZ67q3vkJtOXlO07lbhAEe9g=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.5/go.mod h1:RC6ouyNsUtJrv5aGPcM6Dm5xhXN209tRSzcsJsaOtZI=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6 h1:BSO1uu6dmLQ5kKb3uyDvsUxbnIoyumKvlwr0OtpTYMo=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6/go.mod h1:RC6ouyNsUtJrv5aGPcM6Dm5xhXN209tRSzcsJsaOtZI=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7 h1:73sYxRQ9cOmtYBEyHePgEwrVULR+YruSQxVXCt/SmzU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/99designs/gqlgen v0.17.40 h1:/l8JcEVQ93wqIfmH9VS1jsAkwm6eAF1NwQn3N+SDqBY=
github.com/99designs/gqlgen v0.17.40/go.mod h1:b62q1USk82GYIVjC60h02YguAZLqYZtvWml8KkhJps4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down Expand Up @@ -327,6 +331,8 @@ github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s=
github.com/go-chi/chi/v5 v5.0.12/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-delve/delve v1.21.2 h1:eaS+ziJo+660mi3D2q/VP8RxW5GcF4Y1zyKSi82alsU=
github.com/go-delve/delve v1.21.2/go.mod h1:FgTAiRUe43RS5EexL06RPyMtP8AMZVL/t9Qqgy3qUe4=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down Expand Up @@ -490,6 +496,8 @@ github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSo
github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs=
github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.0.0/go.mod h1:4qWG/gcEcfX4z/mBDHJ++3ReCw9ibxbsNJbcucJdbSo=
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
Expand Down Expand Up @@ -1333,6 +1341,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down Expand Up @@ -1547,6 +1557,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
Expand Down
52 changes: 18 additions & 34 deletions zk/datastream/client/commands.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
package client

import "fmt"

const (
// Commands
CmdUnknown Command = 0
CmdStart Command = 1
CmdStop Command = 2
CmdHeader Command = 3
CmdStartBookmark Command = 4 // CmdStartBookmark for the start from bookmark TCP client command
CmdEntry Command = 5 // CmdEntry for the get entry TCP client command
CmdBookmark Command = 6 // CmdBookmark for the get bookmark TCP client command
CmdUnknown Command = iota
CmdStart
CmdStop
CmdHeader
CmdStartBookmark // CmdStartBookmark for the start from bookmark TCP client command
CmdEntry // CmdEntry for the get entry TCP client command
CmdBookmark // CmdBookmark for the get bookmark TCP client command
)

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendHeaderCmd() error {
err := c.sendCommand(CmdHeader)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
return c.sendCommand(CmdHeader)
}

// sendBookmarkCmd sends either CmdStartBookmark or CmdBookmark for the provided bookmark value.
Expand All @@ -38,24 +31,23 @@ func (c *StreamClient) sendBookmarkCmd(bookmark []byte, streaming bool) error {
}

// Send bookmark length
if err := writeFullUint32ToConn(c.conn, uint32(len(bookmark))); err != nil {
if err := c.writeToConn(uint32(len(bookmark))); err != nil {
return err
}

// Send the bookmark to retrieve
return writeBytesToConn(c.conn, bookmark)
return c.writeToConn(bookmark)
}

// sendStartCmd sends a start command to the server, indicating
// that the client wishes to start streaming from the given entry number.
func (c *StreamClient) sendStartCmd(from uint64) error {
err := c.sendCommand(CmdStart)
if err != nil {
if err := c.sendCommand(CmdStart); err != nil {
return err
}

// Send starting/from entry number
return writeFullUint64ToConn(c.conn, from)
return c.writeToConn(from)
}

// sendEntryCmd sends the get data stream entry by number command to a TCP connection
Expand All @@ -66,29 +58,21 @@ func (c *StreamClient) sendEntryCmd(entryNum uint64) error {
}

// Send entry number
return writeFullUint64ToConn(c.conn, entryNum)
return c.writeToConn(entryNum)
}

// sendStopCmd sends the stop command to the server.
func (c *StreamClient) sendStopCmd() error {
err := c.sendCommand(CmdStop)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
return c.sendCommand(CmdStop)
}

func (c *StreamClient) sendCommand(cmd Command) error {

// Send command
if err := writeFullUint64ToConn(c.conn, uint64(cmd)); err != nil {
return fmt.Errorf("%s %v", c.id, err)
if err := c.writeToConn(uint64(cmd)); err != nil {
return err
}

// Send stream type
if err := writeFullUint64ToConn(c.conn, uint64(c.streamType)); err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
return c.writeToConn(uint64(c.streamType))
}
Loading

0 comments on commit 6e8775f

Please sign in to comment.