Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #6185: optimize mempool #6186

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@

See [RELEASE](./RELEASE.md) for workflow instructions.

## UNRELEASED

### Upgrade information

### Highlights

### Features

### Improvements

## Release v1.6.5

### Improvements

* [#6185](https://github.com/spacemeshos/go-spacemesh/pull/6185) Optimize mempool

## Release v1.6.4

### Improvements
Expand Down
10 changes: 2 additions & 8 deletions txs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (ac *accountCache) availBalance() uint64 {
func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element, *candidate, error) {
if ac.txsByNonce.Len() >= maxTXsPerAcct {
ac.moreInDB = true
return nil, nil, errTooManyNonce
return nil, nil, fmt.Errorf("%w: len %d", errTooManyNonce, ac.txsByNonce.Len())
}
balance := ac.startBalance
var prev *list.Element
Expand All @@ -115,7 +115,6 @@ func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element
break
}
if balance < ntx.MaxSpending() {
ac.moreInDB = true
logger.Debug("insufficient balance",
zap.Stringer("tx_id", ntx.ID),
zap.Stringer("address", ntx.Principal),
Expand Down Expand Up @@ -593,15 +592,10 @@ func (c *Cache) cleanupAccounts(accounts map[types.Address]struct{}) {
}
}

// - errInsufficientBalance:
// conservative cache is conservative in that it only counts principal's spending for pending transactions.
// a tx rejected due to insufficient balance MAY become feasible after a layer is applied (principal
// received incoming funds). when we receive a errInsufficientBalance tx, we should store it in db and
// re-evaluate it after each layer is applied.
// - errTooManyNonce: when a principal has way too many nonces, we don't want to blow up the memory. they should
// be stored in db and retrieved after each earlier nonce is applied.
func acceptable(err error) bool {
return err == nil || errors.Is(err, errInsufficientBalance) || errors.Is(err, errTooManyNonce)
return err == nil || errors.Is(err, errTooManyNonce)
}

func (c *Cache) Add(
Expand Down
95 changes: 45 additions & 50 deletions txs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -96,6 +97,7 @@ func saveTXs(t *testing.T, db *sql.Database, mtxs []*types.MeshTransaction) {
}

func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransaction, state types.TXState) {
t.Helper()
for _, mtx := range txs {
got, err := transactions.Get(db, mtx.ID)
require.NoError(t, err)
Expand All @@ -104,6 +106,7 @@ func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransac
}

func checkTXNotInDB(t *testing.T, db *sql.Database, tid types.TransactionID) {
t.Helper()
_, err := transactions.Get(db, tid)
require.ErrorIs(t, err, sql.ErrNotFound)
}
Expand Down Expand Up @@ -185,7 +188,7 @@ func createSingleAccountTestCache(tb testing.TB) (*testCache, *testAcct) {
states := map[types.Address]*testAcct{principal: ta}
db := sql.InMemory()
return &testCache{
Cache: NewCache(getStateFunc(states), zaptest.NewLogger(tb)),
Cache: NewCache(getStateFunc(states), zap.NewNop()),
db: db,
}, ta
}
Expand Down Expand Up @@ -707,29 +710,27 @@ func TestCache_Account_Add_RandomOrder(t *testing.T) {
func TestCache_Account_Add_InsufficientBalance_ResetAfterApply(t *testing.T) {
tc, ta := createSingleAccountTestCache(t)
buildSingleAccountCache(t, tc, ta, nil)

mtx := &types.MeshTransaction{
Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction,
mtx.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx.ID)

lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// the account will receive funds in layer 97 (via rewards or incoming transfer)
ta.balance += ta.balance
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil))

checkTX(t, tc.Cache, mtx.ID, 0, types.EmptyBlockID)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
checkTXNotInDB(t, tc.db, mtx.ID)
}

func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testing.T) {
Expand All @@ -744,41 +745,38 @@ func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testi
Transaction: *newTx(t, ta.nonce+10, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction, mtx0.Received, false))
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction, mtx1.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction,
mtx0.Received, false), errInsufficientBalance)
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction,
mtx1.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx0.ID)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx0.ID)
checkTXNotInDB(t, tc.db, mtx1.ID)

lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// the account receive enough funds in layer 97 (via rewards or incoming transfer) for mtx1
ta.balance = mtx1.Spending()
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil))
checkNoTX(t, tc.Cache, mtx0.ID)
checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID)
checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx1}}
checkMempool(t, tc.Cache, expectedMempool)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))

lid = lid.Add(1)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// for some reasons this account wasn't applied in layer 98.
// but the account receive enough funds in layer 98 (via rewards or incoming transfer) for both mtx0 and mtx1
ta.balance = mtx0.Spending() + mtx1.Spending()
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{2, 3, 4}, nil, nil))
checkTX(t, tc.Cache, mtx0.ID, 0, types.EmptyBlockID)
checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID)
checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {mtx0, mtx1}}
checkMempool(t, tc.Cache, expectedMempool)
checkNoTX(t, tc.Cache, mtx0.ID)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
}

func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) {
Expand All @@ -789,12 +787,13 @@ func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) {
Transaction: *newTx(t, ta.nonce, defaultBalance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction,
mtx.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx.ID)
}

func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) {
Expand All @@ -810,12 +809,13 @@ func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) {
Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &spender.Transaction, spender.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &spender.Transaction,
spender.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, spender.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, ta.balance-mtx.Spending())
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx, spender}, types.MEMPOOL)
checkTXNotInDB(t, tc.db, spender.ID)
}

func TestCache_Account_AppliedTXsNotInCache(t *testing.T) {
Expand Down Expand Up @@ -889,10 +889,11 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) {
largeAmount := defaultBalance
for _, p := range pending {
p.MaxSpend = largeAmount
require.NoError(t, tc.Add(context.Background(), tc.db, &p.Transaction, p.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &p.Transaction,
p.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, p.ID)
checkTXNotInDB(t, tc.db, p.ID)
}
checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL)
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
Expand All @@ -902,28 +903,24 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) {
// transactions in `pending` feasible now
income := defaultBalance * 100
ta.nonce++
ta.balance = ta.balance - mtx.Spending() + income
ta.balance = ta.balance + income - mtx.Spending()
lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
bid := types.BlockID{1, 2, 3}
applied := makeResults(lid, bid, mtx.Transaction)
require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{}))
// all pending txs are added to cache now
newNextNonce = ta.nonce + uint64(len(pending))
newNextNonce = ta.nonce
newBalance = ta.balance
for _, p := range pending {
newBalance -= p.Spending()
}
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: pending}
checkMempool(t, tc.Cache, expectedMempool)
checkMempool(t, tc.Cache, nil)
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.APPLIED)
checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL)
}

func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) {
tc, ta := createSingleAccountTestCache(t)
mtxs := genAndSaveTXs(t, tc.db, ta.signer, ta.nonce, ta.nonce+4, time.Now())

newNextNonce, newBalance := buildSingleAccountCache(t, tc, ta, mtxs)

higherFee := defaultFee + 1
Expand All @@ -933,29 +930,27 @@ func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) {
Received: time.Now(),
}

require.NoError(t, tc.Add(context.Background(), tc.db, &better.Transaction, better.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &better.Transaction,
better.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, better.ID)
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: mtxs}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, append(mtxs, better), types.MEMPOOL)
checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL)
checkTXNotInDB(t, tc.db, better.ID)

// apply lid
// there is also an incoming fund of `income` to the principal's account
// the income is just enough to allow `better` to be feasible
income := mtxs[0].Spending()
ta.nonce++
ta.balance = ta.balance - mtxs[0].Spending() + income
lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
bid := types.BlockID{1, 2, 3}
applied := makeResults(lid, bid, mtxs[0].Transaction)
require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{}))
checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, 0)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {better}}
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: mtxs[1:]}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, mtxs[:1], types.APPLIED)
checkTXStateFromDB(t, tc.db, append(mtxs[1:], better), types.MEMPOOL)
checkTXStateFromDB(t, tc.db, mtxs[1:], types.MEMPOOL)
}

func TestCache_Account_EvictedAfterApply(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions txs/conservative_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ func TestAddToCache_InsufficientBalance(t *testing.T) {
tcs.mvm.EXPECT().GetBalance(addr).Return(defaultAmount, nil).Times(1)
tcs.mvm.EXPECT().GetNonce(addr).Return(nonce, nil).Times(1)
tx := newTx(t, nonce, defaultAmount, defaultFee, signer)
require.NoError(t, tcs.AddToCache(context.Background(), tx, time.Now()))
require.ErrorIs(t, tcs.AddToCache(context.Background(), tx, time.Now()), errInsufficientBalance)
checkNoTX(t, tcs.cache, tx.ID)
require.True(t, tcs.cache.MoreInDB(addr))
checkTXStateFromDB(t, tcs.db, []*types.MeshTransaction{{Transaction: *tx}}, types.MEMPOOL)
require.False(t, tcs.cache.MoreInDB(addr))
checkTXNotInDB(t, tcs.db, tx.ID)
}

func TestAddToCache_TooManyForOneAccount(t *testing.T) {
Expand Down
Loading