From 53610a32354fadf87fca13c61046d23b6f0b62f2 Mon Sep 17 00:00:00 2001 From: acud <12988138+acud@users.noreply.github.com> Date: Mon, 29 Jul 2024 22:36:34 +0000 Subject: [PATCH] chore(txs): mempool optimization (#6185) ## Motivation Improves some aspects of the mempool handling. --- CHANGELOG.md | 2 + txs/cache.go | 10 +--- txs/cache_test.go | 95 ++++++++++++++++------------------ txs/conservative_state_test.go | 6 +-- 4 files changed, 52 insertions(+), 61 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f40e9186c2..b1c1514415 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions. ### Improvements +* [#6185](https://github.com/spacemeshos/go-spacemesh/pull/6185) Optimize mempool + ## Release v1.6.4 ### Improvements diff --git a/txs/cache.go b/txs/cache.go index 44cef648b8..03106286ea 100644 --- a/txs/cache.go +++ b/txs/cache.go @@ -97,7 +97,7 @@ func (ac *accountCache) availBalance() uint64 { func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element, *candidate, error) { if ac.txsByNonce.Len() >= maxTXsPerAcct { ac.moreInDB = true - return nil, nil, errTooManyNonce + return nil, nil, fmt.Errorf("%w: len %d", errTooManyNonce, ac.txsByNonce.Len()) } balance := ac.startBalance var prev *list.Element @@ -115,7 +115,6 @@ func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element break } if balance < ntx.MaxSpending() { - ac.moreInDB = true return nil, nil, errInsufficientBalance } return prev, &candidate{best: ntx, postBalance: balance - ntx.MaxSpending()}, nil @@ -553,15 +552,10 @@ func (c *Cache) cleanupAccounts(accounts ...types.Address) { } } -// - errInsufficientBalance: -// conservative cache is conservative in that it only counts principal's spending for pending transactions. -// a tx rejected due to insufficient balance MAY become feasible after a layer is applied (principal -// received incoming funds). when we receive a errInsufficientBalance tx, we should store it in db and -// re-evaluate it after each layer is applied. // - errTooManyNonce: when a principal has way too many nonces, we don't want to blow up the memory. they should // be stored in db and retrieved after each earlier nonce is applied. func acceptable(err error) bool { - return err == nil || errors.Is(err, errInsufficientBalance) || errors.Is(err, errTooManyNonce) + return err == nil || errors.Is(err, errTooManyNonce) } func (c *Cache) Add( diff --git a/txs/cache_test.go b/txs/cache_test.go index 64901399d4..352ba5d466 100644 --- a/txs/cache_test.go +++ b/txs/cache_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "github.com/spacemeshos/go-spacemesh/common/types" @@ -96,6 +97,7 @@ func saveTXs(t *testing.T, db *sql.Database, mtxs []*types.MeshTransaction) { } func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransaction, state types.TXState) { + t.Helper() for _, mtx := range txs { got, err := transactions.Get(db, mtx.ID) require.NoError(t, err) @@ -104,6 +106,7 @@ func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransac } func checkTXNotInDB(t *testing.T, db *sql.Database, tid types.TransactionID) { + t.Helper() _, err := transactions.Get(db, tid) require.ErrorIs(t, err, sql.ErrNotFound) } @@ -185,7 +188,7 @@ func createSingleAccountTestCache(tb testing.TB) (*testCache, *testAcct) { states := map[types.Address]*testAcct{principal: ta} db := sql.InMemory() return &testCache{ - Cache: NewCache(getStateFunc(states), zaptest.NewLogger(tb)), + Cache: NewCache(getStateFunc(states), zap.NewNop()), db: db, }, ta } @@ -733,17 +736,17 @@ func TestCache_Account_ReplaceByFee(t *testing.T) { func TestCache_Account_Add_InsufficientBalance_ResetAfterApply(t *testing.T) { tc, ta := createSingleAccountTestCache(t) buildSingleAccountCache(t, tc, ta, nil) - mtx := &types.MeshTransaction{ Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, + mtx.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, mtx.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) checkMempool(t, tc.Cache, nil) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL) + require.False(t, tc.MoreInDB(ta.principal)) + checkTXNotInDB(t, tc.db, mtx.ID) lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) @@ -751,11 +754,9 @@ func TestCache_Account_Add_InsufficientBalance_ResetAfterApply(t *testing.T) { ta.balance += ta.balance require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil)) - checkTX(t, tc.Cache, mtx.ID, 0, types.EmptyBlockID) - expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}} - checkMempool(t, tc.Cache, expectedMempool) + checkMempool(t, tc.Cache, nil) require.False(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL) + checkTXNotInDB(t, tc.db, mtx.ID) } func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testing.T) { @@ -770,27 +771,26 @@ func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testi Transaction: *newTx(t, ta.nonce+10, ta.balance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction, mtx0.Received, false)) - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction, mtx1.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction, + mtx0.Received, false), errInsufficientBalance) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction, + mtx1.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, mtx0.ID) checkNoTX(t, tc.Cache, mtx1.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) checkMempool(t, tc.Cache, nil) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL) + require.False(t, tc.MoreInDB(ta.principal)) + checkTXNotInDB(t, tc.db, mtx0.ID) + checkTXNotInDB(t, tc.db, mtx1.ID) lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) - // the account receive enough funds in layer 97 (via rewards or incoming transfer) for mtx1 - ta.balance = mtx1.Spending() require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil)) checkNoTX(t, tc.Cache, mtx0.ID) - checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID) - checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0) - expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx1}} - checkMempool(t, tc.Cache, expectedMempool) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL) + checkNoTX(t, tc.Cache, mtx1.ID) + checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) + checkMempool(t, tc.Cache, nil) + require.False(t, tc.MoreInDB(ta.principal)) lid = lid.Add(1) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) @@ -798,13 +798,11 @@ func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testi // but the account receive enough funds in layer 98 (via rewards or incoming transfer) for both mtx0 and mtx1 ta.balance = mtx0.Spending() + mtx1.Spending() require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{2, 3, 4}, nil, nil)) - checkTX(t, tc.Cache, mtx0.ID, 0, types.EmptyBlockID) - checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID) - checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0) - expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {mtx0, mtx1}} - checkMempool(t, tc.Cache, expectedMempool) + checkNoTX(t, tc.Cache, mtx0.ID) + checkNoTX(t, tc.Cache, mtx1.ID) + checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) + checkMempool(t, tc.Cache, nil) require.False(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL) } func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) { @@ -815,12 +813,13 @@ func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) { Transaction: *newTx(t, ta.nonce, defaultBalance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, + mtx.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, mtx.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance) checkMempool(t, tc.Cache, nil) - require.True(t, tc.MoreInDB(ta.principal)) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL) + require.False(t, tc.MoreInDB(ta.principal)) + checkTXNotInDB(t, tc.db, mtx.ID) } func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) { @@ -836,12 +835,13 @@ func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) { Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer), Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &spender.Transaction, spender.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &spender.Transaction, + spender.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, spender.ID) checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, ta.balance-mtx.Spending()) expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}} checkMempool(t, tc.Cache, expectedMempool) - checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx, spender}, types.MEMPOOL) + checkTXNotInDB(t, tc.db, spender.ID) } func TestCache_Account_AppliedTXsNotInCache(t *testing.T) { @@ -915,10 +915,11 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) { largeAmount := defaultBalance for _, p := range pending { p.MaxSpend = largeAmount - require.NoError(t, tc.Add(context.Background(), tc.db, &p.Transaction, p.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &p.Transaction, + p.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, p.ID) + checkTXNotInDB(t, tc.db, p.ID) } - checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL) checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance) expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}} checkMempool(t, tc.Cache, expectedMempool) @@ -928,28 +929,24 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) { // transactions in `pending` feasible now income := defaultBalance * 100 ta.nonce++ - ta.balance = ta.balance - mtx.Spending() + income + ta.balance = ta.balance + income - mtx.Spending() lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) bid := types.BlockID{1, 2, 3} applied := makeResults(lid, bid, mtx.Transaction) require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{})) // all pending txs are added to cache now - newNextNonce = ta.nonce + uint64(len(pending)) + newNextNonce = ta.nonce newBalance = ta.balance - for _, p := range pending { - newBalance -= p.Spending() - } checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance) - expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: pending} - checkMempool(t, tc.Cache, expectedMempool) + checkMempool(t, tc.Cache, nil) checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.APPLIED) - checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL) } func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) { tc, ta := createSingleAccountTestCache(t) mtxs := genAndSaveTXs(t, tc.db, ta.signer, ta.nonce, ta.nonce+4, time.Now()) + newNextNonce, newBalance := buildSingleAccountCache(t, tc, ta, mtxs) higherFee := defaultFee + 1 @@ -959,29 +956,27 @@ func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) { Received: time.Now(), } - require.NoError(t, tc.Add(context.Background(), tc.db, &better.Transaction, better.Received, false)) + require.ErrorIs(t, tc.Add(context.Background(), tc.db, &better.Transaction, + better.Received, false), errInsufficientBalance) checkNoTX(t, tc.Cache, better.ID) checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance) expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: mtxs} checkMempool(t, tc.Cache, expectedMempool) - checkTXStateFromDB(t, tc.db, append(mtxs, better), types.MEMPOOL) + checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL) + checkTXNotInDB(t, tc.db, better.ID) // apply lid // there is also an incoming fund of `income` to the principal's account // the income is just enough to allow `better` to be feasible - income := mtxs[0].Spending() - ta.nonce++ - ta.balance = ta.balance - mtxs[0].Spending() + income lid := types.LayerID(97) require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID())) bid := types.BlockID{1, 2, 3} applied := makeResults(lid, bid, mtxs[0].Transaction) require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{})) - checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, 0) - expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {better}} + expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: mtxs[1:]} checkMempool(t, tc.Cache, expectedMempool) checkTXStateFromDB(t, tc.db, mtxs[:1], types.APPLIED) - checkTXStateFromDB(t, tc.db, append(mtxs[1:], better), types.MEMPOOL) + checkTXStateFromDB(t, tc.db, mtxs[1:], types.MEMPOOL) } func TestCache_Account_EvictedAfterApply(t *testing.T) { diff --git a/txs/conservative_state_test.go b/txs/conservative_state_test.go index 90ae5e7e0f..c2e78592e8 100644 --- a/txs/conservative_state_test.go +++ b/txs/conservative_state_test.go @@ -402,10 +402,10 @@ func TestAddToCache_InsufficientBalance(t *testing.T) { tcs.mvm.EXPECT().GetBalance(addr).Return(defaultAmount, nil).Times(1) tcs.mvm.EXPECT().GetNonce(addr).Return(nonce, nil).Times(1) tx := newTx(t, nonce, defaultAmount, defaultFee, signer) - require.NoError(t, tcs.AddToCache(context.Background(), tx, time.Now())) + require.ErrorIs(t, tcs.AddToCache(context.Background(), tx, time.Now()), errInsufficientBalance) checkNoTX(t, tcs.cache, tx.ID) - require.True(t, tcs.cache.MoreInDB(addr)) - checkTXStateFromDB(t, tcs.db, []*types.MeshTransaction{{Transaction: *tx}}, types.MEMPOOL) + require.False(t, tcs.cache.MoreInDB(addr)) + checkTXNotInDB(t, tcs.db, tx.ID) } func TestAddToCache_TooManyForOneAccount(t *testing.T) {