From 4c489d18a22b642721fac2feae4339e43ec1b9ef Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 1 Sep 2020 17:57:44 +0300 Subject: [PATCH 1/8] track expected nonce in mpool, refuse messages with large gaps --- chain/messagepool/messagepool.go | 142 ++++++++++++++++++++++++------- chain/messagepool/pruning.go | 2 +- 2 files changed, 111 insertions(+), 33 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index bef0ed27192..4b1204c4a59 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -7,6 +7,7 @@ import ( "fmt" "math" stdbig "math/big" + "runtime" "sort" "sync" "time" @@ -52,6 +53,8 @@ var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee)) var MaxActorPendingMessages = 1000 +var MaxNonceGap = uint64(runtime.NumCPU()) + var ( ErrMessageTooBig = errors.New("message too big") @@ -68,6 +71,7 @@ var ( ErrSoftValidationFailure = errors.New("validation failure") ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium") ErrTooManyPendingMessages = errors.New("too many pending messages for actor") + ErrNonceGap = errors.New("unfulfilled nonce gap") ErrTryAgain = errors.New("state inconsistency while pushing message; please try again") ) @@ -131,19 +135,39 @@ type msgSet struct { requiredFunds *stdbig.Int } -func newMsgSet() *msgSet { +func newMsgSet(nonce uint64) *msgSet { return &msgSet{ msgs: make(map[uint64]*types.SignedMessage), + nextNonce: nonce, requiredFunds: stdbig.NewInt(0), } } -func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool, error) { - if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce { - ms.nextNonce = m.Message.Nonce + 1 +func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) { + nextNonce := ms.nextNonce + nonceGap := false + switch { + case m.Message.Nonce == nextNonce: + nextNonce++ + // advance if we are filling a gap + for _, fillGap := ms.msgs[nextNonce]; fillGap; _, fillGap = ms.msgs[nextNonce] { + nextNonce++ + } + + case strict && m.Message.Nonce > nextNonce+MaxNonceGap: + return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap) + + case m.Message.Nonce > nextNonce: + nonceGap = true } + exms, has := ms.msgs[m.Message.Nonce] if has { + // refuse RBF if we have a gap + if strict && nonceGap { + return false, xerrors.Errorf("rejecting replace by fee because of nonce gap (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap) + } + if m.Cid() != exms.Cid() { // check if RBF passes minPrice := exms.Message.GasPremium @@ -165,11 +189,12 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool //ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int) } - if !has && limit && len(ms.msgs) > MaxActorPendingMessages { + if !has && strict && len(ms.msgs) > MaxActorPendingMessages { log.Errorf("too many pending messages from actor %s", m.Message.From) return false, ErrTooManyPendingMessages } + ms.nextNonce = nextNonce ms.msgs[m.Message.Nonce] = m ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int) //ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int) @@ -177,12 +202,38 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool return !has, nil } -func (ms *msgSet) rm(nonce uint64) { +func (ms *msgSet) rm(nonce uint64, applied bool) { m, has := ms.msgs[nonce] - if has { - ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int) - //ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int) - delete(ms.msgs, nonce) + if !has { + if applied && nonce >= ms.nextNonce { + // we removed a message we did not know about because it was applied + // we need to adjust the nonce and check if we filled a gap + ms.nextNonce = nonce + 1 + for _, fillGap := ms.msgs[ms.nextNonce]; fillGap; _, fillGap = ms.msgs[ms.nextNonce] { + ms.nextNonce++ + } + } + return + } + + ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int) + //ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int) + delete(ms.msgs, nonce) + + // adjust next nonce + if applied { + // we removed a (known) message because it was applied in a tipset + // we can't possibly have filled a gap in this case + if nonce >= ms.nextNonce { + ms.nextNonce = nonce + 1 + } + return + } + + // we removed a message because it was pruned + // we have to adjust the nonce if it creates a gap or rewinds state + if nonce < ms.nextNonce { + ms.nextNonce = nonce } } @@ -476,6 +527,40 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error return mp.addLocked(m, true) } +func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { + err := mp.checkMessage(m) + if err != nil { + return err + } + + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + + curTs := mp.curTs + + snonce, err := mp.getStateNonce(m.Message.From, curTs) + if err != nil { + return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) + } + + if snonce > m.Message.Nonce { + return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow) + } + + mp.lk.Lock() + defer mp.lk.Unlock() + + if err := mp.verifyMsgBeforeAdd(m, curTs.Height()); err != nil { + return err + } + + if err := mp.checkBalance(m, curTs); err != nil { + return err + } + + return mp.addLocked(m, false) +} + func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { mp.lk.Lock() defer mp.lk.Unlock() @@ -483,7 +568,7 @@ func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { return mp.addLocked(m, false) } -func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error { +func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error { log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce) if m.Signature.Type == crypto.SigTypeBLS { mp.blsSigCache.Add(m.Cid(), m.Signature) @@ -501,11 +586,16 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error { mset, ok := mp.pending[m.Message.From] if !ok { - mset = newMsgSet() + nonce, err := mp.getStateNonce(m.Message.From, mp.curTs) + if err != nil { + return xerrors.Errorf("failed to get initial actor nonce: %w", err) + } + + mset = newMsgSet(nonce) mp.pending[m.Message.From] = mset } - incr, err := mset.add(m, mp, limit) + incr, err := mset.add(m, mp, strict) if err != nil { log.Info(err) return err @@ -664,14 +754,14 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) } -func (mp *MessagePool) Remove(from address.Address, nonce uint64) { +func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) { mp.lk.Lock() defer mp.lk.Unlock() - mp.remove(from, nonce) + mp.remove(from, nonce, applied) } -func (mp *MessagePool) remove(from address.Address, nonce uint64) { +func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool) { mset, ok := mp.pending[from] if !ok { return @@ -688,22 +778,10 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64) { // NB: This deletes any message with the given nonce. This makes sense // as two messages with the same sender cannot have the same nonce - mset.rm(nonce) + mset.rm(nonce, applied) if len(mset.msgs) == 0 { delete(mp.pending, from) - } else { - var max uint64 - for nonce := range mset.msgs { - if max < nonce { - max = nonce - } - } - if max < nonce { - max = nonce // we could have not seen the removed message before - } - - mset.nextNonce = max + 1 } } @@ -771,7 +849,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) rm := func(from address.Address, nonce uint64) { s, ok := rmsgs[from] if !ok { - mp.Remove(from, nonce) + mp.Remove(from, nonce, true) return } @@ -780,7 +858,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) return } - mp.Remove(from, nonce) + mp.Remove(from, nonce, true) } maybeRepub := func(cid cid.Cid) { @@ -1082,7 +1160,7 @@ func (mp *MessagePool) loadLocal() error { return xerrors.Errorf("unmarshaling local message: %w", err) } - if err := mp.Add(&sm); err != nil { + if err := mp.addLoaded(&sm); err != nil { if xerrors.Is(err, ErrNonceTooLow) { continue // todo: drop the message from local cache (if above certain confidence threshold) } diff --git a/chain/messagepool/pruning.go b/chain/messagepool/pruning.go index 143dd029e8f..d1290e386f3 100644 --- a/chain/messagepool/pruning.go +++ b/chain/messagepool/pruning.go @@ -98,7 +98,7 @@ keepLoop: // and remove all messages that are still in pruneMsgs after processing the chains log.Infof("Pruning %d messages", len(pruneMsgs)) for _, m := range pruneMsgs { - mp.remove(m.Message.From, m.Message.Nonce) + mp.remove(m.Message.From, m.Message.Nonce, false) } return nil From da6d384300d5db8cec3c825e5b93d51405964315 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 1 Sep 2020 17:57:52 +0300 Subject: [PATCH 2/8] fix tests --- chain/messagepool/messagepool_test.go | 6 ++++++ chain/messagepool/selection_test.go | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index bab4b81e38b..484c72746fa 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -352,6 +352,12 @@ func TestRevertMessages(t *testing.T) { } func TestPruningSimple(t *testing.T) { + oldMaxNonceGap := MaxNonceGap + MaxNonceGap = 1000 + defer func() { + MaxNonceGap = oldMaxNonceGap + }() + tma := newTestMpoolAPI() w, err := wallet.NewWallet(wallet.NewMemKeyStore()) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index f22cd095d8b..a9797a466d3 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -369,6 +369,12 @@ func TestMessageChainSkipping(t *testing.T) { } func TestBasicMessageSelection(t *testing.T) { + oldMaxNonceGap := MaxNonceGap + MaxNonceGap = 1000 + defer func() { + MaxNonceGap = oldMaxNonceGap + }() + mp, tma := makeTestMpool() // the actors From d76a3b87c5d77c7ee7d3bf7ae10972159c1af0d4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 1 Sep 2020 17:59:44 +0300 Subject: [PATCH 3/8] ignore messages with large nonce gaps --- chain/sub/incoming.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 0175904632c..5c28aa83544 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -555,6 +555,8 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs fallthrough case xerrors.Is(err, messagepool.ErrTooManyPendingMessages): fallthrough + case xerrors.Is(err, messagepool.ErrNonceGap): + fallthrough case xerrors.Is(err, messagepool.ErrNonceTooLow): return pubsub.ValidationIgnore default: From 7b76aa2078d0b3419edccf37ca8634acca12fd6c Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 1 Sep 2020 22:46:36 +0300 Subject: [PATCH 4/8] warn when adding a nonce-gapped message to the mpool --- chain/messagepool/messagepool.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 4b1204c4a59..951c74a2c01 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -194,6 +194,11 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo return false, ErrTooManyPendingMessages } + if strict && nonceGap { + log.Warnf("adding nonce-gapped message from %s (nonce: %d, nextNonce: %d)", + m.Message.From, m.Message.Nonce, nextNonce) + } + ms.nextNonce = nextNonce ms.msgs[m.Message.Nonce] = m ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int) From f53d2e3a4653a806d3dde839ed104b41007f4af1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 2 Sep 2020 00:12:43 +0300 Subject: [PATCH 5/8] cap MaxNonceGap to 16 --- chain/messagepool/messagepool.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 951c74a2c01..556c20198a9 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -53,7 +53,7 @@ var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee)) var MaxActorPendingMessages = 1000 -var MaxNonceGap = uint64(runtime.NumCPU()) +var MaxNonceGap = uint64(16) var ( ErrMessageTooBig = errors.New("message too big") @@ -82,6 +82,13 @@ const ( localUpdates = "update" ) +func init() { + numcpus := uint64(runtime.NumCPU()) + if numcpus < MaxNonceGap { + MaxNonceGap = numcpus + } +} + type MessagePool struct { lk sync.Mutex From 28f57667f0e48bc370017e3e37f125126d9d736a Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 2 Sep 2020 01:17:22 +0300 Subject: [PATCH 6/8] cap MaxNonceGap to 4, add delay between batch messages during republish --- chain/messagepool/messagepool.go | 10 +--------- chain/messagepool/repub.go | 9 +++++++++ chain/messagepool/repub_test.go | 6 ++++++ 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 556c20198a9..7fc4f2b9dea 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -7,7 +7,6 @@ import ( "fmt" "math" stdbig "math/big" - "runtime" "sort" "sync" "time" @@ -53,7 +52,7 @@ var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee)) var MaxActorPendingMessages = 1000 -var MaxNonceGap = uint64(16) +var MaxNonceGap = uint64(4) var ( ErrMessageTooBig = errors.New("message too big") @@ -82,13 +81,6 @@ const ( localUpdates = "update" ) -func init() { - numcpus := uint64(runtime.NumCPU()) - if numcpus < MaxNonceGap { - MaxNonceGap = numcpus - } -} - type MessagePool struct { lk sync.Mutex diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index acbf23892b8..1173bdb48b5 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -3,6 +3,7 @@ package messagepool import ( "context" "sort" + "time" "golang.org/x/xerrors" @@ -15,6 +16,8 @@ import ( const repubMsgLimit = 30 +var RepublishBatchDelay = 200 * time.Millisecond + func (mp *MessagePool) republishPendingMessages() error { mp.curTsLk.Lock() ts := mp.curTs @@ -131,6 +134,12 @@ func (mp *MessagePool) republishPendingMessages() error { } count++ + + if count < len(msgs) { + // this delay is here to encourage the pubsub subsystem to process the messages serially + // and avoid creating nonce gaps because of concurrent validation. + time.Sleep(RepublishBatchDelay) + } } // track most recently republished messages diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index 28a69c92ae7..70330160115 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -12,6 +12,12 @@ import ( ) func TestRepubMessages(t *testing.T) { + oldRepublishBatchDelay = RepublishBatchDelay + RepublishBatchDelay = time.Microsecond + defer func() { + RepublishBatchDelay = oldRepublishBatchDelay + }() + tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() From a4c650316ad9d8a9169b9bdf8640455dc2ae2dee Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 2 Sep 2020 09:14:35 +0300 Subject: [PATCH 7/8] fix test --- chain/messagepool/repub_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index 70330160115..c89367f0e6c 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -12,7 +12,7 @@ import ( ) func TestRepubMessages(t *testing.T) { - oldRepublishBatchDelay = RepublishBatchDelay + oldRepublishBatchDelay := RepublishBatchDelay RepublishBatchDelay = time.Microsecond defer func() { RepublishBatchDelay = oldRepublishBatchDelay From d20b6adfd2744bbde4774794fe85bf22755c3418 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 3 Sep 2020 09:00:03 +0300 Subject: [PATCH 8/8] refuse to add duplicates in the mpool --- chain/messagepool/messagepool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 7fc4f2b9dea..e41e8b0c7d9 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -182,6 +182,9 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPremium, ErrRBFTooLowPremium) } + } else { + return false, xerrors.Errorf("message from %s with nonce %d already in mpool: %w", + m.Message.From, m.Message.Nonce, ErrSoftValidationFailure) } ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int)