Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - chore(txs): mempool optimization #6185

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions.

### Improvements

* [#6185](https://github.com/spacemeshos/go-spacemesh/pull/6185) Optimize mempool

## Release v1.6.4

### Improvements
Expand Down
10 changes: 2 additions & 8 deletions txs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (ac *accountCache) availBalance() uint64 {
func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element, *candidate, error) {
if ac.txsByNonce.Len() >= maxTXsPerAcct {
ac.moreInDB = true
return nil, nil, errTooManyNonce
return nil, nil, fmt.Errorf("%w: len %d", errTooManyNonce, ac.txsByNonce.Len())
}
balance := ac.startBalance
var prev *list.Element
Expand All @@ -115,7 +115,6 @@ func (ac *accountCache) precheck(logger *zap.Logger, ntx *NanoTX) (*list.Element
break
}
if balance < ntx.MaxSpending() {
ac.moreInDB = true
fasmat marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil, errInsufficientBalance
}
return prev, &candidate{best: ntx, postBalance: balance - ntx.MaxSpending()}, nil
Expand Down Expand Up @@ -553,15 +552,10 @@ func (c *Cache) cleanupAccounts(accounts ...types.Address) {
}
}

// - errInsufficientBalance:
// conservative cache is conservative in that it only counts principal's spending for pending transactions.
// a tx rejected due to insufficient balance MAY become feasible after a layer is applied (principal
// received incoming funds). when we receive a errInsufficientBalance tx, we should store it in db and
// re-evaluate it after each layer is applied.
// - errTooManyNonce: when a principal has way too many nonces, we don't want to blow up the memory. they should
// be stored in db and retrieved after each earlier nonce is applied.
func acceptable(err error) bool {
return err == nil || errors.Is(err, errInsufficientBalance) || errors.Is(err, errTooManyNonce)
return err == nil || errors.Is(err, errTooManyNonce)
}
Comment on lines 557 to 559
Copy link
Member

@fasmat fasmat Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain in the PR description from a users perspective what this change will accomplish?


func (c *Cache) Add(
Expand Down
95 changes: 45 additions & 50 deletions txs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -96,6 +97,7 @@ func saveTXs(t *testing.T, db *sql.Database, mtxs []*types.MeshTransaction) {
}

func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransaction, state types.TXState) {
t.Helper()
for _, mtx := range txs {
got, err := transactions.Get(db, mtx.ID)
require.NoError(t, err)
Expand All @@ -104,6 +106,7 @@ func checkTXStateFromDB(t *testing.T, db *sql.Database, txs []*types.MeshTransac
}

func checkTXNotInDB(t *testing.T, db *sql.Database, tid types.TransactionID) {
t.Helper()
_, err := transactions.Get(db, tid)
require.ErrorIs(t, err, sql.ErrNotFound)
}
Expand Down Expand Up @@ -185,7 +188,7 @@ func createSingleAccountTestCache(tb testing.TB) (*testCache, *testAcct) {
states := map[types.Address]*testAcct{principal: ta}
db := sql.InMemory()
return &testCache{
Cache: NewCache(getStateFunc(states), zaptest.NewLogger(tb)),
Cache: NewCache(getStateFunc(states), zap.NewNop()),
db: db,
}, ta
}
Expand Down Expand Up @@ -733,29 +736,27 @@ func TestCache_Account_ReplaceByFee(t *testing.T) {
func TestCache_Account_Add_InsufficientBalance_ResetAfterApply(t *testing.T) {
tc, ta := createSingleAccountTestCache(t)
buildSingleAccountCache(t, tc, ta, nil)

mtx := &types.MeshTransaction{
Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction,
mtx.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx.ID)

lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// the account will receive funds in layer 97 (via rewards or incoming transfer)
ta.balance += ta.balance
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil))

checkTX(t, tc.Cache, mtx.ID, 0, types.EmptyBlockID)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
checkTXNotInDB(t, tc.db, mtx.ID)
}

func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testing.T) {
Expand All @@ -770,41 +771,38 @@ func TestCache_Account_Add_InsufficientBalance_HigherNonceFeasibleFirst(t *testi
Transaction: *newTx(t, ta.nonce+10, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction, mtx0.Received, false))
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction, mtx1.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx0.Transaction,
mtx0.Received, false), errInsufficientBalance)
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx1.Transaction,
mtx1.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx0.ID)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx0.ID)
checkTXNotInDB(t, tc.db, mtx1.ID)

lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// the account receive enough funds in layer 97 (via rewards or incoming transfer) for mtx1
ta.balance = mtx1.Spending()
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{1, 2, 3}, nil, nil))
checkNoTX(t, tc.Cache, mtx0.ID)
checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID)
checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx1}}
checkMempool(t, tc.Cache, expectedMempool)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))

lid = lid.Add(1)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
// for some reasons this account wasn't applied in layer 98.
// but the account receive enough funds in layer 98 (via rewards or incoming transfer) for both mtx0 and mtx1
ta.balance = mtx0.Spending() + mtx1.Spending()
require.NoError(t, tc.Cache.ApplyLayer(context.Background(), tc.db, lid, types.BlockID{2, 3, 4}, nil, nil))
checkTX(t, tc.Cache, mtx0.ID, 0, types.EmptyBlockID)
checkTX(t, tc.Cache, mtx1.ID, 0, types.EmptyBlockID)
checkProjection(t, tc.Cache, ta.principal, mtx1.Nonce+1, 0)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {mtx0, mtx1}}
checkMempool(t, tc.Cache, expectedMempool)
checkNoTX(t, tc.Cache, mtx0.ID)
checkNoTX(t, tc.Cache, mtx1.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.False(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx0, mtx1}, types.MEMPOOL)
}

func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) {
Expand All @@ -815,12 +813,13 @@ func TestCache_Account_Add_InsufficientBalance_NewNonce(t *testing.T) {
Transaction: *newTx(t, ta.nonce, defaultBalance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &mtx.Transaction, mtx.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &mtx.Transaction,
mtx.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, mtx.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce, ta.balance)
checkMempool(t, tc.Cache, nil)
require.True(t, tc.MoreInDB(ta.principal))
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.MEMPOOL)
require.False(t, tc.MoreInDB(ta.principal))
checkTXNotInDB(t, tc.db, mtx.ID)
}

func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) {
Expand All @@ -836,12 +835,13 @@ func TestCache_Account_Add_InsufficientBalance_ExistingNonce(t *testing.T) {
Transaction: *newTx(t, ta.nonce, ta.balance, defaultFee, ta.signer),
Received: time.Now(),
}
require.NoError(t, tc.Add(context.Background(), tc.db, &spender.Transaction, spender.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &spender.Transaction,
spender.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, spender.ID)
checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, ta.balance-mtx.Spending())
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx, spender}, types.MEMPOOL)
checkTXNotInDB(t, tc.db, spender.ID)
}

func TestCache_Account_AppliedTXsNotInCache(t *testing.T) {
Expand Down Expand Up @@ -915,10 +915,11 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) {
largeAmount := defaultBalance
for _, p := range pending {
p.MaxSpend = largeAmount
require.NoError(t, tc.Add(context.Background(), tc.db, &p.Transaction, p.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &p.Transaction,
p.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, p.ID)
checkTXNotInDB(t, tc.db, p.ID)
}
checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL)
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: {mtx}}
checkMempool(t, tc.Cache, expectedMempool)
Expand All @@ -928,28 +929,24 @@ func TestCache_Account_BalanceRelaxedAfterApply(t *testing.T) {
// transactions in `pending` feasible now
income := defaultBalance * 100
ta.nonce++
ta.balance = ta.balance - mtx.Spending() + income
ta.balance = ta.balance + income - mtx.Spending()
lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
bid := types.BlockID{1, 2, 3}
applied := makeResults(lid, bid, mtx.Transaction)
require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{}))
// all pending txs are added to cache now
newNextNonce = ta.nonce + uint64(len(pending))
newNextNonce = ta.nonce
newBalance = ta.balance
for _, p := range pending {
newBalance -= p.Spending()
}
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: pending}
checkMempool(t, tc.Cache, expectedMempool)
checkMempool(t, tc.Cache, nil)
checkTXStateFromDB(t, tc.db, []*types.MeshTransaction{mtx}, types.APPLIED)
checkTXStateFromDB(t, tc.db, pending, types.MEMPOOL)
}

func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) {
tc, ta := createSingleAccountTestCache(t)
mtxs := genAndSaveTXs(t, tc.db, ta.signer, ta.nonce, ta.nonce+4, time.Now())

newNextNonce, newBalance := buildSingleAccountCache(t, tc, ta, mtxs)

higherFee := defaultFee + 1
Expand All @@ -959,29 +956,27 @@ func TestCache_Account_BalanceRelaxedAfterApply_EvictLaterNonce(t *testing.T) {
Received: time.Now(),
}

require.NoError(t, tc.Add(context.Background(), tc.db, &better.Transaction, better.Received, false))
require.ErrorIs(t, tc.Add(context.Background(), tc.db, &better.Transaction,
better.Received, false), errInsufficientBalance)
checkNoTX(t, tc.Cache, better.ID)
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
expectedMempool := map[types.Address][]*types.MeshTransaction{ta.principal: mtxs}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, append(mtxs, better), types.MEMPOOL)
checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL)
checkTXNotInDB(t, tc.db, better.ID)

// apply lid
// there is also an incoming fund of `income` to the principal's account
// the income is just enough to allow `better` to be feasible
income := mtxs[0].Spending()
ta.nonce++
ta.balance = ta.balance - mtxs[0].Spending() + income
lid := types.LayerID(97)
require.NoError(t, layers.SetApplied(tc.db, lid.Sub(1), types.RandomBlockID()))
bid := types.BlockID{1, 2, 3}
applied := makeResults(lid, bid, mtxs[0].Transaction)
require.NoError(t, tc.ApplyLayer(context.Background(), tc.db, lid, bid, applied, []types.Transaction{}))
checkProjection(t, tc.Cache, ta.principal, ta.nonce+1, 0)
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: {better}}
expectedMempool = map[types.Address][]*types.MeshTransaction{ta.principal: mtxs[1:]}
checkMempool(t, tc.Cache, expectedMempool)
checkTXStateFromDB(t, tc.db, mtxs[:1], types.APPLIED)
checkTXStateFromDB(t, tc.db, append(mtxs[1:], better), types.MEMPOOL)
checkTXStateFromDB(t, tc.db, mtxs[1:], types.MEMPOOL)
}

func TestCache_Account_EvictedAfterApply(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions txs/conservative_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ func TestAddToCache_InsufficientBalance(t *testing.T) {
tcs.mvm.EXPECT().GetBalance(addr).Return(defaultAmount, nil).Times(1)
tcs.mvm.EXPECT().GetNonce(addr).Return(nonce, nil).Times(1)
tx := newTx(t, nonce, defaultAmount, defaultFee, signer)
require.NoError(t, tcs.AddToCache(context.Background(), tx, time.Now()))
require.ErrorIs(t, tcs.AddToCache(context.Background(), tx, time.Now()), errInsufficientBalance)
checkNoTX(t, tcs.cache, tx.ID)
require.True(t, tcs.cache.MoreInDB(addr))
checkTXStateFromDB(t, tcs.db, []*types.MeshTransaction{{Transaction: *tx}}, types.MEMPOOL)
require.False(t, tcs.cache.MoreInDB(addr))
checkTXNotInDB(t, tcs.db, tx.ID)
}

func TestAddToCache_TooManyForOneAccount(t *testing.T) {
Expand Down
Loading