From dc6324c8d03f585bf8bf33f91d9a44c0631a7161 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Thu, 5 Sep 2024 15:54:12 +0200 Subject: [PATCH] Layered txpool: fix for unsent drop notifications on remove (#7538) Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + .../transactions/TransactionPoolMetrics.java | 2 +- .../AbstractSequentialTransactionsLayer.java | 9 +- .../layered/AbstractTransactionsLayer.java | 17 +- .../eth/transactions/layered/AddReason.java | 65 ++ .../BaseFeePrioritizedTransactions.java | 18 +- .../eth/transactions/layered/EndLayer.java | 5 +- .../layered/LayeredPendingTransactions.java | 35 +- .../layered/ReadyTransactions.java | 2 +- .../transactions/layered/RemovalReason.java | 131 ++++ .../layered/SparseTransactions.java | 7 +- .../layered/TransactionsLayer.java | 76 +-- ...stractPrioritizedTransactionsTestBase.java | 2 +- .../layered/BaseTransactionPoolTest.java | 1 - .../LayeredPendingTransactionsTest.java | 8 +- .../eth/transactions/layered/LayersTest.java | 609 ++++++++++++------ .../eth/transactions/layered/ReplayTest.java | 2 +- 17 files changed, 682 insertions(+), 308 deletions(-) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AddReason.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/RemovalReason.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 40bdb40ed1c..c6e16c0d8a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ ### Bug fixes - Layered txpool: do not send notifications when moving tx between layers [#7539](https://github.com/hyperledger/besu/pull/7539) +- Layered txpool: fix for unsent drop notifications on remove [#7538](https://github.com/hyperledger/besu/pull/7538) ## 24.9.0 diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java index 45f895881cf..e08805551f9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java @@ -15,7 +15,7 @@ package org.hyperledger.besu.ethereum.eth.transactions; import org.hyperledger.besu.datatypes.TransactionType; -import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason; +import org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason; import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractSequentialTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractSequentialTransactionsLayer.java index 2725f0012eb..f7d71ca3724 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractSequentialTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractSequentialTransactionsLayer.java @@ -14,9 +14,9 @@ */ package org.hyperledger.besu.ethereum.eth.transactions.layered; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.FOLLOW_INVALIDATED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.FOLLOW_INVALIDATED; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; +import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason; import java.util.Map; import java.util.NavigableMap; @@ -45,7 +46,7 @@ public AbstractSequentialTransactionsLayer( } @Override - public void remove(final PendingTransaction invalidatedTx, final RemovalReason reason) { + public void remove(final PendingTransaction invalidatedTx, final PoolRemovalReason reason) { nextLayer.remove(invalidatedTx, reason); final var senderTxs = txsBySender.get(invalidatedTx.getSender()); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index 81178a07098..b429de4a754 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -19,11 +19,12 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CONFIRMED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CROSS_LAYER_REPLACED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CONFIRMED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.RemovedFrom.POOL; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.datatypes.Hash; @@ -292,7 +293,8 @@ public PendingTransaction promoteFor( if (remainingPromotionsPerType[txType.ordinal()] > 0) { senderTxs.pollFirstEntry(); - processRemove(senderTxs, candidateTx.getTransaction(), RemovalReason.PROMOTED); + processRemove( + senderTxs, candidateTx.getTransaction(), RemovalReason.LayerMoveReason.PROMOTED); metrics.incrementRemoved(candidateTx, "promoted", name()); if (senderTxs.isEmpty()) { @@ -419,6 +421,9 @@ protected PendingTransaction processRemove( decreaseCounters(removedTx); metrics.incrementRemoved(removedTx, removalReason.label(), name()); internalRemove(senderTxs, removedTx, removalReason); + if (removalReason.removedFrom().equals(POOL)) { + notifyTransactionDropped(removedTx); + } } return removedTx; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AddReason.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AddReason.java new file mode 100644 index 00000000000..895dd61e3e2 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AddReason.java @@ -0,0 +1,65 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.transactions.layered; + +import java.util.Locale; + +/** Describe why we are trying to add a tx to a layer. */ +public enum AddReason { + /** When adding a tx, that is not present in the pool. */ + NEW(true, true), + /** When adding a tx as result of an internal move between layers. */ + MOVE(false, false), + /** When adding a tx as result of a promotion from a lower layer. */ + PROMOTED(false, false); + + private final boolean sendNotification; + private final boolean makeCopy; + private final String label; + + AddReason(final boolean sendNotification, final boolean makeCopy) { + this.sendNotification = sendNotification; + this.makeCopy = makeCopy; + this.label = name().toLowerCase(Locale.ROOT); + } + + /** + * Should we send add notification for this reason? + * + * @return true if notification should be sent + */ + public boolean sendNotification() { + return sendNotification; + } + + /** + * Should the layer make a copy of the pending tx before adding it, to avoid keeping reference to + * potentially large underlying byte buffers? + * + * @return true is a copy is necessary + */ + public boolean makeCopy() { + return makeCopy; + } + + /** + * Return a label that identify this reason to be used in the metric system. + * + * @return a label + */ + public String label() { + return label; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java index 170205e4986..c06fb53e33a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java @@ -14,8 +14,8 @@ */ package org.hyperledger.besu.ethereum.eth.transactions.layered; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.BELOW_BASE_FEE; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.DEMOTED; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -105,7 +105,7 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket while (itTxsBySender.hasNext()) { final var senderTxs = itTxsBySender.next().getValue(); - Optional maybeFirstUnderpricedNonce = Optional.empty(); + Optional maybeFirstDemotedNonce = Optional.empty(); for (final var e : senderTxs.entrySet()) { final PendingTransaction tx = e.getValue(); @@ -115,25 +115,27 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket } else { // otherwise sender txs starting from this nonce need to be demoted to next layer, // and we can go to next sender - maybeFirstUnderpricedNonce = Optional.of(e.getKey()); + maybeFirstDemotedNonce = Optional.of(e.getKey()); break; } } - maybeFirstUnderpricedNonce.ifPresent( + maybeFirstDemotedNonce.ifPresent( nonce -> { - // demote all txs after the first underpriced to the next layer, because none of them is + // demote all txs after the first demoted to the next layer, because none of them is // executable now, and we can avoid sorting them until they are candidate for execution // again final var demoteTxs = senderTxs.tailMap(nonce, true); while (!demoteTxs.isEmpty()) { final PendingTransaction demoteTx = demoteTxs.pollLastEntry().getValue(); LOG.atTrace() - .setMessage("Demoting tx {} with max gas price below next block base fee {}") + .setMessage( + "Demoting tx {} since it does not respect anymore the requisites to stay in this layer." + + " Next block base fee {}") .addArgument(demoteTx::toTraceLog) .addArgument(newNextBlockBaseFee::toHumanReadableString) .log(); - processEvict(senderTxs, demoteTx, BELOW_BASE_FEE); + processEvict(senderTxs, demoteTx, DEMOTED); addToNextLayer(senderTxs, demoteTx, 0, MOVE); } }); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java index 16571e7aef8..c76d2ab97a6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java @@ -14,7 +14,7 @@ */ package org.hyperledger.besu.ethereum.eth.transactions.layered; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.DROPPED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.datatypes.Hash; @@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener; import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; +import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; import org.hyperledger.besu.util.Subscribers; @@ -84,7 +85,7 @@ public TransactionAddedResult add( } @Override - public void remove(final PendingTransaction pendingTransaction, final RemovalReason reason) {} + public void remove(final PendingTransaction pendingTransaction, final PoolRemovalReason reason) {} @Override public void penalize(final PendingTransaction penalizedTx) {} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java index 3e0a87da2ff..0209900eacc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java @@ -20,9 +20,9 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.RECONCILED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.RECONCILED; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.datatypes.Hash; @@ -316,7 +316,6 @@ public synchronized List getPriorityTransactions() { @Override public void selectTransactions(final PendingTransactions.TransactionSelector selector) { - final List invalidTransactions = new ArrayList<>(); final List penalizedTransactions = new ArrayList<>(); final Set
skipSenders = new HashSet<>(); @@ -347,7 +346,12 @@ public void selectTransactions(final PendingTransactions.TransactionSelector sel .log(); if (selectionResult.discard()) { - invalidTransactions.add(candidatePendingTx); + ethScheduler.scheduleTxWorkerTask( + () -> { + synchronized (this) { + prioritizedTransactions.remove(candidatePendingTx, INVALIDATED); + } + }); logDiscardedTransaction(candidatePendingTx, selectionResult); } @@ -377,20 +381,13 @@ public void selectTransactions(final PendingTransactions.TransactionSelector sel } ethScheduler.scheduleTxWorkerTask( - () -> { - invalidTransactions.forEach( - invalidTx -> { - synchronized (this) { - prioritizedTransactions.remove(invalidTx, INVALIDATED); - } - }); - penalizedTransactions.forEach( - penalizedTx -> { - synchronized (this) { - prioritizedTransactions.internalPenalize(penalizedTx); - } - }); - }); + () -> + penalizedTransactions.forEach( + penalizedTx -> { + synchronized (this) { + prioritizedTransactions.penalize(penalizedTx); + } + })); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java index 0f52e1c5c3e..7c7ddcdaeb2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java @@ -14,7 +14,7 @@ */ package org.hyperledger.besu.ethereum.eth.transactions.layered; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.ethereum.core.BlockHeader; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/RemovalReason.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/RemovalReason.java new file mode 100644 index 00000000000..8b97861b88b --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/RemovalReason.java @@ -0,0 +1,131 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.transactions.layered; + +import java.util.Locale; + +/** The reason why a pending tx has been removed */ +public interface RemovalReason { + /** + * From where the tx has been removed + * + * @return removed from item + */ + RemovedFrom removedFrom(); + + /** + * Return a label that identify this reason to be used in the metric system. + * + * @return a label + */ + String label(); + + /** There are 2 kinds of removals, from a layer and from the pool. */ + enum RemovedFrom { + /** + * Removing from a layer, can be also seen as a move between layers, since it is removed + * from the current layer and added to another layer, for example in the case the layer is full + * and some txs need to be moved to the next layer, or in the opposite case when some txs are + * promoted to the upper layer. + */ + LAYER, + /** + * Removing from the pool, instead means that the tx is directly removed from the pool, and it + * will not be present in any layer, for example, when it is added to an imported block, or it + * is replaced by another tx. + */ + POOL + } + + /** The reason why the tx has been removed from the pool */ + enum PoolRemovalReason implements RemovalReason { + /** Tx removed since it is confirmed on chain, as part of an imported block. */ + CONFIRMED(), + /** Tx removed since it has been replaced by another one added in the same layer. */ + REPLACED(), + /** Tx removed since it has been replaced by another one added in another layer. */ + CROSS_LAYER_REPLACED(), + /** Tx removed when the pool is full, to make space for new incoming txs. */ + DROPPED(), + /** + * Tx removed since found invalid after it was added to the pool, for example during txs + * selection for a new block proposal. + */ + INVALIDATED(), + /** + * Special case, when for a sender, discrepancies are found between the world state view and the + * pool view, then all the txs for this sender are removed and added again. Discrepancies, are + * rare, and can happen during a short windows when a new block is being imported and the world + * state being updated. + */ + RECONCILED(); + + private final String label; + + PoolRemovalReason() { + this.label = name().toLowerCase(Locale.ROOT); + } + + @Override + public RemovedFrom removedFrom() { + return RemovedFrom.POOL; + } + + @Override + public String label() { + return label; + } + } + + /** The reason why the tx has been moved across layers */ + enum LayerMoveReason implements RemovalReason { + /** + * When the current layer is full, and this tx needs to be moved to the lower layer, in order to + * free space. + */ + EVICTED(), + /** + * Specific to sequential layers, when a tx is removed because found invalid, then if the sender + * has other txs with higher nonce, then a gap is created, and since sequential layers do not + * permit gaps, txs following the invalid one need to be moved to lower layers. + */ + FOLLOW_INVALIDATED(), + /** + * When a tx is moved to the upper layer, since it satisfies all the requirement to be promoted. + */ + PROMOTED(), + /** + * When a tx is moved to the lower layer, since it, or a preceding one from the same sender, + * does not respect anymore the requisites to stay in this layer. + */ + DEMOTED(); + + private final String label; + + LayerMoveReason() { + this.label = name().toLowerCase(Locale.ROOT); + } + + @Override + public RemovedFrom removedFrom() { + return RemovedFrom.LAYER; + } + + @Override + public String label() { + return label; + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java index aef318e6df9..7a20f945409 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java @@ -14,8 +14,8 @@ */ package org.hyperledger.besu.ethereum.eth.transactions.layered; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED; import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; +import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; import java.util.ArrayList; @@ -225,7 +226,7 @@ private NavigableMap getSequentialSubset( @Override public synchronized void remove( - final PendingTransaction invalidatedTx, final RemovalReason reason) { + final PendingTransaction invalidatedTx, final PoolRemovalReason reason) { final var senderTxs = txsBySender.get(invalidatedTx.getSender()); if (senderTxs != null && senderTxs.containsKey(invalidatedTx.getNonce())) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java index 0117ed71b61..16ce957fb89 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java @@ -22,10 +22,10 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener; import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult; +import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; @@ -55,7 +55,13 @@ public interface TransactionsLayer { */ TransactionAddedResult add(PendingTransaction pendingTransaction, int gap, AddReason addReason); - void remove(PendingTransaction pendingTransaction, RemovalReason reason); + /** + * Remove the pending tx from the pool + * + * @param pendingTransaction the pending tx + * @param reason the reason it is removed from the pool + */ + void remove(PendingTransaction pendingTransaction, PoolRemovalReason reason); /** * Penalize a pending transaction. Penalization could be applied to notify the txpool that this @@ -119,70 +125,4 @@ List promote( String logStats(); String logSender(Address sender); - - /** Describe why we are trying to add a tx to a layer. */ - enum AddReason { - /** When adding a tx, that is not present in the pool. */ - NEW(true, true), - /** When adding a tx as result of an internal move between layers. */ - MOVE(false, false), - /** When adding a tx as result of a promotion from a lower layer. */ - PROMOTED(false, false); - - private final boolean sendNotification; - private final boolean makeCopy; - private final String label; - - AddReason(final boolean sendNotification, final boolean makeCopy) { - this.sendNotification = sendNotification; - this.makeCopy = makeCopy; - this.label = name().toLowerCase(Locale.ROOT); - } - - /** - * Should we send add notification for this reason? - * - * @return true if notification should be sent - */ - public boolean sendNotification() { - return sendNotification; - } - - /** - * Should the layer make a copy of the pending tx before adding it, to avoid keeping reference - * to potentially large underlying byte buffers? - * - * @return true is a copy is necessary - */ - public boolean makeCopy() { - return makeCopy; - } - - public String label() { - return label; - } - } - - enum RemovalReason { - CONFIRMED, - CROSS_LAYER_REPLACED, - EVICTED, - DROPPED, - FOLLOW_INVALIDATED, - INVALIDATED, - PROMOTED, - REPLACED, - RECONCILED, - BELOW_BASE_FEE; - - private final String label; - - RemovalReason() { - this.label = name().toLowerCase(Locale.ROOT); - } - - public String label() { - return label; - } - } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java index f9e16fa0ea0..6a0005eec63 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java @@ -17,7 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.DROPPED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW; import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java index 5cd098ef6fa..864c8c9181f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java @@ -35,7 +35,6 @@ import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactions; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; -import org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason; import org.hyperledger.besu.evm.account.Account; import org.hyperledger.besu.metrics.StubMetricsSystem; import org.hyperledger.besu.testutil.DeterministicEthScheduler; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java index a5e7a151e22..4fa3fa727aa 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java @@ -20,10 +20,10 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.DROPPED; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED; import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE; import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE; import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOB_PRICE_BELOW_CURRENT_MIN; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java index 3df5fb6d3ac..2036e46d63d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java @@ -15,6 +15,8 @@ package org.hyperledger.besu.ethereum.eth.transactions.layered; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; import static org.hyperledger.besu.datatypes.TransactionType.ACCESS_LIST; import static org.hyperledger.besu.datatypes.TransactionType.BLOB; import static org.hyperledger.besu.datatypes.TransactionType.EIP1559; @@ -25,7 +27,7 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S4; import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.SP1; import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.SP2; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,6 +37,7 @@ import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MiningParameters; +import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.Util; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; @@ -51,11 +54,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.OptionalLong; +import java.util.TreeMap; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -156,7 +162,7 @@ void prioritySenders(final Scenario scenario) { @ParameterizedTest @MethodSource("providerMaxPrioritizedByType") void maxPrioritizedByType(final Scenario scenario) { - assertScenario(scenario, BLOB_TX_POOL_CONFIG); + assertScenario(scenario); } @ParameterizedTest @@ -166,54 +172,7 @@ void penalized(final Scenario scenario) { } private void assertScenario(final Scenario scenario) { - assertScenario(scenario, DEFAULT_TX_POOL_CONFIG); - } - - private void assertScenario( - final Scenario scenario, final TransactionPoolConfiguration poolConfig) { - final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem); - - final EvictCollectorLayer evictCollector = new EvictCollectorLayer(txPoolMetrics); - final EthScheduler ethScheduler = new DeterministicEthScheduler(); - final SparseTransactions sparseTransactions = - new SparseTransactions( - poolConfig, - ethScheduler, - evictCollector, - txPoolMetrics, - (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), - new BlobCache()); - - final ReadyTransactions readyTransactions = - new ReadyTransactions( - poolConfig, - ethScheduler, - sparseTransactions, - txPoolMetrics, - (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), - new BlobCache()); - - final BaseFeePrioritizedTransactions prioritizedTransactions = - new BaseFeePrioritizedTransactions( - poolConfig, - LayersTest::mockBlockHeader, - ethScheduler, - readyTransactions, - txPoolMetrics, - (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), - FeeMarket.london(0L), - new BlobCache(), - MiningParameters.newDefault().setMinTransactionGasPrice(MIN_GAS_PRICE)); - - final LayeredPendingTransactions pendingTransactions = - new LayeredPendingTransactions(poolConfig, prioritizedTransactions, ethScheduler); - - scenario.execute( - pendingTransactions, - prioritizedTransactions, - readyTransactions, - sparseTransactions, - evictCollector); + scenario.run(); } static Stream providerAddTransactions() { @@ -452,7 +411,7 @@ static Stream providerAddTransactionsMultipleSenders() { .expectedReadyForSenders(S1, 0, S1, 1) .expectedSparseForSender(S3, 2) .addForSenders(S3, 1) - // ToDo: only S3[1] is prioritized because there is no space to try to fill gaps + // only S3[1] is prioritized because there is no space to try to fill gaps .expectedPrioritizedForSenders(S3, 0, S3, 1, S2, 0) .expectedReadyForSenders(S2, 1, S1, 0, S1, 1) .expectedSparseForSender(S3, 2) @@ -465,11 +424,11 @@ static Stream providerAddTransactionsMultipleSenders() { Arguments.of( new Scenario("replacement cross layer") .addForSenders(S2, 0, S3, 2, S1, 1, S2, 1, S3, 0, S1, 0, S3, 1) - // ToDo: only S3[1] is prioritized because there is no space to try to fill gaps + // only S3[1] is prioritized because there is no space to try to fill gaps .expectedPrioritizedForSenders(S3, 0, S3, 1, S2, 0) .expectedReadyForSenders(S2, 1, S1, 0, S1, 1) .expectedSparseForSender(S3, 2) - .addForSenders(S3, 2) // added in prioritized, but replacement in sparse + .replaceForSenders(S3, 2) // added in prioritized, but replacement in sparse .expectedPrioritizedForSenders(S3, 0, S3, 1, S3, 2) .expectedReadyForSenders(S2, 0, S2, 1, S1, 0) .expectedSparseForSender(S1, 1))); @@ -477,6 +436,8 @@ static Stream providerAddTransactionsMultipleSenders() { static Stream providerRemoveTransactions() { return Stream.of( + // when expected*ForSender(s) is not present, by default there is a check that the layers + // are empty Arguments.of(new Scenario("remove not existing").removeForSender(S1, 0)), Arguments.of(new Scenario("add/remove first").addForSender(S1, 0).removeForSender(S1, 0)), Arguments.of( @@ -1060,10 +1021,10 @@ static Stream providerReorg() { .expectedNextNonceForSenders(S1, 3) .addForSender(S1, 3) .expectedPrioritizedForSender(S1, 2, 3) - .setAccountNonce(S1, 0) // rewind nonce due to reorg - .addForSender(S1, 0) - .expectedPrioritizedForSender(S1, 0) - .expectedSparseForSender(S1, 2, 3))); + .reorgForSenders(S1, 0) // rewind nonce due to reorg + .addForSender(S1, 0, 1) // re-add reorged txs + .expectedPrioritizedForSender(S1, 0, 1, 2) + .expectedReadyForSender(S1, 3))); } static Stream providerAsyncWorldStateUpdates() { @@ -1221,22 +1182,22 @@ static Stream providerPrioritySenders() { static Stream providerMaxPrioritizedByType() { return Stream.of( Arguments.of( - new Scenario("first blob tx is prioritized") + new Scenario("first blob tx is prioritized", BLOB_TX_POOL_CONFIG) .addForSender(S1, BLOB, 0) .expectedPrioritizedForSender(S1, 0)), Arguments.of( - new Scenario("multiple senders only first blob tx is prioritized") + new Scenario("multiple senders only first blob tx is prioritized", BLOB_TX_POOL_CONFIG) .addForSender(S1, BLOB, 0) .addForSender(S2, BLOB, 0) .expectedPrioritizedForSender(S1, 0) .expectedReadyForSender(S2, 0)), Arguments.of( - new Scenario("same sender following blob txs are moved to ready") + new Scenario("same sender following blob txs are moved to ready", BLOB_TX_POOL_CONFIG) .addForSender(S1, BLOB, 0, 1, 2) .expectedPrioritizedForSender(S1, 0) .expectedReadyForSender(S1, 1, 2)), Arguments.of( - new Scenario("promoting txs respect prioritized count limit") + new Scenario("promoting txs respect prioritized count limit", BLOB_TX_POOL_CONFIG) .addForSender(S1, BLOB, 0, 1, 2) .expectedPrioritizedForSender(S1, 0) .expectedReadyForSender(S1, 1, 2) @@ -1244,14 +1205,14 @@ static Stream providerMaxPrioritizedByType() { .expectedPrioritizedForSender(S1, 1) .expectedReadyForSender(S1, 2)), Arguments.of( - new Scenario("filling gaps respect prioritized count limit") + new Scenario("filling gaps respect prioritized count limit", BLOB_TX_POOL_CONFIG) .addForSender(S1, BLOB, 1) .expectedSparseForSender(S1, 1) .addForSender(S1, BLOB, 0) .expectedPrioritizedForSender(S1, 0) .expectedSparseForSender(S1, 1)), Arguments.of( - new Scenario("promoting to ready is unbounded") + new Scenario("promoting to ready is unbounded", BLOB_TX_POOL_CONFIG) .addForSender(S1, BLOB, 0, 1, 2, 3, 4, 5, 6) .expectedPrioritizedForSender(S1, 0) .expectedReadyForSender(S1, 1, 2, 3) @@ -1351,18 +1312,18 @@ private static boolean transactionReplacementTester( return transactionReplacementHandler.shouldReplace(pt1, pt2, mockBlockHeader()); } - static class Scenario extends BaseTransactionPoolTest { - interface TransactionLayersConsumer { - void accept( - LayeredPendingTransactions pending, - AbstractPrioritizedTransactions prioritized, - ReadyTransactions ready, - SparseTransactions sparse, - EvictCollectorLayer dropped); - } + static class Scenario extends BaseTransactionPoolTest implements Runnable { final String description; - final List actions = new ArrayList<>(); + final TransactionPoolConfiguration poolConfig; + final EvictCollectorLayer dropped; + final SparseTransactions sparse; + final ReadyTransactions ready; + final AbstractPrioritizedTransactions prio; + final LayeredPendingTransactions pending; + + final NotificationsChecker notificationsChecker = new NotificationsChecker(); + final List actions = new ArrayList<>(); List lastExpectedPrioritized = new ArrayList<>(); List lastExpectedReady = new ArrayList<>(); List lastExpectedSparse = new ArrayList<>(); @@ -1374,94 +1335,281 @@ void accept( Arrays.stream(Sender.values()).forEach(e -> nonceBySender.put(e, 0L)); } - final EnumMap> txsBySender = new EnumMap<>(Sender.class); + final EnumSet sendersWithReorg = EnumSet.noneOf(Sender.class); + + final EnumMap> liveTxsBySender = + new EnumMap<>(Sender.class); + + { + Arrays.stream(Sender.values()).forEach(e -> liveTxsBySender.put(e, new TreeMap<>())); + } + + final EnumMap> droppedTxsBySender = + new EnumMap<>(Sender.class); { - Arrays.stream(Sender.values()).forEach(e -> txsBySender.put(e, new HashMap<>())); + Arrays.stream(Sender.values()).forEach(e -> droppedTxsBySender.put(e, new TreeMap<>())); } Scenario(final String description) { + this(description, DEFAULT_TX_POOL_CONFIG); + } + + Scenario(final String description, final TransactionPoolConfiguration poolConfig) { this.description = description; + this.poolConfig = poolConfig; + + final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem); + + this.dropped = new EvictCollectorLayer(txPoolMetrics); + final EthScheduler ethScheduler = new DeterministicEthScheduler(); + this.sparse = + new SparseTransactions( + poolConfig, + ethScheduler, + this.dropped, + txPoolMetrics, + (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), + new BlobCache()); + + this.ready = + new ReadyTransactions( + poolConfig, + ethScheduler, + this.sparse, + txPoolMetrics, + (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), + new BlobCache()); + + this.prio = + new BaseFeePrioritizedTransactions( + poolConfig, + LayersTest::mockBlockHeader, + ethScheduler, + this.ready, + txPoolMetrics, + (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), + FeeMarket.london(0L), + new BlobCache(), + MiningParameters.newDefault().setMinTransactionGasPrice(MIN_GAS_PRICE)); + + this.pending = new LayeredPendingTransactions(poolConfig, this.prio, ethScheduler); + + this.pending.subscribePendingTransactions(notificationsChecker::collectAddNotification); + this.pending.subscribeDroppedTransactions(notificationsChecker::collectDropNotification); } - Scenario addForSender(final Sender sender, final long... nonce) { + @Override + public void run() { + actions.forEach(Runnable::run); + assertExpectedPrioritized(prio, lastExpectedPrioritized); + assertExpectedReady(ready, lastExpectedReady); + assertExpectedSparse(sparse, lastExpectedSparse); + assertExpectedDropped(dropped, lastExpectedDropped); + } + + public Scenario addForSender(final Sender sender, final long... nonce) { return addForSender(sender, EIP1559, nonce); } - Scenario addForSender(final Sender sender, final TransactionType type, final long... nonce) { - Arrays.stream(nonce) - .forEach( - n -> { - final var pendingTx = getOrCreate(sender, type, n); - actions.add( - (pending, prio, ready, sparse, dropped) -> { + public Scenario addForSender( + final Sender sender, final TransactionType type, final long... nonce) { + internalAddForSender(sender, type, nonce); + actions.add(notificationsChecker::assertExpectedNotifications); + return this; + } + + private void internalAddForSender( + final Sender sender, final TransactionType type, final long... nonce) { + actions.add( + () -> { + Arrays.stream(nonce) + .forEach( + n -> { + final var pendingTx = create(sender, type, n); final Account mockSender = mock(Account.class); when(mockSender.getNonce()).thenReturn(nonceBySender.get(sender)); pending.addTransaction(pendingTx, Optional.of(mockSender)); + notificationsChecker.addExpectedAddNotification(pendingTx); }); + + // reorg case + if (sendersWithReorg.contains(sender)) { + // reorg is removing and re-adding all sender txs, so assert notifications accordingly + final var currentPendingTxs = + liveTxsBySender.get(sender).tailMap(nonce[nonce.length - 1], false).values(); + currentPendingTxs.forEach( + pt -> { + notificationsChecker.addExpectedAddNotification(pt); + notificationsChecker.addExpectedDropNotification(pt); + }); + sendersWithReorg.remove(sender); + } + + // reconciliation case + final var txsRemovedByReconciliation = + liveTxsBySender.get(sender).headMap(nonceBySender.get(sender), false).values(); + if (!txsRemovedByReconciliation.isEmpty()) { + // reconciliation is removing all sender txs, and re-adding only the ones with a + // larger nonce, so assert notifications accordingly + final var reconciledPendingTxs = + liveTxsBySender.get(sender).tailMap(nonce[nonce.length - 1], false).values(); + txsRemovedByReconciliation.forEach(notificationsChecker::addExpectedDropNotification); + reconciledPendingTxs.forEach( + pt -> { + notificationsChecker.addExpectedDropNotification(pt); + notificationsChecker.addExpectedAddNotification(pt); + }); + txsRemovedByReconciliation.clear(); + } + + handleDropped(); + }); + } + + private void handleDropped() { + // handle dropped tx due to layer or pool full + final var droppedTxs = dropped.getEvictedTransactions(); + droppedTxs.forEach(notificationsChecker::addExpectedDropNotification); + droppedTxs.stream() + .forEach( + pt -> { + liveTxsBySender.get(Sender.getByAddress(pt.getSender())).remove(pt.getNonce()); + droppedTxsBySender.get(Sender.getByAddress(pt.getSender())).put(pt.getNonce(), pt); }); - return this; } - Scenario addForSenders(final Object... args) { + public Scenario addForSenders(final Object... args) { for (int i = 0; i < args.length; i = i + 2) { final Sender sender = (Sender) args[i]; final long nonce = (int) args[i + 1]; - addForSender(sender, nonce); + internalAddForSender(sender, EIP1559, nonce); } + actions.add(notificationsChecker::assertExpectedNotifications); return this; } - public Scenario confirmedForSenders(final Object... args) { - final Map maxConfirmedNonceBySender = new HashMap<>(); + public Scenario replaceForSender(final Sender sender, final long... nonce) { + internalReplaceForSender(sender, nonce); + actions.add(notificationsChecker::assertExpectedNotifications); + return this; + } + + private Scenario internalReplaceForSender(final Sender sender, final long... nonce) { + actions.add( + () -> { + Arrays.stream(nonce) + .forEach( + n -> { + final var maybeExistingTx = getMaybe(sender, n); + maybeExistingTx.ifPresentOrElse( + existingTx -> { + final var pendingTx = replace(sender, existingTx); + final Account mockSender = mock(Account.class); + when(mockSender.getNonce()).thenReturn(nonceBySender.get(sender)); + pending.addTransaction(pendingTx, Optional.of(mockSender)); + notificationsChecker.addExpectedAddNotification(pendingTx); + notificationsChecker.addExpectedDropNotification(existingTx); + }, + () -> + fail( + "Could not replace non-existing transaction with nonce " + + n + + " for sender " + + sender.name())); + }); + }); + return this; + } + + public Scenario replaceForSenders(final Object... args) { for (int i = 0; i < args.length; i = i + 2) { final Sender sender = (Sender) args[i]; final long nonce = (int) args[i + 1]; - maxConfirmedNonceBySender.put(sender.address, nonce); - setAccountNonce(sender, nonce + 1); + internalReplaceForSender(sender, nonce); } + actions.add(notificationsChecker::assertExpectedNotifications); + return this; + } + + public Scenario confirmedForSenders(final Object... args) { actions.add( - (pending, prio, ready, sparse, dropped) -> - prio.blockAdded(FeeMarket.london(0L), mockBlockHeader(), maxConfirmedNonceBySender)); + () -> { + final Map maxConfirmedNonceBySender = new HashMap<>(); + for (int i = 0; i < args.length; i = i + 2) { + final Sender sender = (Sender) args[i]; + final long nonce = (int) args[i + 1]; + maxConfirmedNonceBySender.put(sender.address, nonce); + nonceBySender.put(sender, nonce + 1); + for (final var pendingTx : getAll(sender)) { + if (pendingTx.getNonce() <= nonce) { + notificationsChecker.addExpectedDropNotification( + liveTxsBySender.get(sender).remove(pendingTx.getNonce())); + } + } + } + + prio.blockAdded(FeeMarket.london(0L), mockBlockHeader(), maxConfirmedNonceBySender); + notificationsChecker.assertExpectedNotifications(); + }); return this; } - Scenario setAccountNonce(final Sender sender, final long nonce) { - actions.add((pending, prio, ready, sparse, dropped) -> nonceBySender.put(sender, nonce)); + public Scenario setAccountNonce(final Sender sender, final long nonce) { + actions.add(() -> nonceBySender.put(sender, nonce)); return this; } - void execute( - final LayeredPendingTransactions pending, - final AbstractPrioritizedTransactions prioritized, - final ReadyTransactions ready, - final SparseTransactions sparse, - final EvictCollectorLayer dropped) { - actions.forEach(action -> action.accept(pending, prioritized, ready, sparse, dropped)); - assertExpectedPrioritized(prioritized, lastExpectedPrioritized); - assertExpectedReady(ready, lastExpectedReady); - assertExpectedSparse(sparse, lastExpectedSparse); - assertExpectedDropped(dropped, lastExpectedDropped); + public Scenario reorgForSenders(final Object... args) { + actions.add( + () -> { + for (int i = 0; i < args.length; i = i + 2) { + final Sender sender = (Sender) args[i]; + final long nonce = (int) args[i + 1]; + nonceBySender.put(sender, nonce); + sendersWithReorg.add(sender); + } + }); + return this; } - private PendingTransaction getOrCreate( + private PendingTransaction create( final Sender sender, final TransactionType type, final long nonce) { - return txsBySender - .get(sender) - .computeIfAbsent( - nonce, - n -> - switch (type) { - case FRONTIER -> createFrontierPendingTransaction(sender, n); - case ACCESS_LIST -> createAccessListPendingTransaction(sender, n); - case EIP1559 -> createEIP1559PendingTransaction(sender, n); - case BLOB -> createBlobPendingTransaction(sender, n); - case SET_CODE -> throw new UnsupportedOperationException(); - }); + if (liveTxsBySender.get(sender).containsKey(nonce)) { + fail( + "Transaction for sender " + sender.name() + " with nonce " + nonce + " already exists"); + } + final var newPendingTx = + switch (type) { + case FRONTIER -> createFrontierPendingTransaction(sender, nonce); + case ACCESS_LIST -> createAccessListPendingTransaction(sender, nonce); + case EIP1559 -> createEIP1559PendingTransaction(sender, nonce); + case BLOB -> createBlobPendingTransaction(sender, nonce); + case SET_CODE -> throw new UnsupportedOperationException(); + }; + liveTxsBySender.get(sender).put(nonce, newPendingTx); + return newPendingTx; + } + + private PendingTransaction replace(final Sender sender, final PendingTransaction pendingTx) { + final var replaceTx = + createRemotePendingTransaction( + createTransactionReplacement(pendingTx.getTransaction(), sender.key), + sender.hasPriority); + liveTxsBySender.get(sender).replace(pendingTx.getNonce(), replaceTx); + return replaceTx; + } + + private Optional getMaybe(final Sender sender, final long nonce) { + return Optional.ofNullable(liveTxsBySender.get(sender).get(nonce)); } private PendingTransaction get(final Sender sender, final long nonce) { - return txsBySender.get(sender).get(nonce); + return getMaybe(sender, nonce).get(); + } + + private List getAll(final Sender sender) { + return List.copyOf(liveTxsBySender.get(sender).values()); } private PendingTransaction createFrontierPendingTransaction( @@ -1489,102 +1637,114 @@ private PendingTransaction createBlobPendingTransaction(final Sender sender, fin } public Scenario expectedPrioritizedForSender(final Sender sender, final long... nonce) { - lastExpectedPrioritized = expectedForSender(sender, nonce); - final var expectedCopy = List.copyOf(lastExpectedPrioritized); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedPrioritized(prio, expectedCopy)); + () -> { + lastExpectedPrioritized = expectedForSender(sender, nonce); + assertExpectedPrioritized(prio, lastExpectedPrioritized); + }); return this; } public Scenario expectedReadyForSender(final Sender sender, final long... nonce) { - lastExpectedReady = expectedForSender(sender, nonce); - final var expectedCopy = List.copyOf(lastExpectedReady); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedReady(ready, expectedCopy)); + () -> { + lastExpectedReady = expectedForSender(sender, nonce); + assertExpectedReady(ready, lastExpectedReady); + }); return this; } public Scenario expectedSparseForSender(final Sender sender, final long... nonce) { - lastExpectedSparse = expectedForSender(sender, nonce); - final var expectedCopy = List.copyOf(lastExpectedSparse); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedSparse(sparse, expectedCopy)); + () -> { + lastExpectedSparse = expectedForSender(sender, nonce); + assertExpectedSparse(sparse, lastExpectedSparse); + }); return this; } public Scenario expectedDroppedForSender(final Sender sender, final long... nonce) { - lastExpectedDropped = expectedForSender(sender, nonce); - final var expectedCopy = List.copyOf(lastExpectedDropped); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedDropped(dropped, expectedCopy)); + () -> { + lastExpectedDropped = droppedForSender(sender, nonce); + assertExpectedDropped(dropped, lastExpectedDropped); + }); return this; } public Scenario expectedPrioritizedForSenders( - final Sender sender1, final long nonce1, final Sender sender2, Object... args) { - lastExpectedPrioritized = expectedForSenders(sender1, nonce1, sender2, args); - final var expectedCopy = List.copyOf(lastExpectedPrioritized); + final Sender sender1, final long nonce1, final Sender sender2, final Object... args) { actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedPrioritized(prio, expectedCopy)); + () -> { + lastExpectedPrioritized = expectedForSenders(sender1, nonce1, sender2, args); + assertExpectedPrioritized(prio, lastExpectedPrioritized); + }); return this; } public Scenario expectedPrioritizedForSenders() { - lastExpectedPrioritized = List.of(); - final var expectedCopy = List.copyOf(lastExpectedPrioritized); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedPrioritized(prio, expectedCopy)); + () -> { + lastExpectedPrioritized = List.of(); + assertExpectedPrioritized(prio, lastExpectedPrioritized); + }); return this; } public Scenario expectedReadyForSenders( final Sender sender1, final long nonce1, final Sender sender2, final Object... args) { - lastExpectedReady = expectedForSenders(sender1, nonce1, sender2, args); - final var expectedCopy = List.copyOf(lastExpectedReady); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedReady(ready, expectedCopy)); + () -> { + lastExpectedReady = expectedForSenders(sender1, nonce1, sender2, args); + assertExpectedReady(ready, lastExpectedReady); + }); return this; } public Scenario expectedReadyForSenders() { - lastExpectedReady = List.of(); - final var expectedCopy = List.copyOf(lastExpectedReady); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedReady(ready, expectedCopy)); + () -> { + lastExpectedReady = List.of(); + assertExpectedReady(ready, lastExpectedReady); + }); return this; } public Scenario expectedSparseForSenders( final Sender sender1, final long nonce1, final Sender sender2, final Object... args) { - lastExpectedSparse = expectedForSenders(sender1, nonce1, sender2, args); - final var expectedCopy = List.copyOf(lastExpectedSparse); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedSparse(sparse, expectedCopy)); + () -> { + lastExpectedSparse = expectedForSenders(sender1, nonce1, sender2, args); + assertExpectedSparse(sparse, lastExpectedSparse); + }); return this; } public Scenario expectedSparseForSenders() { - lastExpectedSparse = List.of(); - final var expectedCopy = List.copyOf(lastExpectedSparse); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedSparse(sparse, expectedCopy)); + () -> { + lastExpectedSparse = List.of(); + assertExpectedSparse(sparse, lastExpectedSparse); + }); return this; } public Scenario expectedDroppedForSenders( final Sender sender1, final long nonce1, final Sender sender2, final Object... args) { - lastExpectedDropped = expectedForSenders(sender1, nonce1, sender2, args); - final var expectedCopy = List.copyOf(lastExpectedDropped); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedDropped(dropped, expectedCopy)); + () -> { + lastExpectedDropped = expectedForSenders(sender1, nonce1, sender2, args); + assertExpectedDropped(dropped, lastExpectedDropped); + }); return this; } public Scenario expectedDroppedForSenders() { - lastExpectedDropped = List.of(); - final var expectedCopy = List.copyOf(lastExpectedDropped); actions.add( - (pending, prio, ready, sparse, dropped) -> assertExpectedDropped(dropped, expectedCopy)); + () -> { + lastExpectedDropped = List.of(); + assertExpectedDropped(dropped, lastExpectedDropped); + }); return this; } @@ -1639,59 +1799,74 @@ private List expectedForSender(final Sender sender, final lo return Arrays.stream(nonce).mapToObj(n -> get(sender, n)).toList(); } + private List droppedForSender(final Sender sender, final long... nonce) { + return Arrays.stream(nonce).mapToObj(n -> droppedTxsBySender.get(sender).get(n)).toList(); + } + public Scenario expectedNextNonceForSenders(final Object... args) { for (int i = 0; i < args.length; i = i + 2) { final Sender sender = (Sender) args[i]; final Integer nullableInt = (Integer) args[i + 1]; final OptionalLong nonce = nullableInt == null ? OptionalLong.empty() : OptionalLong.of(nullableInt); - actions.add( - (pending, prio, ready, sparse, dropped) -> - assertThat(prio.getNextNonceFor(sender.address)).isEqualTo(nonce)); + actions.add(() -> assertThat(prio.getNextNonceFor(sender.address)).isEqualTo(nonce)); } return this; } public Scenario removeForSender(final Sender sender, final long... nonce) { - Arrays.stream(nonce) - .forEach( - n -> { - final var pendingTx = getOrCreate(sender, EIP1559, n); - actions.add( - (pending, prio, ready, sparse, dropped) -> prio.remove(pendingTx, INVALIDATED)); - }); + actions.add( + () -> { + Arrays.stream(nonce) + .forEach( + n -> { + final var maybeLiveTx = getMaybe(sender, n); + final var pendingTx = maybeLiveTx.orElseGet(() -> create(sender, EIP1559, n)); + prio.remove(pendingTx, INVALIDATED); + maybeLiveTx.ifPresent( + liveTx -> { + notificationsChecker.addExpectedDropNotification(liveTx); + liveTxsBySender.get(sender).remove(liveTx.getNonce()); + droppedTxsBySender.get(sender).put(liveTx.getNonce(), liveTx); + }); + }); + handleDropped(); + notificationsChecker.assertExpectedNotifications(); + }); return this; } public Scenario penalizeForSender(final Sender sender, final long... nonce) { - Arrays.stream(nonce) - .forEach( - n -> { - actions.add( - (pending, prio, ready, sparse, dropped) -> { - final var senderTxs = prio.getAllFor(sender.address); - Arrays.stream(nonce) - .mapToObj( - n2 -> senderTxs.stream().filter(pt -> pt.getNonce() == n2).findAny()) - .map(Optional::get) - .forEach(prio::penalize); - }); - }); + actions.add( + () -> + Arrays.stream(nonce) + .forEach( + n -> { + final var senderTxs = prio.getAllFor(sender.address); + Arrays.stream(nonce) + .mapToObj( + n2 -> + senderTxs.stream().filter(pt -> pt.getNonce() == n2).findAny()) + .map(Optional::get) + .forEach(prio::penalize); + })); return this; } public Scenario expectedSelectedTransactions(final Object... args) { - List expectedSelected = new ArrayList<>(); - for (int i = 0; i < args.length; i = i + 2) { - final Sender sender = (Sender) args[i]; - final long nonce = (int) args[i + 1]; - expectedSelected.add(get(sender, nonce)); - } actions.add( - (pending, prio, ready, sparse, dropped) -> - assertThat(prio.getBySender()) - .flatExtracting(SenderPendingTransactions::pendingTransactions) - .containsExactlyElementsOf(expectedSelected)); + () -> { + List expectedSelected = new ArrayList<>(); + for (int i = 0; i < args.length; i = i + 2) { + final Sender sender = (Sender) args[i]; + final long nonce = (int) args[i + 1]; + expectedSelected.add(get(sender, nonce)); + } + + assertThat(prio.getBySender()) + .flatExtracting(SenderPendingTransactions::pendingTransactions) + .containsExactlyElementsOf(expectedSelected); + }); return this; } @@ -1721,6 +1896,62 @@ enum Sender { this.gasFeeMultiplier = gasFeeMultiplier; this.hasPriority = hasPriority; } + + static Sender getByAddress(final Address address) { + return Arrays.stream(values()).filter(s -> s.address.equals(address)).findAny().get(); + } + } + + static class NotificationsChecker { + private final List collectedAddNotifications = + Collections.synchronizedList(new ArrayList<>()); + private final List collectedDropNotifications = + Collections.synchronizedList(new ArrayList<>()); + private final List expectedAddNotifications = new ArrayList<>(); + private final List expectedDropNotifications = new ArrayList<>(); + + void collectAddNotification(final Transaction tx) { + collectedAddNotifications.add(tx); + } + + void collectDropNotification(final Transaction tx) { + collectedDropNotifications.add(tx); + } + + void addExpectedAddNotification(final PendingTransaction tx) { + expectedAddNotifications.add(tx.getTransaction()); + } + + void addExpectedDropNotification(final PendingTransaction tx) { + expectedDropNotifications.add(tx.getTransaction()); + } + + void assertExpectedNotifications() { + assertAddNotifications(expectedAddNotifications); + assertDropNotifications(expectedDropNotifications); + } + + private void assertAddNotifications(final List expectedAddedTxs) { + await() + .untilAsserted( + () -> + assertThat(collectedAddNotifications) + .describedAs("Added notifications") + .containsExactlyInAnyOrderElementsOf(expectedAddedTxs)); + collectedAddNotifications.clear(); + expectedAddNotifications.clear(); + } + + private void assertDropNotifications(final List expectedDroppedTxs) { + await() + .untilAsserted( + () -> + assertThat(collectedDropNotifications) + .describedAs("Dropped notifications") + .containsExactlyInAnyOrderElementsOf(expectedDroppedTxs)); + collectedDropNotifications.clear(); + expectedDropNotifications.clear(); + } } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java index 14ed39c2e59..18ce6f49ed0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java @@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; -import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED; +import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when;