From e6bc4b617930e633f87ee1387ab94ff556510798 Mon Sep 17 00:00:00 2001 From: acud <12988138+acud@users.noreply.github.com> Date: Tue, 30 Jul 2024 02:00:39 -0600 Subject: [PATCH] Backport #6185: optimize mempool (#6186) * chore: improve mempool * Cleanup Changelog --------- Co-authored-by: Matthias <5011972+fasmat@users.noreply.github.com> --- CHANGELOG.md | 6 +++ txs/cache.go | 10 +--- txs/cache_test.go | 95 ++++++++++++++++------------------ txs/conservative_state_test.go | 6 +-- 4 files changed, 56 insertions(+), 61 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e068651d4b..3d9dac9991 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ See [RELEASE](./RELEASE.md) for workflow instructions. +## Release v1.6.5 + +### Improvements + +* [#6185](https://github.com/spacemeshos/go-spacemesh/pull/6185) Optimize mempool + ## Release v1.6.4 ### Improvements diff --git a/txs/cache.go b/txs/cache.go index f15c10d3bb..12fad793e0 100644 --- a/txs/cache.go +++ b/txs/cache.go @@ -97,7 +97,7 @@ func (ac *accountCache) availBalance() uint64 { func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element, *candidate, error) { if ac.txsByNonce.Len() >= maxTXsPerAcct { ac.moreInDB = true - return nil, nil, errTooManyNonce + return nil, nil, fmt.Errorf("%w: len %d", errTooManyNonce, ac.txsByNonce.Len()) } balance := ac.startBalance var prev *list.Element @@ -115,7 +115,6 @@ func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element break } if balance < ntx.MaxSpending() { - ac.moreInDB = true logger.Debug("insufficient balance", zap.Stringer("tx_id", ntx.ID), zap.Stringer("address", ntx.Principal), @@ -593,15 +592,10 @@ func (c *Cache) cleanupAccounts(accounts map[types.Address]struct{}) { } } -// - errInsufficientBalance: -// conservative cache is conservative in that it only counts principal's spending for pending transactions. -// a tx rejected due to insufficient balance MAY become feasible after a layer is applied (principal -// received incoming funds). when we receive a errInsufficientBalance tx, we should store it in db and -// re-evaluate it after each layer is applied. // - errTooManyNonce: when a principal has way too many nonces, we don't want to blow up the memory. they should // be stored in db and retrieved after each earlier nonce is applied. func acceptable(err error) bool { - return err == nil || errors.Is(err, errInsufficientBalance) || errors.Is(err, errTooManyNonce) + return err == nil || errors.Is(err, errTooManyNonce) } func (c *Cache) Add( diff --git a/txs/cache_test.go b/txs/cache_test.go index 96ab805074..6b5c76038d 100644 --- a/txs/cache_test.go +++ b/txs/cache_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "github.com/spacemeshos/go-spacemesh/common/types" @@ -96,6 +97,7 @@ func saveTXs(t *testing.T, db *sql.Database, mtxs []*types.MeshTransaction) { } func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransaction, state types.TXState) { + t.Helper() for _, mtx := range txs { got, err := transactions.Get(db, mtx.ID) require.NoError(t, err) @@ -104,6 +106,7 @@ func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransac } func checkTXNotInDB(t *testing.T, db *sql.Database, tid types.TransactionID) { + t.Helper() _, err := transactions.Get(db, tid) require.ErrorIs(t, err, sql.ErrNotFound) } @@ -185,7 +188,7 @@ func createSingleAccountTestCache(tb testing.TB) (*testCache, *testAcct) { states := map[types.Address]*testAcct{principal: ta} db := sql.InMemory() return &testCache{ - Cache: NewCache(getStateFunc(states), zaptest.NewLogger(tb)), + Cache: NewCache(getStateFunc(states), zap.NewNop()), db: db, }, ta } @@ -707,17 +710,17 @@ func TestCache_Account_Add_RandomOrder(t *testing.T) { func TestCache_Account_Add_InsufficientBalance_ResetAfterApply(t *testing.T) { tc, ta := createSingleAccountTestCache(t) buildSingleAccountCache(t, tc, ta, nil) - mtx := &types.MeshTransaction{ Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, + mtx.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, mtx.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) checkMempool(t, tc.Cache, nil) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL) + require.False(t, tc.MoreInDB(ta.principal)) + checkTXNotInDB(t, tc.db, mtx.ID) lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) @@ -725,11 +728,9 @@ func TestCache_Account_Add_InsufficientBalance_ResetAfterApply(t *testing.T) { ta.balance += ta.balance require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil)) - checkTX(t, tc.Cache, mtx.ID, 0, types.EmptyBlockID) - expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}} - checkMempool(t, tc.Cache, expectedMempool) + checkMempool(t, tc.Cache, nil) require.False(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL) + checkTXNotInDB(t, tc.db, mtx.ID) } func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testing.T) { @@ -744,27 +745,26 @@ func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testi Transaction: *newTx(t, ta.nonce+10, ta.balance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction, mtx0.Received, false)) - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction, mtx1.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction, + mtx0.Received, false), errInsufficientBalance) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction, + mtx1.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, mtx0.ID) checkNoTX(t, tc.Cache, mtx1.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) checkMempool(t, tc.Cache, nil) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL) + require.False(t, tc.MoreInDB(ta.principal)) + checkTXNotInDB(t, tc.db, mtx0.ID) + checkTXNotInDB(t, tc.db, mtx1.ID) lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) - // the account receive enough funds in layer 97 (via rewards or incoming transfer) for mtx1 - ta.balance = mtx1.Spending() require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil)) checkNoTX(t, tc.Cache, mtx0.ID) - checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID) - checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0) - expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx1}} - checkMempool(t, tc.Cache, expectedMempool) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL) + checkNoTX(t, tc.Cache, mtx1.ID) + checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) + checkMempool(t, tc.Cache, nil) + require.False(t, tc.MoreInDB(ta.principal)) lid = lid.Add(1) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) @@ -772,13 +772,11 @@ func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testi // but the account receive enough funds in layer 98 (via rewards or incoming transfer) for both mtx0 and mtx1 ta.balance = mtx0.Spending() + mtx1.Spending() require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{2, 3, 4}, nil, nil)) - checkTX(t, tc.Cache, mtx0.ID, 0, types.EmptyBlockID) - checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID) - checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0) - expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {mtx0, mtx1}} - checkMempool(t, tc.Cache, expectedMempool) + checkNoTX(t, tc.Cache, mtx0.ID) + checkNoTX(t, tc.Cache, mtx1.ID) + checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) + checkMempool(t, tc.Cache, nil) require.False(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL) } func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) { @@ -789,12 +787,13 @@ func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) { Transaction: *newTx(t, ta.nonce, defaultBalance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, + mtx.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, mtx.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) checkMempool(t, tc.Cache, nil) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL) + require.False(t, tc.MoreInDB(ta.principal)) + checkTXNotInDB(t, tc.db, mtx.ID) } func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) { @@ -810,12 +809,13 @@ func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) { Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &spender.Transaction, spender.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &spender.Transaction, + spender.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, spender.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, ta.balance-mtx.Spending()) expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}} checkMempool(t, tc.Cache, expectedMempool) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx, spender}, types.MEMPOOL) + checkTXNotInDB(t, tc.db, spender.ID) } func TestCache_Account_AppliedTXsNotInCache(t *testing.T) { @@ -889,10 +889,11 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) { largeAmount := defaultBalance for _, p := range pending { p.MaxSpend = largeAmount - require.NoError(t, tc.Add(context.Background(), tc.db, &p.Transaction, p.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &p.Transaction, + p.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, p.ID) + checkTXNotInDB(t, tc.db, p.ID) } - checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL) checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance) expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}} checkMempool(t, tc.Cache, expectedMempool) @@ -902,28 +903,24 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) { // transactions in `pending` feasible now income := defaultBalance * 100 ta.nonce++ - ta.balance = ta.balance - mtx.Spending() + income + ta.balance = ta.balance + income - mtx.Spending() lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) bid := types.BlockID{1, 2, 3} applied := makeResults(lid, bid, mtx.Transaction) require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{})) // all pending txs are added to cache now - newNextNonce = ta.nonce + uint64(len(pending)) + newNextNonce = ta.nonce newBalance = ta.balance - for _, p := range pending { - newBalance -= p.Spending() - } checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance) - expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: pending} - checkMempool(t, tc.Cache, expectedMempool) + checkMempool(t, tc.Cache, nil) checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.APPLIED) - checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL) } func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) { tc, ta := createSingleAccountTestCache(t) mtxs := genAndSaveTXs(t, tc.db, ta.signer, ta.nonce, ta.nonce+4, time.Now()) + newNextNonce, newBalance := buildSingleAccountCache(t, tc, ta, mtxs) higherFee := defaultFee + 1 @@ -933,29 +930,27 @@ func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) { Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &better.Transaction, better.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &better.Transaction, + better.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, better.ID) checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance) expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: mtxs} checkMempool(t, tc.Cache, expectedMempool) - checkTXStateFromDB(t, tc.db, append(mtxs, better), types.MEMPOOL) + checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL) + checkTXNotInDB(t, tc.db, better.ID) // apply lid // there is also an incoming fund of `income` to the principal's account // the income is just enough to allow `better` to be feasible - income := mtxs[0].Spending() - ta.nonce++ - ta.balance = ta.balance - mtxs[0].Spending() + income lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) bid := types.BlockID{1, 2, 3} applied := makeResults(lid, bid, mtxs[0].Transaction) require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{})) - checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, 0) - expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {better}} + expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: mtxs[1:]} checkMempool(t, tc.Cache, expectedMempool) checkTXStateFromDB(t, tc.db, mtxs[:1], types.APPLIED) - checkTXStateFromDB(t, tc.db, append(mtxs[1:], better), types.MEMPOOL) + checkTXStateFromDB(t, tc.db, mtxs[1:], types.MEMPOOL) } func TestCache_Account_EvictedAfterApply(t *testing.T) { diff --git a/txs/conservative_state_test.go b/txs/conservative_state_test.go index 46bad3a5a2..8d4b5369e4 100644 --- a/txs/conservative_state_test.go +++ b/txs/conservative_state_test.go @@ -407,10 +407,10 @@ func TestAddToCache_InsufficientBalance(t *testing.T) { tcs.mvm.EXPECT().GetBalance(addr).Return(defaultAmount, nil).Times(1) tcs.mvm.EXPECT().GetNonce(addr).Return(nonce, nil).Times(1) tx := newTx(t, nonce, defaultAmount, defaultFee, signer) - require.NoError(t, tcs.AddToCache(context.Background(), tx, time.Now())) + require.ErrorIs(t, tcs.AddToCache(context.Background(), tx, time.Now()), errInsufficientBalance) checkNoTX(t, tcs.cache, tx.ID) - require.True(t, tcs.cache.MoreInDB(addr)) - checkTXStateFromDB(t, tcs.db, []*types.MeshTransaction{{Transaction: *tx}}, types.MEMPOOL) + require.False(t, tcs.cache.MoreInDB(addr)) + checkTXNotInDB(t, tcs.db, tx.ID) } func TestAddToCache_TooManyForOneAccount(t *testing.T) {