Skip to content

Commit

Permalink
add im state root to data stream (#2893)
Browse files Browse the repository at this point in the history
* add im state root to data stream

* fix test

* remove log

* optimization

* fix config

* init map

* add wg

* add logs

* add logs

* add logs

* add logs

* add logs

* add logs

* fix
  • Loading branch information
ToniRamirezM authored Dec 11, 2023
1 parent dcedd80 commit e355a41
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 28 deletions.
13 changes: 13 additions & 0 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ func (d *dbManager) sendDataToStreamer() {
}

for _, l2Transaction := range l2Transactions {
// Populate intermediate state root
position := state.GetSystemSCPosition(blockStart.L2BlockNumber)
imStateRoot, err := d.GetStorageAt(context.Background(), common.HexToAddress(state.SystemSC), big.NewInt(0).SetBytes(position), l2Block.StateRoot)
if err != nil {
log.Errorf("failed to get storage at for l2block %v: %v", l2Block.L2BlockNumber, err)
}
l2Transaction.StateRoot = common.BigToHash(imStateRoot)

_, err = d.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode())
if err != nil {
log.Errorf("failed to add l2tx stream entry for l2block %v: %v", l2Block.L2BlockNumber, err)
Expand Down Expand Up @@ -726,3 +734,8 @@ func (d *dbManager) GetForcedBatch(ctx context.Context, forcedBatchNumber uint64
func (d *dbManager) GetForkIDByBatchNumber(batchNumber uint64) uint64 {
return d.state.GetForkIDByBatchNumber(batchNumber)
}

// GetStorageAt returns the storage at a given address and position
func (d *dbManager) GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error) {
return d.state.GetStorageAt(ctx, address, position, root)
}
2 changes: 2 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type stateInterface interface {
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*state.DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error)
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
}

type workerInterface interface {
Expand Down Expand Up @@ -137,4 +138,5 @@ type dbManagerInterface interface {
StoreProcessedTxAndDeleteFromPool(ctx context.Context, tx transactionToStore) error
GetForcedBatch(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) (*state.ForcedBatch, error)
GetForkIDByBatchNumber(batchNumber uint64) uint64
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
}
26 changes: 26 additions & 0 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *Sequencer) Start(ctx context.Context) {
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer) {
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true)
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true, nil)
if err != nil {
log.Fatalf("failed to generate data streamer file, err: %v", err)
}
Expand Down
56 changes: 47 additions & 9 deletions state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package state
import (
"context"
"encoding/binary"
"math/big"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/iden3/go-iden3-crypto/keccak256"
"github.com/jackc/pgx/v4"
)

Expand All @@ -25,6 +28,10 @@ const (
EntryTypeUpdateGER datastreamer.EntryType = 4
// BookMarkTypeL2Block represents a L2 block bookmark
BookMarkTypeL2Block byte = 0
// SystemSC is the system smart contract address
SystemSC = "0x000000000000000000000000000000005ca1ab1e"
// posConstant is the constant used to compute the position of the intermediate state root
posConstant = 1
)

// DSBatch represents a data stream batch
Expand Down Expand Up @@ -92,10 +99,11 @@ func (b DSL2BlockStart) Decode(data []byte) DSL2BlockStart {

// DSL2Transaction represents a data stream L2 transaction
type DSL2Transaction struct {
L2BlockNumber uint64 // Not included in the encoded data
EffectiveGasPricePercentage uint8 // 1 byte
IsValid uint8 // 1 byte
EncodedLength uint32 // 4 bytes
L2BlockNumber uint64 // Not included in the encoded data
EffectiveGasPricePercentage uint8 // 1 byte
IsValid uint8 // 1 byte
StateRoot common.Hash // 32 bytes
EncodedLength uint32 // 4 bytes
Encoded []byte
}

Expand All @@ -104,6 +112,7 @@ func (l DSL2Transaction) Encode() []byte {
bytes := make([]byte, 0)
bytes = append(bytes, byte(l.EffectiveGasPricePercentage))
bytes = append(bytes, byte(l.IsValid))
bytes = append(bytes, l.StateRoot[:]...)
bytes = binary.LittleEndian.AppendUint32(bytes, l.EncodedLength)
bytes = append(bytes, l.Encoded...)
return bytes
Expand All @@ -113,8 +122,9 @@ func (l DSL2Transaction) Encode() []byte {
func (l DSL2Transaction) Decode(data []byte) DSL2Transaction {
l.EffectiveGasPricePercentage = uint8(data[0])
l.IsValid = uint8(data[1])
l.EncodedLength = binary.LittleEndian.Uint32(data[2:6])
l.Encoded = data[6:]
l.StateRoot = common.BytesToHash(data[2:34])
l.EncodedLength = binary.LittleEndian.Uint32(data[34:38])
l.Encoded = data[38:]
return l
}

Expand Down Expand Up @@ -202,10 +212,12 @@ type DSState interface {
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error)
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error)
}

// GenerateDataStreamerFile generates or resumes a data stream file
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool) error {
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool, imStateRoots *map[uint64][]byte) error {
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
Expand Down Expand Up @@ -345,8 +357,6 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St

// Gererate full batches
fullBatches := computeFullBatches(batches, l2Blocks, l2Txs)
log.Debugf("Full batches: %+v", fullBatches)

currentBatchNumber += limit

for _, batch := range fullBatches {
Expand Down Expand Up @@ -418,6 +428,18 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
}

for _, tx := range l2block.Txs {
// Populate intermediate state root
if imStateRoots == nil || (*imStateRoots)[blockStart.L2BlockNumber] == nil {
position := GetSystemSCPosition(l2block.L2BlockNumber)
imStateRoot, err := stateDB.GetStorageAt(ctx, common.HexToAddress(SystemSC), big.NewInt(0).SetBytes(position), l2block.StateRoot)
if err != nil {
return err
}
tx.StateRoot = common.BigToHash(imStateRoot)
} else {
tx.StateRoot = common.BytesToHash((*imStateRoots)[blockStart.L2BlockNumber])
}

entry, err = streamServer.AddStreamEntry(EntryTypeL2Tx, tx.Encode())
if err != nil {
return err
Expand Down Expand Up @@ -447,6 +469,22 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
return err
}

// GetSystemSCPosition computes the position of the intermediate state root for the system smart contract
func GetSystemSCPosition(blockNumber uint64) []byte {
v1 := big.NewInt(0).SetUint64(blockNumber).Bytes()
v2 := big.NewInt(0).SetUint64(uint64(posConstant)).Bytes()

// Add 0s to make v1 and v2 32 bytes long
for len(v1) < 32 {
v1 = append([]byte{0}, v1...)
}
for len(v2) < 32 {
v2 = append([]byte{0}, v2...)
}

return keccak256.Hash(v1, v2)
}

// computeFullBatches computes the full batches
func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2Transaction) []*DSFullBatch {
currentL2Block := 0
Expand Down
25 changes: 20 additions & 5 deletions state/test/datastream_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package test

import (
"fmt"
"testing"
"time"

"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -28,14 +30,15 @@ func TestL2BlockStartEncode(t *testing.T) {

func TestL2TransactionEncode(t *testing.T) {
l2Transaction := state.DSL2Transaction{
EffectiveGasPricePercentage: 128, // 1 byte
IsValid: 1, // 1 byte
EncodedLength: 5, // 4 bytes
Encoded: []byte{1, 2, 3, 4, 5}, // 5 bytes
EffectiveGasPricePercentage: 128, // 1 byte
IsValid: 1, // 1 byte
StateRoot: common.HexToHash("0x010203"), // 32 bytes
EncodedLength: 5, // 4 bytes
Encoded: []byte{1, 2, 3, 4, 5}, // 5 bytes
}

encoded := l2Transaction.Encode()
expected := []byte{128, 1, 5, 0, 0, 0, 1, 2, 3, 4, 5}
expected := []byte{128, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 5, 0, 0, 0, 1, 2, 3, 4, 5}
assert.Equal(t, expected, encoded)
}

Expand All @@ -53,3 +56,15 @@ func TestL2BlockEndEncode(t *testing.T) {

assert.Equal(t, expected, encoded)
}

func TestCalculateSCPosition(t *testing.T) {
a := time.Now()
blockNumber := uint64(2934867)
expected := common.HexToHash("0xaa93c484856be45716623765b429a967296594ca362e61e91d671fb422e0f744")
position := state.GetSystemSCPosition(blockNumber)
assert.Equal(t, expected, common.BytesToHash(position))
b := time.Now()

c := b.Sub(a)
fmt.Println(c)
}
21 changes: 13 additions & 8 deletions tools/datastreamer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/merkletree"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
Expand All @@ -28,15 +27,21 @@ type OnlineConfig struct {
StreamType datastreamer.StreamType `mapstructure:"StreamType"`
}

// MTConfig is the configuration for the merkle tree
type MTConfig struct {
URI string `mapstructure:"URI"`
MaxThreads int `mapstructure:"MaxThreads"`
}

// Config is the configuration for the tool
type Config struct {
ChainID uint64 `mapstructure:"ChainID"`
Online OnlineConfig `mapstructure:"Online"`
Offline datastreamer.Config `mapstructure:"Offline"`
StateDB db.Config `mapstructure:"StateDB"`
Executor executor.Config `mapstructure:"Executor"`
MerkeTree merkletree.Config `mapstructure:"MerkeTree"`
Log log.Config `mapstructure:"Log"`
ChainID uint64 `mapstructure:"ChainID"`
Online OnlineConfig `mapstructure:"Online"`
Offline datastreamer.Config `mapstructure:"Offline"`
StateDB db.Config `mapstructure:"StateDB"`
Executor executor.Config `mapstructure:"Executor"`
MerkleTree MTConfig `mapstructure:"MerkleTree"`
Log log.Config `mapstructure:"Log"`
}

// Default parses the default configuration values.
Expand Down
3 changes: 2 additions & 1 deletion tools/datastreamer/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ MaxConns = 200
URI = "zkevm-prover:50071"
MaxGRPCMessageSize = 100000000
[MerkeTree]
[MerkleTree]
URI = "zkevm-prover:50061"
MaxThreads = 20
[Log]
Environment = "development" # "production" or "development"
Expand Down
3 changes: 2 additions & 1 deletion tools/datastreamer/config/tool.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ MaxConns = 200
URI = "zkevm-prover:50071"
MaxGRPCMessageSize = 100000000

[MerkeTree]
[MerkleTree]
URI = "zkevm-prover:50061"
MaxThreads = 20

[Log]
Environment = "development"
Expand Down
Loading

0 comments on commit e355a41

Please sign in to comment.