From 8d67cb9d539b9992abfb38d31bf1b3d237318a66 Mon Sep 17 00:00:00 2001 From: agnusmor <100322135+agnusmor@users.noreply.github.com> Date: Mon, 12 Aug 2024 18:56:31 +0200 Subject: [PATCH] Fix tx OOC (node level) when first empty L2 block in batch (#3744) (#3764) * Fix tx OOC (node level) for first empty L2 block in batch * change log level for ooc (node level) when adding tx to the worker * fix check OOC (node level) when preexecuting the tx in RPC * Fix linter and test --- pool/config_test.go | 23 +++++++++++------- pool/pool.go | 17 ++++++++------ sequencer/finalizer.go | 27 +++++++++++++++++---- sequencer/interfaces.go | 2 +- sequencer/l2block.go | 2 +- sequencer/mock_worker.go | 33 ++++++++++++++++---------- sequencer/worker.go | 35 +++++++++++++++++++-------- sequencer/worker_test.go | 2 +- state/config.go | 51 +++++++++++++++++++++++++++++++--------- 9 files changed, 135 insertions(+), 57 deletions(-) diff --git a/pool/config_test.go b/pool/config_test.go index 96b2cd3831..52ae398329 100644 --- a/pool/config_test.go +++ b/pool/config_test.go @@ -1,9 +1,11 @@ package pool import ( + "fmt" "testing" "github.com/0xPolygonHermez/zkevm-node/state" + "github.com/stretchr/testify/assert" ) func TestIsWithinConstraints(t *testing.T) { @@ -20,9 +22,9 @@ func TestIsWithinConstraints(t *testing.T) { } testCases := []struct { - desc string - counters state.ZKCounters - expected bool + desc string + counters state.ZKCounters + errExpected error }{ { desc: "All constraints within limits", @@ -37,7 +39,7 @@ func TestIsWithinConstraints(t *testing.T) { Steps: 2000, Sha256Hashes_V2: 4000, }, - expected: true, + errExpected: nil, }, { desc: "All constraints exceed limits", @@ -52,14 +54,17 @@ func TestIsWithinConstraints(t *testing.T) { Steps: 5000, Sha256Hashes_V2: 6000, }, - expected: false, + errExpected: fmt.Errorf("out of counters at node level (GasUsed, KeccakHashes, PoseidonHashes, PoseidonPaddings, MemAligns, Arithmetics, Binaries, Steps, Sha256Hashes)"), }, } - for _, tC := range testCases { - t.Run(tC.desc, func(t *testing.T) { - if got := cfg.IsWithinConstraints(tC.counters); got != tC.expected { - t.Errorf("Expected %v, got %v", tC.expected, got) + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + err := cfg.CheckNodeLevelOOC(tc.counters) + if tc.errExpected != nil { + assert.EqualError(t, err, tc.errExpected.Error()) + } else { + assert.NoError(t, err) } }) } diff --git a/pool/pool.go b/pool/pool.go index e8e45268c5..029ee14dbb 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -196,7 +196,16 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW return err } + var oocError error if preExecutionResponse.OOCError != nil { + oocError = preExecutionResponse.OOCError + } else { + if err = p.batchConstraintsCfg.CheckNodeLevelOOC(preExecutionResponse.reservedZKCounters); err != nil { + oocError = err + } + } + + if oocError != nil { event := &event.Event{ ReceivedAt: time.Now(), IPAddress: ip, @@ -212,7 +221,7 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW log.Errorf("error adding event: %v", err) } // Do not add tx to the pool - return fmt.Errorf("failed to add tx to the pool: %w", preExecutionResponse.OOCError) + return fmt.Errorf("failed to add tx to the pool: %w", oocError) } else if preExecutionResponse.OOGError != nil { event := &event.Event{ ReceivedAt: time.Now(), @@ -333,12 +342,6 @@ func (p *Pool) preExecuteTx(ctx context.Context, tx types.Transaction) (preExecu if errors.Is(errorToCheck, runtime.ErrOutOfGas) { response.OOGError = err } - } else { - if !p.batchConstraintsCfg.IsWithinConstraints(processBatchResponse.UsedZkCounters) { - err := fmt.Errorf("OutOfCounters Error (Node level) for tx: %s", tx.Hash().String()) - response.OOCError = err - log.Error(err.Error()) - } } response.usedZKCounters = processBatchResponse.UsedZkCounters diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index ba25a06ec9..eb8b748c0b 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -404,9 +404,26 @@ func (f *finalizer) finalizeBatches(ctx context.Context) { f.finalizeWIPL2Block(ctx) } - tx, err := f.workerIntf.GetBestFittingTx(f.wipBatch.imRemainingResources, f.wipBatch.imHighReservedZKCounters) + tx, oocTxs, err := f.workerIntf.GetBestFittingTx(f.wipBatch.imRemainingResources, f.wipBatch.imHighReservedZKCounters, (f.wipBatch.countOfL2Blocks == 0 && f.wipL2Block.isEmpty())) - // If we have txs pending to process but none of them fits into the wip batch, we close the wip batch and open a new one + // Set as invalid txs in the worker pool that will never fit into an empty batch + for _, oocTx := range oocTxs { + log.Infof("tx %s doesn't fits in empty batch %d (node OOC), setting tx as invalid in the pool", oocTx.HashStr, f.wipL2Block.trackingNum, f.wipBatch.batchNumber) + + f.LogEvent(ctx, event.Level_Info, event.EventID_NodeOOC, + fmt.Sprintf("tx %s doesn't fits in empty batch %d (node OOC), from: %s, IP: %s", oocTx.HashStr, f.wipBatch.batchNumber, oocTx.FromStr, oocTx.IP), nil) + + // Delete the transaction from the worker + f.workerIntf.DeleteTx(oocTx.Hash, oocTx.From) + + errMsg := "node OOC" + err = f.poolIntf.UpdateTxStatus(ctx, oocTx.Hash, pool.TxStatusInvalid, false, &errMsg) + if err != nil { + log.Errorf("failed to update status to invalid in the pool for tx %s, error: %v", oocTx.Hash.String(), err) + } + } + + // We have txs pending to process but none of them fits into the wip batch we close the wip batch and open a new one if err == ErrNoFittingTransaction { f.finalizeWIPBatch(ctx, state.NoTxFitsClosingReason) continue @@ -690,11 +707,11 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx } else { log.Infof("current tx %s needed resources exceeds the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing. counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}", tx.HashStr, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(result.UsedZkCounters), f.logZKCounters(result.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(f.wipBatch.imHighReservedZKCounters)) - if !f.batchConstraints.IsWithinConstraints(result.ReservedZkCounters) { - log.Infof("current tx %s reserved resources exceeds the max limit for batch resources (node OOC), setting tx as invalid in the pool", tx.HashStr) + if err := f.batchConstraints.CheckNodeLevelOOC(result.ReservedZkCounters); err != nil { + log.Infof("current tx %s reserved resources exceeds the max limit for batch resources (node OOC), setting tx as invalid in the pool, error: %v", tx.HashStr, err) f.LogEvent(ctx, event.Level_Info, event.EventID_NodeOOC, - fmt.Sprintf("tx %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s", tx.HashStr, tx.FromStr, tx.IP), nil) + fmt.Sprintf("tx %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s, error: %v", tx.HashStr, tx.FromStr, tx.IP, err), nil) // Delete the transaction from the txSorted list f.workerIntf.DeleteTx(tx.Hash, tx.From) diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 72beb71578..173635f90b 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -84,7 +84,7 @@ type stateInterface interface { } type workerInterface interface { - GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters) (*TxTracker, error) + GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters, fistL2Block bool) (*TxTracker, []*TxTracker, error) UpdateAfterSingleSuccessfulTxExecution(from common.Address, touchedAddresses map[common.Address]*state.InfoReadWrite) []*TxTracker UpdateTxZKCounters(txHash common.Hash, from common.Address, usedZKCounters state.ZKCounters, reservedZKCounters state.ZKCounters) AddTxTracker(ctx context.Context, txTracker *TxTracker) (replacedTx *TxTracker, dropReason error) diff --git a/sequencer/l2block.go b/sequencer/l2block.go index 511d233988..a0da9fd0e3 100644 --- a/sequencer/l2block.go +++ b/sequencer/l2block.go @@ -657,7 +657,7 @@ func (f *finalizer) openNewWIPL2Block(ctx context.Context, prevTimestamp uint64, f.wipBatch.imHighReservedZKCounters = newHighZKCounters } else { - log.Infof("new wip L2 block [%d] reserved resources exceeds the remaining batch resources, overflow resource: %s, closing WIP batch and creating new one. counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}", + log.Infof("new wip L2 block [%d] needed resources exceeds the remaining batch resources, overflow resource: %s, closing WIP batch and creating new one. counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}", f.wipL2Block.trackingNum, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(f.wipL2Block.usedZKCountersOnNew), f.logZKCounters(f.wipL2Block.reservedZKCountersOnNew), f.logZKCounters(neededZKCounters), f.logZKCounters(f.wipBatch.imHighReservedZKCounters)) } diff --git a/sequencer/mock_worker.go b/sequencer/mock_worker.go index 3ff546f724..a627bf5533 100644 --- a/sequencer/mock_worker.go +++ b/sequencer/mock_worker.go @@ -70,34 +70,43 @@ func (_m *WorkerMock) DeleteTxPendingToStore(txHash common.Hash, addr common.Add _m.Called(txHash, addr) } -// GetBestFittingTx provides a mock function with given fields: remainingResources, highReservedCounters -func (_m *WorkerMock) GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters) (*TxTracker, error) { - ret := _m.Called(remainingResources, highReservedCounters) +// GetBestFittingTx provides a mock function with given fields: remainingResources, highReservedCounters, fistL2Block +func (_m *WorkerMock) GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters, fistL2Block bool) (*TxTracker, []*TxTracker, error) { + ret := _m.Called(remainingResources, highReservedCounters, fistL2Block) if len(ret) == 0 { panic("no return value specified for GetBestFittingTx") } var r0 *TxTracker - var r1 error - if rf, ok := ret.Get(0).(func(state.BatchResources, state.ZKCounters) (*TxTracker, error)); ok { - return rf(remainingResources, highReservedCounters) + var r1 []*TxTracker + var r2 error + if rf, ok := ret.Get(0).(func(state.BatchResources, state.ZKCounters, bool) (*TxTracker, []*TxTracker, error)); ok { + return rf(remainingResources, highReservedCounters, fistL2Block) } - if rf, ok := ret.Get(0).(func(state.BatchResources, state.ZKCounters) *TxTracker); ok { - r0 = rf(remainingResources, highReservedCounters) + if rf, ok := ret.Get(0).(func(state.BatchResources, state.ZKCounters, bool) *TxTracker); ok { + r0 = rf(remainingResources, highReservedCounters, fistL2Block) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*TxTracker) } } - if rf, ok := ret.Get(1).(func(state.BatchResources, state.ZKCounters) error); ok { - r1 = rf(remainingResources, highReservedCounters) + if rf, ok := ret.Get(1).(func(state.BatchResources, state.ZKCounters, bool) []*TxTracker); ok { + r1 = rf(remainingResources, highReservedCounters, fistL2Block) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).([]*TxTracker) + } } - return r0, r1 + if rf, ok := ret.Get(2).(func(state.BatchResources, state.ZKCounters, bool) error); ok { + r2 = rf(remainingResources, highReservedCounters, fistL2Block) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } // MoveTxPendingToStore provides a mock function with given fields: txHash, addr diff --git a/sequencer/worker.go b/sequencer/worker.go index c6be5ed5ab..94a5969cd1 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -64,8 +64,8 @@ func (w *Worker) addTxTracker(ctx context.Context, tx *TxTracker, mutex *sync.Mu } // Make sure the transaction's reserved ZKCounters are within the constraints. - if !w.batchConstraints.IsWithinConstraints(tx.ReservedZKCounters) { - log.Errorf("outOfCounters error (node level) for tx %s", tx.Hash.String()) + if err := w.batchConstraints.CheckNodeLevelOOC(tx.ReservedZKCounters); err != nil { + log.Infof("out of counters (node level) when adding tx %s from address %s, error: %v", tx.Hash, tx.From, err) mutexUnlock(mutex) return nil, pool.ErrOutOfCounters } @@ -405,7 +405,7 @@ func (w *Worker) DeleteTxPendingToStore(txHash common.Hash, addr common.Address) } // GetBestFittingTx gets the most efficient tx that fits in the available batch resources -func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters) (*TxTracker, error) { +func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highReservedCounters state.ZKCounters, isFistL2BlockAndEmpty bool) (*TxTracker, []*TxTracker, error) { w.workerMutex.Lock() defer w.workerMutex.Unlock() @@ -417,7 +417,7 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR w.reorgedTxs = w.reorgedTxs[1:] if addrQueue, found := w.pool[reorgedTx.FromStr]; found { if addrQueue.readyTx != nil && addrQueue.readyTx.Hash == reorgedTx.Hash { - return reorgedTx, nil + return reorgedTx, nil, nil } else { log.Warnf("reorged tx %s is not the ready tx for addrQueue %s, this shouldn't happen", reorgedTx.Hash, reorgedTx.From) } @@ -427,12 +427,14 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR } if w.txSortedList.len() == 0 { - return nil, ErrTransactionsListEmpty + return nil, nil, ErrTransactionsListEmpty } var ( - tx *TxTracker - foundMutex sync.RWMutex + tx *TxTracker + foundMutex sync.RWMutex + oocTxs []*TxTracker + oocTxsMutex sync.Mutex ) nGoRoutines := runtime.NumCPU() @@ -457,7 +459,14 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR needed, _ := getNeededZKCounters(highReservedCounters, txCandidate.UsedZKCounters, txCandidate.ReservedZKCounters) fits, _ := bresources.Fits(state.BatchResources{ZKCounters: needed, Bytes: txCandidate.Bytes}) if !fits { - // We don't add this Tx + // If we are looking for a tx for the first empty L2 block in the batch and this tx doesn't fits in the batch, then this tx will never fit in any batch. + // We add the tx to the oocTxs slice. That slice will be returned to set these txs as invalid (and delete them from the worker) from the finalizer code + if isFistL2BlockAndEmpty { + oocTxsMutex.Lock() + oocTxs = append(oocTxs, txCandidate) + oocTxsMutex.Unlock() + } + // We continue looking for a tx that fits in the batch continue } @@ -477,9 +486,15 @@ func (w *Worker) GetBestFittingTx(remainingResources state.BatchResources, highR if foundAt != -1 { log.Debugf("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice) w.wipTx = tx - return tx, nil + return tx, oocTxs, nil } else { - return nil, ErrNoFittingTransaction + // If the length of the oocTxs slice is equal to the length of the txSortedList this means that all the txs are ooc, + // therefore we need to return an error indicating that the list is empty + if w.txSortedList.len() == len(oocTxs) { + return nil, oocTxs, ErrTransactionsListEmpty + } else { + return nil, oocTxs, ErrNoFittingTransaction + } } } diff --git a/sequencer/worker_test.go b/sequencer/worker_test.go index 0e2375ad37..3f489cc0be 100644 --- a/sequencer/worker_test.go +++ b/sequencer/worker_test.go @@ -258,7 +258,7 @@ func TestWorkerGetBestTx(t *testing.T) { ct := 0 for { - tx, _ := worker.GetBestFittingTx(rc, state.ZKCounters{}) + tx, _, _ := worker.GetBestFittingTx(rc, state.ZKCounters{}, true) if tx != nil { if ct >= len(expectedGetBestTx) { t.Fatalf("Error getting more best tx than expected. Expected=%d, Actual=%d", len(expectedGetBestTx), ct+1) diff --git a/state/config.go b/state/config.go index 61e74e63d6..35975d1977 100644 --- a/state/config.go +++ b/state/config.go @@ -1,6 +1,8 @@ package state import ( + "fmt" + "github.com/0xPolygonHermez/zkevm-node/config/types" "github.com/0xPolygonHermez/zkevm-node/db" ) @@ -71,15 +73,42 @@ type BatchConstraintsCfg struct { MaxSHA256Hashes uint32 `mapstructure:"MaxSHA256Hashes"` } -// IsWithinConstraints checks if the counters are within the batch constraints -func (c BatchConstraintsCfg) IsWithinConstraints(counters ZKCounters) bool { - return counters.GasUsed <= c.MaxCumulativeGasUsed && - counters.KeccakHashes <= c.MaxKeccakHashes && - counters.PoseidonHashes <= c.MaxPoseidonHashes && - counters.PoseidonPaddings <= c.MaxPoseidonPaddings && - counters.MemAligns <= c.MaxMemAligns && - counters.Arithmetics <= c.MaxArithmetics && - counters.Binaries <= c.MaxBinaries && - counters.Steps <= c.MaxSteps && - counters.Sha256Hashes_V2 <= c.MaxSHA256Hashes +// CheckNodeLevelOOC checks if the counters are within the batch constraints +func (c BatchConstraintsCfg) CheckNodeLevelOOC(counters ZKCounters) error { + oocList := "" + + if counters.GasUsed > c.MaxCumulativeGasUsed { + oocList += "GasUsed, " + } + if counters.KeccakHashes > c.MaxKeccakHashes { + oocList += "KeccakHashes, " + } + if counters.PoseidonHashes > c.MaxPoseidonHashes { + oocList += "PoseidonHashes, " + } + if counters.PoseidonPaddings > c.MaxPoseidonPaddings { + oocList += "PoseidonPaddings, " + } + if counters.MemAligns > c.MaxMemAligns { + oocList += "MemAligns, " + } + if counters.Arithmetics > c.MaxArithmetics { + oocList += "Arithmetics, " + } + if counters.Binaries > c.MaxBinaries { + oocList += "Binaries, " + } + if counters.Steps > c.MaxSteps { + oocList += "Steps, " + } + if counters.Sha256Hashes_V2 > c.MaxSHA256Hashes { + oocList += "Sha256Hashes, " + } + + if oocList != "" { + oocList = oocList[:len(oocList)-2] // Remove last comma and blank space + return fmt.Errorf("out of counters at node level (%s)", oocList) + } + + return nil }