Skip to content

Commit

Permalink
Merge branch 'release-3.6.0' into release-3.6.0-dmc-revert
Browse files Browse the repository at this point in the history
  • Loading branch information
JimmyShi22 authored Jan 29, 2024
2 parents 5e0f916 + 7e11450 commit 0220fc0
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 60 deletions.
54 changes: 36 additions & 18 deletions bcos-pbft/bcos-pbft/pbft/cache/PBFTCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ void PBFTCache::resetCache(ViewType _curView)
m_precommitted = false;
PBFT_LOG(INFO) << LOG_DESC("resetCache") << LOG_KV("precommit", m_precommit ? "true" : "false")
<< LOG_KV("prePrepare", m_prePrepare ? "true" : "false")
<< LOG_KV("curView", _curView);
<< LOG_KV("prepareView", m_prePrepare ? m_prePrepare->view() : 0)
<< LOG_KV("curView", _curView)
<< ((m_prePrepare && m_prePrepare->consensusProposal()) ?
printPBFTProposal(m_prePrepare->consensusProposal()) :
"consensusProposal is null");
if (!m_precommit && m_prePrepare && m_prePrepare->consensusProposal() &&
m_prePrepare->view() < _curView)
{
Expand All @@ -315,40 +319,54 @@ void PBFTCache::resetCache(ViewType _curView)
// reset the exceptioned txs to unsealed
m_config->validator()->asyncResetTxsFlag(m_prePrepare->consensusProposal()->data(), false);
m_prePrepare = nullptr;
m_exceptionPrePrepare = nullptr;
}
if (m_exceptionPrePrepare)
resetExceptionCache(_curView);
// clear the expired prepare cache
resetCacheAfterViewChange(m_prepareCacheList, _curView);
// clear the expired commit cache
resetCacheAfterViewChange(m_commitCacheList, _curView);

// recalculate m_prepareReqWeight
recalculateQuorum(m_prepareReqWeight, m_prepareCacheList);
// recalculate m_commitReqWeight
recalculateQuorum(m_commitReqWeight, m_commitCacheList);
}

void PBFTCache::resetExceptionCache(ViewType _curView)
{
if (m_exceptionPrePrepareList.empty()) [[likely]]
{
return;
}
for (auto exceptionPrePrepare = m_exceptionPrePrepareList.begin();
exceptionPrePrepare != m_exceptionPrePrepareList.end();)
{
auto validPrePrepare = (m_precommit || (m_prePrepare && m_prePrepare->consensusProposal() &&
m_prePrepare->view() >= _curView));
if (validPrePrepare &&
(m_prePrepare && m_prePrepare->hash() == m_exceptionPrePrepare->hash()))
(m_prePrepare && m_prePrepare->hash() == (*exceptionPrePrepare)->hash()))
{
if (c_fileLogLevel == TRACE) [[unlikely]]
{
PBFT_LOG(TRACE) << LOG_DESC("resetCache : exceptionPrePrepare but finally be valid")
<< printPBFTProposal(m_exceptionPrePrepare->consensusProposal());
<< printPBFTProposal((*exceptionPrePrepare)->consensusProposal());
}
}
else
{
PBFT_LOG(INFO) << LOG_DESC("resetCache : asyncResetTxsFlag exceptionPrePrepare")
<< printPBFTProposal(m_exceptionPrePrepare->consensusProposal());
<< printPBFTProposal((*exceptionPrePrepare)->consensusProposal());
m_config->validator()->asyncResetTxsFlag(
m_exceptionPrePrepare->consensusProposal()->data(), false);
m_prePrepare = nullptr;
m_exceptionPrePrepare = nullptr;
(*exceptionPrePrepare)->consensusProposal()->data(), false);
exceptionPrePrepare = m_exceptionPrePrepareList.erase(exceptionPrePrepare);
if (exceptionPrePrepare == m_exceptionPrePrepareList.end())
{
m_prePrepare = nullptr;
break;
}
}
exceptionPrePrepare++;
}
// clear the expired prepare cache
resetCacheAfterViewChange(m_prepareCacheList, _curView);
// clear the expired commit cache
resetCacheAfterViewChange(m_commitCacheList, _curView);

// recalculate m_prepareReqWeight
recalculateQuorum(m_prepareReqWeight, m_prepareCacheList);
// recalculate m_commitReqWeight
recalculateQuorum(m_commitReqWeight, m_commitCacheList);
}

void PBFTCache::setCheckPointProposal(PBFTProposalInterface::Ptr _proposal)
Expand Down
14 changes: 12 additions & 2 deletions bcos-pbft/bcos-pbft/pbft/cache/PBFTCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class PBFTCache : public std::enable_shared_from_this<PBFTCache>
{
return;
}
// NOTE: If prePrepare is not null, it means prePrepare is already exist, and it will be
// replaced without reset proposal txs to unsealed. So we need to add the old prePrepare to
// exception prePrepare list, and it will be processed resetTxs when finalizeConsensus and
// reachNewView.
if (m_prePrepare && m_prePrepare->consensusProposal())
{
m_exceptionPrePrepareList.push_back(m_prePrepare);
}
m_prePrepare = _prePrepareMsg;
PBFT_LOG(INFO) << LOG_DESC("addPrePrepareCache") << printPBFTMsgInfo(_prePrepareMsg)
<< LOG_KV("sys", _prePrepareMsg->consensusProposal()->systemProposal())
Expand All @@ -70,7 +78,7 @@ class PBFTCache : public std::enable_shared_from_this<PBFTCache>

void addExceptionPrePrepareCache(PBFTMessageInterface::Ptr _prePrepareMsg)
{
m_exceptionPrePrepare = std::move(_prePrepareMsg);
m_exceptionPrePrepareList.push_back(std::move(_prePrepareMsg));
}

bcos::protocol::BlockNumber index() const { return m_index; }
Expand All @@ -90,6 +98,8 @@ class PBFTCache : public std::enable_shared_from_this<PBFTCache>
// reset the cache after viewchange
virtual void resetCache(ViewType _curView);

virtual void resetExceptionCache(ViewType _curView);

virtual void setCheckPointProposal(PBFTProposalInterface::Ptr _proposal);
PBFTProposalInterface::Ptr checkPointProposal() { return m_checkpointProposal; }

Expand Down Expand Up @@ -215,7 +225,7 @@ class PBFTCache : public std::enable_shared_from_this<PBFTCache>
QuorumRecoderType m_commitReqWeight;

PBFTMessageInterface::Ptr m_prePrepare = nullptr;
PBFTMessageInterface::Ptr m_exceptionPrePrepare = nullptr;
std::vector<PBFTMessageInterface::Ptr> m_exceptionPrePrepareList = {};
PBFTMessageInterface::Ptr m_precommit = nullptr;
PBFTMessageInterface::Ptr m_precommitWithoutData = nullptr;

Expand Down
1 change: 1 addition & 0 deletions bcos-pbft/bcos-pbft/pbft/cache/PBFTCacheProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ void PBFTCacheProcessor::removeConsensusedCache(
if (pcache->first <= _consensusedNumber)
{
m_proposalsToStableConsensus.erase(pcache->first);
pcache->second->resetExceptionCache(_view);
pcache = m_caches.erase(pcache);
eraseSize++;
continue;
Expand Down
13 changes: 12 additions & 1 deletion bcos-pbft/bcos-pbft/pbft/engine/PBFTEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ void PBFTEngine::onRecvProposal(bool _containSysTxs, bytesConstRef _proposalData
}
else
{
PBFT_LOG(INFO) << LOG_DESC("handlePrePrepareMsg failed") << printPBFTProposal(pbftMessage)
<< LOG_KV("costT(ms)", utcSteadyTime() - beginHandleT);
resetSealedTxs(pbftMessage);
}
}
Expand Down Expand Up @@ -711,7 +713,13 @@ CheckResult PBFTEngine::checkPBFTMsgState(PBFTMessageInterface::Ptr _pbftReq) co
<< LOG_KV("proposalCommitted", proposalCommitted);
return CheckResult::INVALID;
}
// case index equal
// Note: Accept pbft message with larger view then local view, for other nodes may viewchange to
// a larger view, and the node-self is not aware of the viewchange.
// In normal case, it will not happen, node-self will recover the view from the viewchange, and
// soon will reach to new view.
// BUT in the Byzantium case, malicious node will send the pre-prepare message with a larger
// view, to lay down some specific txs.
// FIXME: to check this logic.
if (_pbftReq->view() < m_config->view())
{
PBFT_LOG(DEBUG) << LOG_DESC("checkPBFTMsgState: invalid pbftMsg for invalid view")
Expand Down Expand Up @@ -890,6 +898,9 @@ bool PBFTEngine::handlePrePrepareMsg(PBFTMessageInterface::Ptr _prePrepareMsg,
auto result = checkPrePrepareMsg(_prePrepareMsg);
if (result == CheckResult::INVALID)
{
PBFT_LOG(INFO) << LOG_DESC("handlePrePrepareMsg checkPrePrepareMsg failed")
<< printPBFTMsgInfo(_prePrepareMsg) << m_config->printCurrentState()
<< LOG_KV("utc", utcSteadyTime());
return false;
}
if (!_generatedFromNewView)
Expand Down
2 changes: 1 addition & 1 deletion bcos-sdk/bcos-cpp-sdk/SdkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ bcos::cppsdk::jsonrpc::JsonRpcImpl::Ptr SdkFactory::buildJsonRpc(
msg->setPayload(data);

_service->asyncSendMessageByGroupAndNode(_group, _node, msg, Options(),
[_respFunc](auto&& _error, auto&& _msg, auto&& _session) {
[_respFunc](Error::Ptr _error, MessageFace::Ptr _msg, auto&& _session) {
(void)_session;
_respFunc(_error, _msg ? _msg->payload() : nullptr);
});
Expand Down
14 changes: 10 additions & 4 deletions bcos-txpool/bcos-txpool/sync/TransactionSync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ void TransactionSync::requestMissedTxs(PublicPtr _generatedNodeID, HashListPtr _
{
return;
}
// if _generatedNodeID == nullptr, then it means request txs in schduler
if (!_generatedNodeID ||
_generatedNodeID->data() == txsSync->m_config->nodeID()->data())
{
Expand Down Expand Up @@ -264,6 +265,7 @@ void TransactionSync::requestMissedTxsFromPeer(PublicPtr _generatedNodeID, HashL
}
auto networkT = utcTime() - startT;
auto recordT = utcTime();
// verify fetch txs response
transactionSync->verifyFetchedTxs(_error, _nodeID, _data, _missedTxs,
_verifiedProposal,
[networkT, recordT, proposalHeader, _onVerifyFinished](
Expand Down Expand Up @@ -353,10 +355,10 @@ void TransactionSync::verifyFetchedTxs(Error::Ptr _error, NodeIDPtr _nodeID, byt
_onVerifyFinished(
BCOS_ERROR_PTR(CommonError::TransactionsMissing, "TransactionsMissing"), false);
// try to import the transactions even when verify failed
importDownloadedTxs(transactions);
importDownloadedTxsByBlock(transactions);
return;
}
if (!importDownloadedTxs(transactions, _verifiedProposal))
if (!importDownloadedTxsByBlock(transactions, _verifiedProposal))
{
_onVerifyFinished(BCOS_ERROR_PTR(CommonError::TxsSignatureVerifyFailed,
"invalid transaction for invalid signature or nonce or blockLimit"),
Expand All @@ -383,9 +385,11 @@ void TransactionSync::verifyFetchedTxs(Error::Ptr _error, NodeIDPtr _nodeID, byt
<< LOG_KV("timecost", (utcTime() - recordT));
}

bool TransactionSync::importDownloadedTxs(Block::Ptr _txsBuffer, Block::Ptr _verifiedProposal)
bool TransactionSync::importDownloadedTxsByBlock(
Block::Ptr _txsBuffer, Block::Ptr _verifiedProposal)
{
auto txs = std::make_shared<Transactions>();
txs->reserve(_txsBuffer->transactionsSize());
for (size_t i = 0; i < _txsBuffer->transactionsSize(); i++)
{
txs->emplace_back(std::const_pointer_cast<Transaction>(_txsBuffer->transaction(i)));
Expand All @@ -403,14 +407,15 @@ bool TransactionSync::importDownloadedTxs(TransactionsPtr _txs, Block::Ptr _veri
// Note: only need verify the signature for the transactions
bool enforceImport = false;
BlockHeader::Ptr proposalHeader = nullptr;
// if _verifiedProposal is null, it means import txs from ledger
if (_verifiedProposal && _verifiedProposal->blockHeader())
{
proposalHeader = _verifiedProposal->blockHeader();
enforceImport = true;
}
auto recordT = utcTime();
auto startT = utcTime();
// verify the transactions
// verify the transactions signature
std::atomic_bool verifySuccess = {true};
tbb::parallel_for(tbb::blocked_range<size_t>(0, txsSize),
[&_txs, &_verifiedProposal, &proposalHeader, this, &verifySuccess](
Expand All @@ -433,6 +438,7 @@ bool TransactionSync::importDownloadedTxs(TransactionsPtr _txs, Block::Ptr _veri
}
try
{
// verify failed, it will throw exception
tx->verify(*m_hashImpl, *m_signatureImpl);
}
catch (std::exception const& e)
Expand Down
2 changes: 1 addition & 1 deletion bcos-txpool/bcos-txpool/sync/TransactionSync.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TransactionSync : public TransactionSyncInterface,
Error::Ptr _error, bcos::protocol::TransactionsPtr _fetchedTxs,
bcos::protocol::Block::Ptr _verifiedProposal, VerifyResponseCallback _onVerifyFinished);

virtual bool importDownloadedTxs(bcos::protocol::Block::Ptr _txsBuffer,
virtual bool importDownloadedTxsByBlock(bcos::protocol::Block::Ptr _txsBuffer,
bcos::protocol::Block::Ptr _verifiedProposal = nullptr);

virtual bool importDownloadedTxs(bcos::protocol::TransactionsPtr _txs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ class TxPoolStorageInterface
protocol::Transaction::Ptr transaction, std::function<void()> afterInsertHook) = 0;

virtual bcos::protocol::TransactionStatus insert(bcos::protocol::Transaction::Ptr _tx) = 0;
virtual void batchInsert(bcos::protocol::Transactions const& _txs) = 0;
[[deprecated(
"do not use raw insert tx pool without check, use "
"batchVerifyAndSubmitTransaction")]] virtual void
batchInsert(bcos::protocol::Transactions const& _txs) = 0;

virtual bcos::protocol::Transaction::Ptr remove(bcos::crypto::HashType const& _txHash) = 0;
virtual bcos::protocol::Transaction::Ptr removeSubmittedTx(
Expand Down
15 changes: 6 additions & 9 deletions bcos-txpool/bcos-txpool/txpool/interfaces/TxValidatorInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
#pragma once
#include "bcos-txpool/txpool/interfaces/NonceCheckerInterface.h"
#include "bcos-txpool/txpool/validator/LedgerNonceChecker.h"
#include <bcos-framework/protocol/Transaction.h>
#include <bcos-protocol/TransactionStatus.h>
namespace bcos::txpool
Expand All @@ -32,15 +33,11 @@ class TxValidatorInterface
virtual ~TxValidatorInterface() = default;

virtual bcos::protocol::TransactionStatus verify(bcos::protocol::Transaction::ConstPtr _tx) = 0;
virtual bcos::protocol::TransactionStatus submittedToChain(
virtual bcos::protocol::TransactionStatus checkLedgerNonceAndBlockLimit(
bcos::protocol::Transaction::ConstPtr _tx) = 0;
virtual NonceCheckerInterface::Ptr ledgerNonceChecker() { return m_ledgerNonceChecker; }
virtual void setLedgerNonceChecker(NonceCheckerInterface::Ptr _ledgerNonceChecker)
{
m_ledgerNonceChecker = std::move(_ledgerNonceChecker);
}

protected:
NonceCheckerInterface::Ptr m_ledgerNonceChecker;
virtual bcos::protocol::TransactionStatus checkTxpoolNonce(
bcos::protocol::Transaction::ConstPtr _tx) = 0;
virtual LedgerNonceChecker::Ptr ledgerNonceChecker() = 0;
virtual void setLedgerNonceChecker(LedgerNonceChecker::Ptr _ledgerNonceChecker) = 0;
};
} // namespace bcos::txpool
Loading

0 comments on commit 0220fc0

Please sign in to comment.