Skip to content

Commit

Permalink
Merge pull request #1931 from ergoplatform/mp-fix
Browse files Browse the repository at this point in the history
Mempool code simplification, removing applied blocks transactions from the mempool
  • Loading branch information
kushti authored Jan 13, 2023
2 parents 3d0dcfe + 4b6c4f2 commit 5d612b0
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object ErgoMemPoolBenchmark

private def bench(txsInIncomeOrder: Seq[ErgoTransaction]): Unit = {
var pool = ErgoMemPool.empty(settings)
txsInIncomeOrder.foreach(tx => pool = pool.put(UnconfirmedTransaction(tx, None)).get)
txsInIncomeOrder.foreach(tx => pool = pool.put(UnconfirmedTransaction(tx, None)))
}

performance of "ErgoMemPool awaiting" in {
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ ergo {
mempoolCapacity = 1000

# Interval for mempool transaction re-check. We check transaction when it is entering the mempool, and then
# re-check it every interval valie
mempoolCleanupDuration = 30m
# re-check it every interval value
mempoolCleanupDuration = 20m

# Mempool transaction sorting scheme ("random", "bySize", or "byExecutionCost")
mempoolSorting = "random"
Expand Down
11 changes: 1 addition & 10 deletions src/main/scala/org/ergoplatform/local/CleanupWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class CleanupWorker(nodeViewHolderRef: ActorRef,
log.debug(s"${validated.size} re-checked mempool transactions were ok, " +
s"${toEliminate.size} transactions were invalidated")

if(validated.nonEmpty) {
if (validated.nonEmpty) {
nodeViewHolderRef ! RecheckedTransactions(validated)
}
if (toEliminate.nonEmpty) {
Expand Down Expand Up @@ -107,15 +107,6 @@ class CleanupWorker(nodeViewHolderRef: ActorRef,

object CleanupWorker {

/**
* Constant which shows on how many cleanup operations (called when a new block arrives) a transaction
* re-check happens.
*
* If transactions set is large and stable, then about (1/RevisionInterval)-th of the pool is checked
*
*/
val RevisionInterval: Int = 4

/**
*
* A command to run (partial) memory pool cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
.flatMap(extractTransactions)
.filter(tx => !appliedTxs.exists(_.id == tx.id))
.map(tx => UnconfirmedTransaction(tx, None))
memPool.putWithoutCheck(rolledBackTxs)
memPool.remove(appliedTxs).put(rolledBackTxs)
}

/**
Expand Down Expand Up @@ -622,9 +622,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
case LocallyGeneratedTransaction(unconfirmedTx) =>
sender() ! txModify(unconfirmedTx)
case RecheckedTransactions(unconfirmedTxs) =>
val updatedPool = unconfirmedTxs.foldRight(memoryPool()) { case (utx, mp) =>
mp.remove(utx).putWithoutCheck(utx)
}
val updatedPool = memoryPool().put(unconfirmedTxs)
updateNodeView(updatedMempool = Some(updatedPool))
case EliminateTransactions(ids) =>
val updatedPool = ids.foldLeft(memoryPool()) { case (pool, txId) => pool.invalidate(txId) }
Expand Down
40 changes: 21 additions & 19 deletions src/main/scala/org/ergoplatform/nodeView/mempool/ErgoMemPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,20 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
* @param unconfirmedTx
* @return Success(updatedPool), if transaction successfully added to the pool, Failure(_) otherwise
*/
def put(unconfirmedTx: UnconfirmedTransaction): Try[ErgoMemPool] = put(Seq(unconfirmedTx))

def put(unconfirmedTxs: Iterable[UnconfirmedTransaction]): Try[ErgoMemPool] = Try {
putWithoutCheck(unconfirmedTxs.filterNot(unconfirmedTx => pool.contains(unconfirmedTx.transaction.id)))
}

def putWithoutCheck(tx: UnconfirmedTransaction): ErgoMemPool = {
val updatedPool = pool.put(tx, feeFactor(tx))
new ErgoMemPool(updatedPool, stats, sortingOption)
def put(unconfirmedTx: UnconfirmedTransaction): ErgoMemPool = {
if (!pool.contains(unconfirmedTx.id)) {
val updatedPool = pool.put(unconfirmedTx, feeFactor(unconfirmedTx))
new ErgoMemPool(updatedPool, stats, sortingOption)
} else {
this
}
}

def putWithoutCheck(txs: Iterable[UnconfirmedTransaction]): ErgoMemPool = {
val updatedPool = txs.toSeq.distinct.foldLeft(pool) { case (acc, tx) => acc.put(tx, feeFactor(tx)) }
new ErgoMemPool(updatedPool, stats, sortingOption)
def put(txs: TraversableOnce[UnconfirmedTransaction]): ErgoMemPool = {
txs.foldLeft(this) { case (acc, tx) => acc.put(tx) }
}

private def updateStatsOnRemoval(unconfirmedTransaction: UnconfirmedTransaction): MemPoolStatistics = {
val tx = unconfirmedTransaction.transaction
private def updateStatsOnRemoval(tx: ErgoTransaction): MemPoolStatistics = {
val wtx = pool.transactionsRegistry.get(tx.id)
wtx.map(wgtx => stats.add(System.currentTimeMillis(), wgtx))
.getOrElse(MemPoolStatistics(System.currentTimeMillis(), 0, System.currentTimeMillis()))
Expand All @@ -118,9 +114,13 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
/**
* Remove transaction from the pool
*/
def remove(unconfirmedTx: UnconfirmedTransaction): ErgoMemPool = {
log.debug(s"Removing transaction ${unconfirmedTx.id} from the mempool")
new ErgoMemPool(pool.remove(unconfirmedTx), updateStatsOnRemoval(unconfirmedTx), sortingOption)
def remove(tx: ErgoTransaction): ErgoMemPool = {
log.debug(s"Removing transaction ${tx.id} from the mempool")
new ErgoMemPool(pool.remove(tx), updateStatsOnRemoval(tx), sortingOption)
}

def remove(txs: TraversableOnce[ErgoTransaction]): ErgoMemPool = {
txs.foldLeft(this) { case (acc, tx) => acc.remove(tx) }
}

/**
Expand All @@ -130,13 +130,15 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
*/
def invalidate(unconfirmedTx: UnconfirmedTransaction): ErgoMemPool = {
log.debug(s"Invalidating mempool transaction ${unconfirmedTx.id}")
new ErgoMemPool(pool.invalidate(unconfirmedTx), updateStatsOnRemoval(unconfirmedTx), sortingOption)
new ErgoMemPool(pool.invalidate(unconfirmedTx), updateStatsOnRemoval(unconfirmedTx.transaction), sortingOption)
}

def invalidate(unconfirmedTransactionId: ModifierId): ErgoMemPool = {
pool.get(unconfirmedTransactionId) match {
case Some(utx) => invalidate(utx)
case None => this
case None =>
log.warn(s"Can't invalidate transaction $unconfirmedTransactionId as it is not in the pool")
this
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
def size: Int = orderedTransactions.size

def get(id: ModifierId): Option[UnconfirmedTransaction] = {
transactionsRegistry.get(id).flatMap(orderedTransactions.get(_))
transactionsRegistry.get(id).flatMap { wtx =>
orderedTransactions.get(wtx) match {
case s@Some(_) => s
case None => log.warn(s"Found $id in registry but not ordered transactions"); None
}
}
}


Expand All @@ -68,7 +73,7 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
invalidatedTxIds,
outputs ++ tx.outputs.map(_.id -> wtx),
inputs ++ tx.inputs.map(_.boxId -> wtx)
).updateFamily(unconfirmedTx, wtx.weight, System.currentTimeMillis(), 0)
).updateFamily(tx, wtx.weight, System.currentTimeMillis(), 0)
if (newPool.orderedTransactions.size > mempoolCapacity) {
val victim = newPool.orderedTransactions.last._2
newPool.remove(victim)
Expand All @@ -84,10 +89,9 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
/**
* Removes transaction from the pool
*
* @param unconfirmedTx - Transaction to remove
* @param tx - Transaction to remove
*/
def remove(unconfirmedTx: UnconfirmedTransaction): OrderedTxPool = {
val tx = unconfirmedTx.transaction
def remove(tx: ErgoTransaction): OrderedTxPool = {
transactionsRegistry.get(tx.id) match {
case Some(wtx) =>
OrderedTxPool(
Expand All @@ -96,11 +100,13 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
invalidatedTxIds,
outputs -- tx.outputs.map(_.id),
inputs -- tx.inputs.map(_.boxId)
).updateFamily(unconfirmedTx, -wtx.weight, System.currentTimeMillis(), depth = 0)
).updateFamily(tx, -wtx.weight, System.currentTimeMillis(), depth = 0)
case None => this
}
}

def remove(utx: UnconfirmedTransaction): OrderedTxPool = remove(utx.transaction)

def invalidate(unconfirmedTx: UnconfirmedTransaction): OrderedTxPool = {
val tx = unconfirmedTx.transaction
transactionsRegistry.get(tx.id) match {
Expand All @@ -111,7 +117,7 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
invalidatedTxIds.put(tx.id),
outputs -- tx.outputs.map(_.id),
inputs -- tx.inputs.map(_.boxId)
).updateFamily(unconfirmedTx, -wtx.weight, System.currentTimeMillis(), depth = 0)
).updateFamily(tx, -wtx.weight, System.currentTimeMillis(), depth = 0)
case None =>
OrderedTxPool(orderedTransactions, transactionsRegistry, invalidatedTxIds.put(tx.id), outputs, inputs)
}
Expand All @@ -132,8 +138,7 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
*
*/
def canAccept(unconfirmedTx: UnconfirmedTransaction): Boolean = {
val tx = unconfirmedTx.transaction
!contains(tx.id) && size <= mempoolCapacity
!contains(unconfirmedTx.id) && size <= mempoolCapacity
}

/**
Expand All @@ -155,21 +160,20 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
* To achieve this goal we recursively add weight of new transaction to all transactions which
* outputs it directly or indirectly spending.
*
* @param unconfirmedTx
* @param tx
* @param weight
* @return
*/
private def updateFamily(unconfirmedTx: UnconfirmedTransaction,
private def updateFamily(tx: ErgoTransaction,
weight: Long,
startTime: Long,
depth: Int): OrderedTxPool = {
val now = System.currentTimeMillis()
val timeDiff = now - startTime
if (depth > MaxParentScanDepth || timeDiff > MaxParentScanTime) {
log.warn(s"updateFamily takes too long, depth: $depth, time diff: $timeDiff, transaction: ${unconfirmedTx.id}")
log.warn(s"updateFamily takes too long, depth: $depth, time diff: $timeDiff, transaction: ${tx.id}")
this
} else {
val tx = unconfirmedTx.transaction

val uniqueTxIds: Set[WeightedTxId] = tx.inputs.flatMap(input => this.outputs.get(input.boxId))(collection.breakOut)
val parentTxs = uniqueTxIds.flatMap(wtx => this.orderedTransactions.get(wtx).map(ut => wtx -> ut))
Expand All @@ -183,7 +187,7 @@ case class OrderedTxPool(orderedTransactions: TreeMap[WeightedTxId, UnconfirmedT
parent.outputs.foldLeft(pool.outputs)((newOutputs, box) => newOutputs.updated(box.id, newWtx)),
parent.inputs.foldLeft(pool.inputs)((newInputs, inp) => newInputs.updated(inp.boxId, newWtx))
)
newPool.updateFamily(ut, weight, startTime, depth + 1)
newPool.updateFamily(parent, weight, startTime, depth + 1)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class TransactionApiRouteSpec extends AnyFlatSpec

val chainedRoute: Route = {
//constructing memory pool and node view with the transaction tx included
val mp2 = memPool.put(UnconfirmedTransaction(tx, None)).get
val mp2 = memPool.put(UnconfirmedTransaction(tx, None))
class UtxoReadersStub2 extends Actor {
def receive: PartialFunction[Any, Unit] = {
case GetReaders => sender() ! Readers(history, utxoState, mp2, wallet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
val txs = validTransactionsFromUtxoState(wus)
var pool = ErgoMemPool.empty(settings)
txs.foreach { tx =>
pool = pool.putWithoutCheck(Seq(UnconfirmedTransaction(tx, None)))
pool = pool.put(UnconfirmedTransaction(tx, None))
}
txs.foreach { tx =>
pool.process(UnconfirmedTransaction(tx, None), us)._2.isInstanceOf[ProcessingOutcome.Declined] shouldBe true
Expand Down Expand Up @@ -189,7 +189,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
it should "accept only unique transactions" in {
val pool = ErgoMemPool.empty(settings)
val tx = UnconfirmedTransaction(invalidErgoTransactionGen.sample.get, None)
pool.putWithoutCheck(Seq(tx, tx, tx)).size shouldBe 1
pool.put(Seq(tx, tx, tx)).size shouldBe 1
}

it should "drop less prioritized transaction in case of pool overflow" in {
Expand All @@ -204,11 +204,11 @@ class ErgoMemPoolSpec extends AnyFlatSpec
}
val lessPrioritizedTxs = txsWithAscendingPriority.init.map(tx => UnconfirmedTransaction(tx, None))
val mostPrioritizedTx = UnconfirmedTransaction(txsWithAscendingPriority.last, None)
pool = pool.putWithoutCheck(lessPrioritizedTxs)
pool = pool.put(lessPrioritizedTxs)

pool.size shouldBe 4
pool.getAll should contain only (lessPrioritizedTxs: _*)
pool = pool.putWithoutCheck(Seq(mostPrioritizedTx))
pool = pool.put(Seq(mostPrioritizedTx))
pool.size shouldBe 4
pool.getAll should contain only (mostPrioritizedTx +: lessPrioritizedTxs.tail: _*)
}
Expand All @@ -220,7 +220,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
val txs = validTransactionsFromUtxoState(wus).map(tx => UnconfirmedTransaction(tx, None))
var pool = ErgoMemPool.empty(settings)
txs.foreach { tx =>
pool = pool.putWithoutCheck(Seq(tx))
pool = pool.put(tx)
}
txs.foreach { tx =>
val spendingBox = tx.transaction.outputs.head
Expand All @@ -242,7 +242,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size))
var pool = ErgoMemPool.empty(limitedPoolSettings)
txs.foreach { tx =>
pool = pool.putWithoutCheck(Seq(tx))
pool = pool.put(tx)
}
for (_ <- 1 to family_depth) {
txs = txs.map(tx => {
Expand Down Expand Up @@ -278,7 +278,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size))
var pool = ErgoMemPool.empty(limitedPoolSettings)
txs.foreach { tx =>
pool = pool.putWithoutCheck(Seq(tx))
pool = pool.put(tx)
}
for (_ <- 1 to family_depth) {
txs = txs.map(tx => {
Expand All @@ -294,7 +294,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
}
pool.size shouldBe (family_depth + 1) * txs.size
allTxs.foreach { tx =>
pool = pool.remove(tx)
pool = pool.remove(tx.transaction)
}
pool.size shouldBe 0
}
Expand All @@ -310,7 +310,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size))
var pool = ErgoMemPool.empty(limitedPoolSettings)
txs.foreach { tx =>
pool = pool.putWithoutCheck(Seq(tx))
pool = pool.put(tx)
}
for (_ <- 1 to family_depth) {
txs = txs.map(tx => {
Expand Down Expand Up @@ -353,7 +353,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size))
var pool = ErgoMemPool.empty(limitedPoolSettings)
txs.foreach { tx =>
pool = pool.putWithoutCheck(Seq(tx))
pool = pool.put(tx)
}
for (_ <- 1 to family_depth) {
txs = txs.map(tx => {
Expand All @@ -373,7 +373,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec
pool.stats.snapTakenTxns shouldBe MemPoolStatistics(System.currentTimeMillis(),0,System.currentTimeMillis()).snapTakenTxns

allTxs.foreach { tx =>
pool = pool.remove(tx)
pool = pool.remove(tx.transaction)
}
pool.size shouldBe 0
pool.stats.takenTxns shouldBe (family_depth + 1) * txs.size
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/org/ergoplatform/utils/Stubs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ trait Stubs extends ErgoGenerators with ErgoTestHelpers with ChainGenerator with
lazy val wallet = new WalletStub

val txs: Seq[ErgoTransaction] = validTransactionsFromBoxHolder(boxesHolderGen.sample.get)._1
val memPool: ErgoMemPool = ErgoMemPool.empty(settings).put(txs.map(tx => UnconfirmedTransaction(tx, None))).get
val memPool: ErgoMemPool = ErgoMemPool.empty(settings).put(txs.map(tx => UnconfirmedTransaction(tx, None)))

val digestReaders = Readers(history, digestState, memPool, wallet)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ trait MempoolFilterPerformanceTest
var m: ErgoMemPool = memPool
(0 until 1000) foreach { _ =>
forAll(transactionGenerator) { tx: ErgoTransaction =>
m = m.put(UnconfirmedTransaction(tx, None)).get
m = m.put(UnconfirmedTransaction(tx, None))
}
}
m.size should be > 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait MempoolRemovalTest extends AnyPropSpec
var m: ErgoMemPool = memPool
// var h: ErgoHistory = historyGen.sample.get
forAll(transactionGenerator) { tx: ErgoTransaction =>
m = m.put(UnconfirmedTransaction(tx, None)).get
m = m.put(UnconfirmedTransaction(tx, None))
}
// var prevMempoolSize = m.size
// val b = modifierWithTransactions(Some(m), None)
Expand Down
Loading

0 comments on commit 5d612b0

Please sign in to comment.