From eb29515824a8b323970808be139e8776f113478e Mon Sep 17 00:00:00 2001 From: Jieyi Long Date: Wed, 2 Jun 2021 12:50:42 -0700 Subject: [PATCH] Mempool memory leak and transaction broadcasting fixes --- mempool/mempool.go | 23 ++++++++++------------- rpc/tx.go | 23 ++++++++++++++--------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index 56d6c223..194b4587 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -29,6 +29,8 @@ func (m MempoolError) Error() string { } const DuplicateTxError = MempoolError("Transaction already seen") +const FastsyncSkipTxError = MempoolError("Skip tx during fastsync") + const MaxMempoolTxCount int = 25600 // @@ -186,10 +188,10 @@ func (mp *Mempool) InsertTransaction(rawTx common.Bytes) error { return DuplicateTxError } - if mp.size >= MaxMempoolTxCount { - logger.Debugf("Mempool is full") - return errors.New("mempool is full, please submit your transaction again later") - } + // if mp.size >= MaxMempoolTxCount { + // logger.Debugf("Mempool is full") + // return errors.New("mempool is full, please submit your transaction again later") + // } var txInfo *core.TxInfo var checkTxRes result.Result @@ -219,18 +221,13 @@ func (mp *Mempool) InsertTransaction(rawTx common.Bytes) error { } mp.candidateTxs.Push(txGroup) logger.Debugf("rawTx: %v, txInfo: %v", hex.EncodeToString(rawTx), txInfo) - } else { - // Record tx during sync for gossiping purpose - mp.txBookeepper.record(rawTx) + logger.Infof("Insert tx, tx.hash: 0x%v", getTransactionHash(rawTx)) + mp.size++ - logger.Debug("Skipping tx vefification during sync") + return nil } - logger.Infof("Insert tx, tx.hash: 0x%v", getTransactionHash(rawTx)) - - mp.newTxs.PushBack(rawTx) - mp.size++ - return nil + return FastsyncSkipTxError } // Start needs to be called when the Mempool starts diff --git a/rpc/tx.go b/rpc/tx.go index acc537cc..6c49f21e 100644 --- a/rpc/tx.go +++ b/rpc/tx.go @@ -10,6 +10,7 @@ import ( "github.com/thetatoken/theta/common/hexutil" "github.com/thetatoken/theta/core" "github.com/thetatoken/theta/crypto" + "github.com/thetatoken/theta/mempool" ) const txTimeout = 60 * time.Second @@ -139,15 +140,17 @@ func (t *ThetaRPCService) BroadcastRawTransaction( hash := crypto.Keccak256Hash(txBytes) result.TxHash = hash.Hex() - logger.Infof("Broadcast raw transaction (sync): %v, hash: %v", hex.EncodeToString(txBytes), hash.Hex()) + logger.Infof("Prepare to broadcast raw transaction (sync): %v, hash: %v", hex.EncodeToString(txBytes), hash.Hex()) err = t.mempool.InsertTransaction(txBytes) - if err != nil { + if err == nil || err == mempool.FastsyncSkipTxError { + t.mempool.BroadcastTx(txBytes) // still broadcast the transactions received locally during the fastsync mode + logger.Infof("Broadcasted raw transaction (sync): %v, hash: %v", hex.EncodeToString(txBytes), hash.Hex()) + } else { + logger.Warnf("Failed to broadcast raw transaction (sync): %v, hash: %v, err: %v", hex.EncodeToString(txBytes), hash.Hex(), err) return err } - t.mempool.BroadcastTx(txBytes) - finalized := make(chan *core.Block) timeout := time.NewTimer(txTimeout) defer timeout.Stop() @@ -192,16 +195,18 @@ func (t *ThetaRPCService) BroadcastRawTransactionAsync( hash := crypto.Keccak256Hash(txBytes) result.TxHash = hash.Hex() - logger.Infof("Broadcast raw transaction (async): %v, hash: %v", hex.EncodeToString(txBytes), hash.Hex()) + logger.Infof("Prepare to broadcast raw transaction (async): %v, hash: %v", hex.EncodeToString(txBytes), hash.Hex()) err = t.mempool.InsertTransaction(txBytes) - if err != nil { - return err + if err == nil || err == mempool.FastsyncSkipTxError { + t.mempool.BroadcastTx(txBytes) // still broadcast the transactions received locally during the fastsync mode + logger.Infof("Broadcasted raw transaction (async): %v, hash: %v", hex.EncodeToString(txBytes), hash.Hex()) + return nil } - t.mempool.BroadcastTx(txBytes) + logger.Warnf("Failed to broadcast raw transaction (async): %v, hash: %v, err: %v", hex.EncodeToString(txBytes), hash.Hex(), err) - return nil + return err } // -------------------------- Utilities -------------------------- //