Modular synchronizer #2065
Regarding how to perform the insert atomically, this can be achieved using at least two techniques in Postgres:
Solution using isolation
PROS:
CONS:
Solution using explicit locking
As said in the doc:
PROS:
CONS:
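A minimal sketch (not the real implementation) contrasting both techniques with pgx, assuming the state.batch table used by the test below; the column list and function names are simplified for illustration:

// Sketch only: contrasts the two techniques discussed above.
package atomicinsert

import (
    "context"
    "errors"

    "github.com/jackc/pgconn"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

// Technique 1: SERIALIZABLE isolation. If two synchronizers touch the same rows
// concurrently, the second commit fails with SQLSTATE 40001 and the caller is
// expected to retry the whole transaction.
func insertWithIsolation(ctx context.Context, pool *pgxpool.Pool, batchNum uint64) error {
    dbTx, err := pool.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable})
    if err != nil {
        return err
    }
    if _, err := dbTx.Exec(ctx, "INSERT INTO state.batch (batch_num) VALUES ($1)", batchNum); err != nil {
        _ = dbTx.Rollback(ctx)
        return err
    }
    err = dbTx.Commit(ctx)
    var pgErr *pgconn.PgError
    if errors.As(err, &pgErr) && pgErr.Code == "40001" {
        // Serialization failure: the other synchronizer won; the caller retries from scratch.
        return err
    }
    return err
}

// Technique 2: explicit locking. SELECT ... FOR UPDATE on the latest batch row
// blocks the second transaction until the first one commits or rolls back, so
// the check below always sees the committed tip of the table.
func insertWithLock(ctx context.Context, pool *pgxpool.Pool, batchNum uint64) error {
    dbTx, err := pool.Begin(ctx)
    if err != nil {
        return err
    }
    var lastBatchNum uint64
    err = dbTx.QueryRow(ctx, "SELECT batch_num FROM state.batch ORDER BY batch_num DESC LIMIT 1 FOR UPDATE").Scan(&lastBatchNum)
    if err != nil {
        _ = dbTx.Rollback(ctx)
        return err
    }
    if lastBatchNum != batchNum-1 {
        // Someone else changed the tip while we were waiting for the lock: give up and retry.
        _ = dbTx.Rollback(ctx)
        return errors.New("state changed while waiting for the lock, retry")
    }
    if _, err := dbTx.Exec(ctx, "INSERT INTO state.batch (batch_num) VALUES ($1)", batchNum); err != nil {
        _ = dbTx.Rollback(ctx)
        return err
    }
    return dbTx.Commit(ctx)
}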
Example
Given the following test cases:
Case 1: Trusted sync init tx
Expected:
Case 2: Trusted sync init tx
Expected:
package state
import (
"context"
"errors"
"math/big"
"sync"
"testing"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/test/dbutils"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/stretchr/testify/require"
)
func TestConcurrentSync(t *testing.T) {
// Setup
stateDBCfg := dbutils.NewStateConfigFromEnv()
if err := dbutils.InitOrResetState(stateDBCfg); err != nil {
panic(err)
}
stateDB, err := db.NewSQLDB(stateDBCfg)
require.NoError(t, err)
state := NewPostgresStorage(stateDB)
defer stateDB.Close()
ctx := context.Background()
setupDBTx, err := stateDB.Begin(ctx)
require.NoError(t, err)
// Insert N trusted batches
nTrusteBatches := 5
for i := 1; i <= nTrusteBatches; i++ {
require.NoError(t, state.openBatch(ctx, ProcessingContext{BatchNumber: uint64(i)}, setupDBTx))
require.NoError(t, state.closeBatch(ctx, ProcessingReceipt{
BatchNumber: uint64(i),
StateRoot: common.BigToHash(big.NewInt(int64(i))),
}, setupDBTx))
}
require.NoError(t, setupDBTx.Commit(ctx))
// Using isolation
// Case 1
trustedDBTx, l1DBTx := doDBInteractionsUsingIsolation(t, nTrusteBatches, state, stateDB)
expectedLastBatch := uint64(nTrusteBatches - 1)
// L1 sync commits first
assertsIsolation(t, expectedLastBatch, l1DBTx, trustedDBTx, state, stateDB)
nTrusteBatches = int(expectedLastBatch)
// Case 2
trustedDBTx, l1DBTx = doDBInteractionsUsingIsolation(t, nTrusteBatches, state, stateDB)
expectedLastBatch = uint64(nTrusteBatches + 1)
// Trusted sync commits first
assertsIsolation(t, expectedLastBatch, trustedDBTx, l1DBTx, state, stateDB)
nTrusteBatches = int(expectedLastBatch)
// Using explicit locking
// Case 1
var wg sync.WaitGroup
trustedDBTx, l1DBTx = doDBInteractionsUsingLocks(t, nTrusteBatches, true, state, stateDB, &wg)
expectedLastBatch = uint64(nTrusteBatches - 1)
// L1 sync commits first
assertsLocks(t, expectedLastBatch, l1DBTx, trustedDBTx, state, stateDB, &wg)
nTrusteBatches = int(expectedLastBatch)
// Case 2
trustedDBTx, l1DBTx = doDBInteractionsUsingLocks(t, nTrusteBatches, false, state, stateDB, &wg)
expectedLastBatch = uint64(nTrusteBatches - 1)
// Trusted sync commits first
assertsLocks(t, expectedLastBatch, trustedDBTx, l1DBTx, state, stateDB, &wg)
}
func doDBInteractionsUsingIsolation(t *testing.T, nTrusteBatches int, state *PostgresStorage, stateDB *pgxpool.Pool) (trustedDBTx, l1DBTx pgx.Tx) {
ctx := context.Background()
var err error
trustedDBTx, err = stateDB.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable})
require.NoError(t, err)
l1DBTx, err = stateDB.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable})
require.NoError(t, err)
// Trusted sync interactions
var actualRoot string
err = trustedDBTx.QueryRow(ctx, "SELECT state_root FROM state.batch ORDER BY batch_num DESC LIMIT 1").Scan(&actualRoot)
require.NoError(t, err)
require.Equal(t, common.BigToHash(big.NewInt(int64(nTrusteBatches))).Hex(), actualRoot)
require.NoError(t, state.openBatch(ctx, ProcessingContext{BatchNumber: uint64(nTrusteBatches + 1)}, trustedDBTx))
require.NoError(t, state.closeBatch(ctx, ProcessingReceipt{BatchNumber: uint64(nTrusteBatches + 1)}, trustedDBTx))
// L1 sync interactions
const resetSQL = "DELETE FROM state.batch WHERE batch_num > $1"
_, err = l1DBTx.Exec(ctx, resetSQL, nTrusteBatches-1)
require.NoError(t, err)
return
}
func assertsIsolation(t *testing.T, expectedLastBatchNum uint64, firstCommiter, secondCommiter pgx.Tx, state *PostgresStorage, stateDB *pgxpool.Pool) {
ctx := context.Background()
require.NoError(t, firstCommiter.Commit(ctx))
// https://github.com/jackc/pgx/wiki/Error-Handling
err := secondCommiter.Commit(ctx)
require.NotNil(t, err)
var pgErr *pgconn.PgError
require.True(t, errors.As(err, &pgErr))
require.Equal(t, "40001", pgErr.Code)
bn, err := state.GetLastBatchNumber(ctx, nil)
require.NoError(t, err)
require.Equal(t, expectedLastBatchNum, bn)
}
func doDBInteractionsUsingLocks(t *testing.T, nTrusteBatches int, l1GoesFirst bool, state *PostgresStorage, stateDB *pgxpool.Pool, wg *sync.WaitGroup) (trustedDBTx, l1DBTx pgx.Tx) {
wg.Add(2)
log.Debug("INIT doDBInteractionsUsingLocks")
ctx := context.Background()
var err error
trustedDBTx, err = stateDB.Begin(ctx)
require.NoError(t, err)
l1DBTx, err = stateDB.Begin(ctx)
require.NoError(t, err)
// Lock
const lockQuery = "SELECT batch_num FROM state.batch ORDER BY batch_num DESC LIMIT 1 FOR UPDATE"
trustedInteractions := func() {
// Trusted sync interactions
var actualBatchNum int
require.NoError(t, trustedDBTx.QueryRow(ctx, lockQuery).Scan(&actualBatchNum))
log.Warnf("trustedInteractions reads actualBatchNum: %d", actualBatchNum)
if actualBatchNum == nTrusteBatches {
log.Warn("Trusted Interactions executing")
// L1 Sync has not reorged yet, let's insert next trusted batch
require.NoError(t, state.openBatch(ctx, ProcessingContext{BatchNumber: uint64(nTrusteBatches + 1)}, trustedDBTx))
require.NoError(t, state.closeBatch(ctx, ProcessingReceipt{BatchNumber: uint64(nTrusteBatches + 1)}, trustedDBTx))
log.Warn("Trusted Interactions DONE executing")
}
wg.Done()
}
l1Interactions := func() {
// L1 sync interactions
var actualBatchNum int
require.NoError(t, l1DBTx.QueryRow(ctx, lockQuery).Scan(&actualBatchNum))
log.Warnf("l1Interactions reads actualBatchNum: %d", actualBatchNum)
log.Warn("L1 Interactions executing")
/*
NOTE: If the trusted sync locks first, actualBatchNum will still be nTrusteBatches
instead of nTrusteBatches + 1. This is because nTrusteBatches doesn't get modified.
However this is not a problem in reality; on the contrary, this is the desired behaviour:
1. Trusted sync adds the next batch atomically
2. L1 Sync waits until Trusted sync is done
3. Trusted sync finishes inserting nTrusteBatches + 1
4. L1 Sync gets unlocked and deletes nTrusteBatches and nTrusteBatches + 1
This is not how the test using isolation behaves, as that test finishes when the error gets detected, but right after that
the L1 Sync should retry the reorg query and achieve the same result
*/
const resetSQL = "DELETE FROM state.batch WHERE batch_num > $1"
_, err = l1DBTx.Exec(ctx, resetSQL, nTrusteBatches-1)
require.NoError(t, err)
log.Warn("L1 Interactions DONE executing")
wg.Done()
}
if l1GoesFirst {
l1Interactions()
go trustedInteractions()
} else {
trustedInteractions()
go l1Interactions()
}
return
}
func assertsLocks(t *testing.T, expectedLastBatchNum uint64, firstCommiter, secondCommiter pgx.Tx, state *PostgresStorage, stateDB *pgxpool.Pool, wg *sync.WaitGroup) {
ctx := context.Background()
require.NoError(t, firstCommiter.Commit(ctx))
wg.Wait()
require.NoError(t, secondCommiter.Commit(ctx))
bn, err := state.GetLastBatchNumber(ctx, nil)
require.NoError(t, err)
require.Equal(t, expectedLastBatchNum, bn)
}
This discussion suggests changes to the synchronizer component to make it more flexible and enable alternative implementations and/or different functionalities.
Split by source
The first design principle is to split the synchronizer into many synchronizers, according to the source (where the data is synchronized from). The cmd should then spin up the different synchronizers. Each synchronizer should be autonomous, in the sense that it shouldn't need to interact directly with other synchronizers; instead, coordination among synchronizers can happen through the DB, as already happens with other components. The current synchronizer should be split into the following synchronizers (a sketch of how cmd could drive them follows below):
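A minimal sketch of what this could look like from the cmd side; the Synchronizer interface and runSynchronizers are illustrative names, not an existing API:

// Hypothetical sketch: every synchronizer is autonomous and only exposes a
// common lifecycle, so cmd can spin them up without them knowing about each other.
package cmdsketch

import "context"

// Synchronizer is the minimal lifecycle cmd needs. Implementations (l1sync,
// trustedsync, ...) coordinate through the DB, never by calling each other.
type Synchronizer interface {
    Sync(ctx context.Context) error
}

// runSynchronizers spins up every synchronizer in its own goroutine.
func runSynchronizers(ctx context.Context, syncs ...Synchronizer) {
    for _, s := range syncs {
        s := s
        go func() {
            // A real implementation would handle/retry errors per synchronizer.
            _ = s.Sync(ctx)
        }()
    }
}

cmd would then call something like runSynchronizers(ctx, l1Sync, trustedSync) after the genesis step described below.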
Genesis
Split out the logic that creates the genesis state. Note that this is not a synchronizer, as it doesn't download any data from an external source; instead the data is provided manually by the node operator. However, the result of building the genesis is contrasted with L1 as a sanity check.
It could be interesting to consider moving this logic to some other package, although it's not necessary. Keep in mind that there is genesis logic spread through several packages:
- config/network.go: load genesis configuration, either from file or from pre-defined (hardcoded) networks
- etherman/etherman.go:VerifyGenBlockNumber: check the block in which the SCs were deployed
- state/genesis.go: type definitions related to genesis
- state/state.go:SetGenesis: function to actually set the leafs into the merkletree
- synchronizer/synchronizer.go:Sync: before starting the main loop, it checks if the genesis is already added into the state. If it is not, it creates the genesis
Proposal:
a) Implement this as part of the L1 Sync (as it is right now)
b) Create a separate genesis package to encapsulate all this logic (except the config). cmd should call this before starting the actual synchronizers (a rough sketch of this option follows below)
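A rough sketch of option b); the package, function names and signatures are hypothetical and only illustrate what would be encapsulated:

// Hypothetical genesis package: wraps the logic currently spread across
// etherman, state and synchronizer. Config loading stays in config.
package genesis

import (
    "context"
    "errors"
)

// l1Checker and stateSetter are placeholder interfaces standing in for the
// real etherman and state dependencies (signatures assumed).
type l1Checker interface {
    VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error)
}

type stateSetter interface {
    IsGenesisSet(ctx context.Context) (bool, error)
    SetGenesis(ctx context.Context) error
}

// Build sets the genesis leafs into the merkletree if they are not there yet,
// and sanity-checks the SC deployment block against L1. cmd would call this
// once, before starting the actual synchronizers.
func Build(ctx context.Context, l1 l1Checker, st stateSetter, genBlockNumber uint64) error {
    if ok, err := st.IsGenesisSet(ctx); err != nil || ok {
        return err // already set (or failed to check)
    }
    ok, err := l1.VerifyGenBlockNumber(ctx, genBlockNumber)
    if err != nil {
        return err
    }
    if !ok {
        return errors.New("genesis block number does not match L1")
    }
    return st.SetGenesis(ctx)
}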
L1 Sync (synchronizer/l1sync)
Consumes events and does polling to read the latest changes on the L1 SCs; it captures the following information:
This part of the synchronizer is highly coupled with the etherman; the downside is that any change on the L1 SCs can break this synchronizer, not to mention a scenario where many contracts are used to accommodate different networks. Therefore some interfaces need to be built to make this synchronizer flexible. The idea is to have a generic L1 synchronizer that is agnostic to the events and to the functions used to poll the latest changes; instead, this synchronizer will take care of calling these two types of functions, following a handler-registration pattern (sketched below).
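A sketch of the handler-registration idea; the event names, types and the polling hook are placeholders, not the real etherman API:

// Hypothetical sketch of a generic L1 synchronizer: callers register a handler
// per event type, and the sync loop just dispatches whatever was decoded from
// the logs, so the loop itself never changes when the L1 SCs change.
package l1sync

import "context"

// L1Event stands in for whatever etherman returns after decoding a log.
type L1Event struct {
    Name string      // e.g. "SequenceBatches", "VerifyBatches" (illustrative)
    Data interface{} // decoded payload
}

type EventHandler func(ctx context.Context, e L1Event) error

type Synchronizer struct {
    handlers map[string]EventHandler
    // poll is a placeholder for the etherman call that reads events/state
    // from a given block and returns the last block processed.
    poll func(ctx context.Context, fromBlock uint64) ([]L1Event, uint64, error)
}

// RegisterHandler wires a new event type without touching the sync loop,
// which is what keeps the synchronizer decoupled from the L1 SCs.
func (s *Synchronizer) RegisterHandler(eventName string, h EventHandler) {
    if s.handlers == nil {
        s.handlers = make(map[string]EventHandler)
    }
    s.handlers[eventName] = h
}

func (s *Synchronizer) syncFrom(ctx context.Context, fromBlock uint64) (uint64, error) {
    events, lastBlock, err := s.poll(ctx, fromBlock)
    if err != nil {
        return fromBlock, err
    }
    for _, e := range events {
        if h, ok := s.handlers[e.Name]; ok {
            if err := h(ctx, e); err != nil {
                return fromBlock, err
            }
        }
    }
    return lastBlock, nil
}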
Trusted Sequencer JSON RPC sync (synchronizer/trustedsync)
Does polling on the JSON RPC of the trusted sequencer in order to build the trusted state.
This synchronizer can be understood as the client side of the trusted sequencer server, and it needs to be developed accordingly. Therefore the only change required here is to actually decouple this from the monolithic synchronizer, and additionally add some coordination mechanisms (see the sketch after the list):
- Regarding the l1sync, the trustedsync needs to be aware of the latest batch seen on Ethereum vs the last batch synchronized. If last batch on DB < last batch on Ethereum, the trustedsync will be halted
- Check whether the l1sync has discarded some batches before proceeding with the insert; in other words, only insert if the last state root used to call the executor is still the same when inserting
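A sketch of both coordination checks; GetLastBatchNumberSeenOnEthereum, GetStateRootByBatchNumber and the storage interface are assumed names (only GetLastBatchNumber appears in the test above):

// Hypothetical sketch of the two coordination mechanisms for trustedsync.
package trustedsync

import (
    "context"
    "errors"

    "github.com/ethereum/go-ethereum/common"
    "github.com/jackc/pgx/v4"
)

type storage interface {
    GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
    GetLastBatchNumberSeenOnEthereum(ctx context.Context, dbTx pgx.Tx) (uint64, error)           // assumed
    GetStateRootByBatchNumber(ctx context.Context, batchNum uint64, dbTx pgx.Tx) (common.Hash, error) // assumed
}

// shouldHalt implements "if last batch on DB < last batch on Ethereum, halt".
func shouldHalt(ctx context.Context, st storage) (bool, error) {
    lastOnDB, err := st.GetLastBatchNumber(ctx, nil)
    if err != nil {
        return false, err
    }
    lastOnEth, err := st.GetLastBatchNumberSeenOnEthereum(ctx, nil)
    if err != nil {
        return false, err
    }
    return lastOnDB < lastOnEth, nil
}

// checkStateRootUnchanged implements the second check: only insert if the last
// state root used to call the executor is still the same at insert time.
func checkStateRootUnchanged(ctx context.Context, st storage, batchNum uint64, usedRoot common.Hash, dbTx pgx.Tx) error {
    currentRoot, err := st.GetStateRootByBatchNumber(ctx, batchNum, dbTx)
    if err != nil {
        return err
    }
    if currentRoot != usedRoot {
        return errors.New("l1sync discarded batches: state root changed, discard this insert")
    }
    return nil
}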