Skip to content

Commit

Permalink
chore(txs): mempool optimization (#6185)
Browse files Browse the repository at this point in the history
## Motivation

Improves some aspects of the mempool handling.
  • Loading branch information
acud committed Jul 29, 2024
1 parent d176dc8 commit 8fcf9a2
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 61 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions.

### Improvements

* [#6185](https://github.com/spacemeshos/go-spacemesh/pull/6185) Optimize mempool

## Release v1.6.4

### Improvements
Expand Down
12 changes: 4 additions & 8 deletions txs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ func (ac *accountCache) availBalance() uint64 {

func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element, *candidate, error) {
if ac.txsByNonce.Len() >= maxTXsPerAcct {

ac.moreInDB = true
return nil, nil, errTooManyNonce
return nil, nil, fmt.Errorf("%w: len %d", errTooManyNonce, ac.txsByNonce.Len())
}
balance := ac.startBalance
var prev *list.Element
Expand All @@ -115,7 +116,6 @@ func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element
break
}
if balance < ntx.MaxSpending() {
ac.moreInDB = true
return nil, nil, errInsufficientBalance
}
return prev, &candidate{best: ntx, postBalance: balance - ntx.MaxSpending()}, nil
Expand Down Expand Up @@ -553,15 +553,10 @@ func (c *Cache) cleanupAccounts(accounts ...types.Address) {
}
}

// - errInsufficientBalance:
// conservative cache is conservative in that it only counts principal's spending for pending transactions.
// a tx rejected due to insufficient balance MAY become feasible after a layer is applied (principal
// received incoming funds). when we receive a errInsufficientBalance tx, we should store it in db and
// re-evaluate it after each layer is applied.
// - errTooManyNonce: when a principal has way too many nonces, we don't want to blow up the memory. they should
// be stored in db and retrieved after each earlier nonce is applied.
func acceptable(err error) bool {
return err == nil || errors.Is(err, errInsufficientBalance) || errors.Is(err, errTooManyNonce)
return err == nil || errors.Is(err, errTooManyNonce)
}

func (c *Cache) Add(
Expand All @@ -586,6 +581,7 @@ func (c *Cache) Add(
mempoolTxCount.WithLabelValues(accepted).Inc()
}
if err == nil || mustPersist {
// panic(fmt.Sprintf("err %s, mp %t", err, mustPersist))
if dbErr := transactions.Add(db, tx, received); dbErr != nil {
return dbErr
}
Expand Down
95 changes: 45 additions & 50 deletions txs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -96,6 +97,7 @@ func saveTXs(t *testing.T, db *sql.Database, mtxs []*types.MeshTransaction) {
}

func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransaction, state types.TXState) {
t.Helper()
for _, mtx := range txs {
got, err := transactions.Get(db, mtx.ID)
require.NoError(t, err)
Expand All @@ -104,6 +106,7 @@ func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransac
}

func checkTXNotInDB(t *testing.T, db *sql.Database, tid types.TransactionID) {
t.Helper()
_, err := transactions.Get(db, tid)
require.ErrorIs(t, err, sql.ErrNotFound)
}
Expand Down Expand Up @@ -185,7 +188,7 @@ func createSingleAccountTestCache(tb testing.TB) (*testCache, *testAcct) {
states := map[types.Address]*testAcct{principal: ta}
db := sql.InMemory()
return &testCache{
Cache: NewCache(getStateFunc(states), zaptest.NewLogger(tb)),
Cache: NewCache(getStateFunc(states), zap.NewNop()),
db: db,
}, ta
}
Expand Down Expand Up @@ -733,29 +736,27 @@ func TestCache_Account_ReplaceByFee(t *testing.T) {
func TestCache_Account_Add_InsufficientBalance_ResetAfterApply(t *testing.T) {
tc, ta := createSingleAccountTestCache(t)
buildSingleAccountCache(t, tc, ta, nil)

mtx := &types.MeshTransaction{
Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction,
mtx.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx.ID)

lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// the account will receive funds in layer 97 (via rewards or incoming transfer)
ta.balance += ta.balance
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil))

checkTX(t, tc.Cache, mtx.ID, 0, types.EmptyBlockID)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
checkTXNotInDB(t, tc.db, mtx.ID)
}

func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testing.T) {
Expand All @@ -770,41 +771,38 @@ func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testi
Transaction: *newTx(t, ta.nonce+10, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction, mtx0.Received, false))
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction, mtx1.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction,
mtx0.Received, false), errInsufficientBalance)
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction,
mtx1.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx0.ID)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx0.ID)
checkTXNotInDB(t, tc.db, mtx1.ID)

lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// the account receive enough funds in layer 97 (via rewards or incoming transfer) for mtx1
ta.balance = mtx1.Spending()
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil))
checkNoTX(t, tc.Cache, mtx0.ID)
checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID)
checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx1}}
checkMempool(t, tc.Cache, expectedMempool)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))

lid = lid.Add(1)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// for some reasons this account wasn't applied in layer 98.
// but the account receive enough funds in layer 98 (via rewards or incoming transfer) for both mtx0 and mtx1
ta.balance = mtx0.Spending() + mtx1.Spending()
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{2, 3, 4}, nil, nil))
checkTX(t, tc.Cache, mtx0.ID, 0, types.EmptyBlockID)
checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID)
checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {mtx0, mtx1}}
checkMempool(t, tc.Cache, expectedMempool)
checkNoTX(t, tc.Cache, mtx0.ID)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
}

func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) {
Expand All @@ -815,12 +813,13 @@ func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) {
Transaction: *newTx(t, ta.nonce, defaultBalance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction,
mtx.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx.ID)
}

func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) {
Expand All @@ -836,12 +835,13 @@ func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) {
Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &spender.Transaction, spender.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &spender.Transaction,
spender.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, spender.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, ta.balance-mtx.Spending())
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx, spender}, types.MEMPOOL)
checkTXNotInDB(t, tc.db, spender.ID)
}

func TestCache_Account_AppliedTXsNotInCache(t *testing.T) {
Expand Down Expand Up @@ -915,10 +915,11 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) {
largeAmount := defaultBalance
for _, p := range pending {
p.MaxSpend = largeAmount
require.NoError(t, tc.Add(context.Background(), tc.db, &p.Transaction, p.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &p.Transaction,
p.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, p.ID)
checkTXNotInDB(t, tc.db, p.ID)
}
checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL)
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
Expand All @@ -928,28 +929,24 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) {
// transactions in `pending` feasible now
income := defaultBalance * 100
ta.nonce++
ta.balance = ta.balance - mtx.Spending() + income
ta.balance = ta.balance + income - mtx.Spending()
lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
bid := types.BlockID{1, 2, 3}
applied := makeResults(lid, bid, mtx.Transaction)
require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{}))
// all pending txs are added to cache now
newNextNonce = ta.nonce + uint64(len(pending))
newNextNonce = ta.nonce
newBalance = ta.balance
for _, p := range pending {
newBalance -= p.Spending()
}
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: pending}
checkMempool(t, tc.Cache, expectedMempool)
checkMempool(t, tc.Cache, nil)
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.APPLIED)
checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL)
}

func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) {
tc, ta := createSingleAccountTestCache(t)
mtxs := genAndSaveTXs(t, tc.db, ta.signer, ta.nonce, ta.nonce+4, time.Now())

newNextNonce, newBalance := buildSingleAccountCache(t, tc, ta, mtxs)

higherFee := defaultFee + 1
Expand All @@ -959,29 +956,27 @@ func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) {
Received: time.Now(),
}

require.NoError(t, tc.Add(context.Background(), tc.db, &better.Transaction, better.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &better.Transaction,
better.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, better.ID)
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: mtxs}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, append(mtxs, better), types.MEMPOOL)
checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL)
checkTXNotInDB(t, tc.db, better.ID)

// apply lid
// there is also an incoming fund of `income` to the principal's account
// the income is just enough to allow `better` to be feasible
income := mtxs[0].Spending()
ta.nonce++
ta.balance = ta.balance - mtxs[0].Spending() + income
lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
bid := types.BlockID{1, 2, 3}
applied := makeResults(lid, bid, mtxs[0].Transaction)
require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{}))
checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, 0)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {better}}
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: mtxs[1:]}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, mtxs[:1], types.APPLIED)
checkTXStateFromDB(t, tc.db, append(mtxs[1:], better), types.MEMPOOL)
checkTXStateFromDB(t, tc.db, mtxs[1:], types.MEMPOOL)
}

func TestCache_Account_EvictedAfterApply(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions txs/conservative_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ func TestAddToCache_InsufficientBalance(t *testing.T) {
tcs.mvm.EXPECT().GetBalance(addr).Return(defaultAmount, nil).Times(1)
tcs.mvm.EXPECT().GetNonce(addr).Return(nonce, nil).Times(1)
tx := newTx(t, nonce, defaultAmount, defaultFee, signer)
require.NoError(t, tcs.AddToCache(context.Background(), tx, time.Now()))
require.ErrorIs(t, tcs.AddToCache(context.Background(), tx, time.Now()), errInsufficientBalance)
checkNoTX(t, tcs.cache, tx.ID)
require.True(t, tcs.cache.MoreInDB(addr))
checkTXStateFromDB(t, tcs.db, []*types.MeshTransaction{{Transaction: *tx}}, types.MEMPOOL)
require.False(t, tcs.cache.MoreInDB(addr))
checkTXNotInDB(t, tcs.db, tx.ID)
}

func TestAddToCache_TooManyForOneAccount(t *testing.T) {
Expand Down

0 comments on commit 8fcf9a2

Please sign in to comment.