From 8f866093ce353241a7e4b0647060f2f6537a7d6c Mon Sep 17 00:00:00 2001 From: sword_smith Date: Fri, 8 Nov 2024 16:36:58 +0100 Subject: [PATCH] mempool: Track which transaction are own This is done to determine which transactions that should be updated when a new block arrives. Update is an expensive STARK proof to produce, so we opt for the following policy: - own transactions are updated, if possible. - If client is composing, all transactions are updated, as we otherwise cannot include the mempool transactions in the new block. Also note that, if client is composing, it *has* to wait for the update to finish before starting a new composition. Otherwise, the transactions will never be mined. --- src/bin/dashboard_src/overview_screen.rs | 8 +- src/job_queue/queue.rs | 8 +- src/main_loop.rs | 22 ++- src/main_loop/proof_upgrader.rs | 31 ++- src/mine_loop.rs | 7 +- src/models/blockchain/block/mod.rs | 2 + src/models/state/mempool.rs | 236 +++++++++++++++++------ src/models/state/mod.rs | 10 +- src/models/state/wallet/wallet_state.rs | 3 +- src/peer_loop.rs | 5 +- src/rpc_server.rs | 9 +- 11 files changed, 244 insertions(+), 97 deletions(-) diff --git a/src/bin/dashboard_src/overview_screen.rs b/src/bin/dashboard_src/overview_screen.rs index a8fcb6029..ca96eff8a 100644 --- a/src/bin/dashboard_src/overview_screen.rs +++ b/src/bin/dashboard_src/overview_screen.rs @@ -51,7 +51,8 @@ pub struct OverviewData { archive_coverage: Option, mempool_size: Option, - mempool_tx_count: Option, + mempool_total_tx_count: Option, + mempool_own_tx_count: Option, listen_address: Option, peer_count: Option, @@ -144,7 +145,8 @@ impl OverviewScreen { own_overview_data.tip_digest = Some(resp.tip_digest); own_overview_data.block_header = Some(resp.tip_header); own_overview_data.mempool_size = Some(ByteSize::b(resp.mempool_size.try_into().unwrap())); - own_overview_data.mempool_tx_count = Some(resp.mempool_tx_count.try_into().unwrap()); + own_overview_data.mempool_total_tx_count = Some(resp.mempool_total_tx_count.try_into().unwrap()); + own_overview_data.mempool_own_tx_count = Some(resp.mempool_own_tx_count.try_into().unwrap()); own_overview_data.peer_count=resp.peer_count; own_overview_data.authenticated_peer_count=Some(0); own_overview_data.syncing=resp.syncing; @@ -398,7 +400,7 @@ impl Widget for OverviewScreen { lines.push(format!("size: {}", dashifnotset!(data.mempool_size))); lines.push(format!( "tx count: {}", - dashifnotset!(data.mempool_tx_count) + dashifnotset!(data.mempool_total_tx_count) )); Self::report(&lines, "Mempool") .style(style) diff --git a/src/job_queue/queue.rs b/src/job_queue/queue.rs index 5cd9d4161..72a7a83a2 100644 --- a/src/job_queue/queue.rs +++ b/src/job_queue/queue.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::Mutex; + use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio_util::task::TaskTracker; @@ -159,10 +160,12 @@ impl JobQueue

{ #[cfg(test)] mod tests { - use super::*; use std::time::Instant; + use tracing_test::traced_test; + use super::*; + #[tokio::test(flavor = "multi_thread")] #[traced_test] async fn run_sync_jobs_by_priority() -> anyhow::Result<()> { @@ -223,9 +226,10 @@ mod tests { } mod workers { - use super::*; use std::any::Any; + use super::*; + #[derive(PartialEq, Eq, PartialOrd, Ord)] pub enum DoubleJobPriority { Low = 1, diff --git a/src/main_loop.rs b/src/main_loop.rs index 576e34662..19cdc8c4f 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -29,7 +29,6 @@ use tracing::warn; use crate::connect_to_peers::answer_peer_wrapper; use crate::connect_to_peers::call_peer_wrapper; -use crate::job_queue::triton_vm::TritonVmJobPriority; use crate::models::blockchain::block::block_header::BlockHeader; use crate::models::blockchain::block::block_height::BlockHeight; use crate::models::blockchain::block::difficulty_control::ProofOfWork; @@ -45,6 +44,7 @@ use crate::models::peer::HandshakeData; use crate::models::peer::PeerInfo; use crate::models::peer::PeerSynchronizationState; use crate::models::state::block_proposal::BlockProposal; +use crate::models::state::mempool::TransactionOrigin; use crate::models::state::tx_proving_capability::TxProvingCapability; use crate::models::state::GlobalState; use crate::models::state::GlobalStateLock; @@ -620,7 +620,10 @@ impl MainLoopHandler { // Insert into mempool global_state_mut - .mempool_insert(pt2m_transaction.transaction.to_owned()) + .mempool_insert( + pt2m_transaction.transaction.to_owned(), + TransactionOrigin::Foreign, + ) .await; // send notification to peers @@ -962,7 +965,7 @@ impl MainLoopHandler { // Check if it's time to run the proof-upgrader, and if we're capable // of upgrading a transaction proof. let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval(); - let upgrade_candidate = { + let (upgrade_candidate, tx_origin) = { let global_state = self.global_state_lock.lock_guard().await; let now = self.now(); if !attempt_upgrade(&global_state, now, tx_upgrade_interval, main_loop_state)? { @@ -973,12 +976,13 @@ impl MainLoopHandler { debug!("Attempting to run transaction-proof-upgrade"); // Find a candidate for proof upgrade - let Some(upgrade_candidate) = get_upgrade_task_from_mempool(&global_state) else { + let Some((upgrade_candidate, tx_origin)) = get_upgrade_task_from_mempool(&global_state) + else { debug!("Found no transaction-proof to upgrade"); return Ok(()); }; - upgrade_candidate + (upgrade_candidate, tx_origin) }; info!( @@ -1002,7 +1006,7 @@ impl MainLoopHandler { upgrade_candidate .handle_upgrade( &vm_job_queue, - TritonVmJobPriority::Low, + tx_origin, perform_ms_update_if_needed, global_state_lock_clone, main_to_peer_broadcast_tx_clone, @@ -1279,7 +1283,7 @@ impl MainLoopHandler { self.global_state_lock .lock_guard_mut() .await - .mempool_insert(*transaction.clone()) + .mempool_insert(*transaction.clone(), TransactionOrigin::Own) .await; // Is this a transaction we can share with peers? If so, share @@ -1317,7 +1321,7 @@ impl MainLoopHandler { upgrade_job .handle_upgrade( &vm_job_queue, - TritonVmJobPriority::High, + TransactionOrigin::Own, true, global_state_lock_clone, main_to_peer_broadcast_tx_clone, @@ -1553,7 +1557,7 @@ mod tests { .global_state_lock .lock_guard_mut() .await - .mempool_insert(proof_collection_tx.clone()) + .mempool_insert(proof_collection_tx.clone(), TransactionOrigin::Foreign) .await; assert!( diff --git a/src/main_loop/proof_upgrader.rs b/src/main_loop/proof_upgrader.rs index 0006590bd..5092a3456 100644 --- a/src/main_loop/proof_upgrader.rs +++ b/src/main_loop/proof_upgrader.rs @@ -8,6 +8,7 @@ use tasm_lib::triton_vm::proof::Proof; use tracing::error; use tracing::info; +use super::TransactionOrigin; use crate::job_queue::triton_vm::TritonVmJobPriority; use crate::job_queue::triton_vm::TritonVmJobQueue; use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate; @@ -150,13 +151,18 @@ impl UpgradeJob { pub(crate) async fn handle_upgrade( self, triton_vm_job_queue: &TritonVmJobQueue, - priority: TritonVmJobPriority, + tx_origin: TransactionOrigin, perform_ms_update_if_needed: bool, mut global_state_lock: GlobalStateLock, main_to_peer_channel: tokio::sync::broadcast::Sender, ) { let mut upgrade_job = self; + let job_priority = match tx_origin { + TransactionOrigin::Foreign => TritonVmJobPriority::Lowest, + TransactionOrigin::Own => TritonVmJobPriority::High, + }; + // process in a loop. in case a new block comes in while processing // the current tx, then we can move on to the next, and so on. loop { @@ -170,7 +176,7 @@ impl UpgradeJob { let affected_txids = upgrade_job.affected_txids(); let mutator_set_for_tx = upgrade_job.mutator_set(); - let upgraded = match upgrade_job.upgrade(triton_vm_job_queue, priority).await { + let upgraded = match upgrade_job.upgrade(triton_vm_job_queue, job_priority).await { Ok(upgraded_tx) => { info!( "Successfully upgraded transaction {}", @@ -208,7 +214,7 @@ impl UpgradeJob { )) .unwrap(); - global_state.mempool_insert(upgraded).await; + global_state.mempool_insert(upgraded, tx_origin).await; info!("Successfully handled proof upgrade."); return; @@ -357,12 +363,15 @@ impl UpgradeJob { } /// Return an [UpgradeJob] that describes work that can be done to upgrade the -/// proof-quality of a transaction found in mempool. -pub(super) fn get_upgrade_task_from_mempool(global_state: &GlobalState) -> Option { +/// proof-quality of a transaction found in mempool. Also indicates whether the +/// upgrade job affects one of our own transaction, or a foreign transaction. +pub(super) fn get_upgrade_task_from_mempool( + global_state: &GlobalState, +) -> Option<(UpgradeJob, TransactionOrigin)> { // Do we have any `ProofCollection`s? let tip = global_state.chain.light_state().body(); - if let Some((kernel, proof)) = global_state.mempool.most_dense_proof_collection() { + if let Some((kernel, proof, tx_origin)) = global_state.mempool.most_dense_proof_collection() { let upgrade_decision = UpgradeJob::ProofCollectionToSingleProof { kernel: kernel.to_owned(), proof: proof.to_owned(), @@ -374,12 +383,14 @@ pub(super) fn get_upgrade_task_from_mempool(global_state: &GlobalState) -> Optio return None; } - return Some(upgrade_decision); + return Some((upgrade_decision, tx_origin)); } // Can we merge two single proofs? - if let Some([(left_kernel, left_single_proof), (right_kernel, right_single_proof)]) = - global_state.mempool.most_dense_single_proof_pair() + if let Some(( + [(left_kernel, left_single_proof), (right_kernel, right_single_proof)], + tx_origin, + )) = global_state.mempool.most_dense_single_proof_pair() { let mut rng: StdRng = SeedableRng::from_seed(global_state.shuffle_seed()); let upgrade_decision = UpgradeJob::Merge { @@ -398,7 +409,7 @@ pub(super) fn get_upgrade_task_from_mempool(global_state: &GlobalState) -> Optio return None; } - return Some(upgrade_decision); + return Some((upgrade_decision, tx_origin)); } None diff --git a/src/mine_loop.rs b/src/mine_loop.rs index 63e497e82..207eebda8 100644 --- a/src/mine_loop.rs +++ b/src/mine_loop.rs @@ -668,6 +668,7 @@ pub(crate) mod mine_loop_tests { use crate::job_queue::triton_vm::TritonVmJobQueue; use crate::models::blockchain::type_scripts::neptune_coins::NeptuneCoins; use crate::models::proof_abstractions::timestamp::Timestamp; + use crate::models::state::mempool::TransactionOrigin; use crate::tests::shared::dummy_expected_utxo; use crate::tests::shared::make_mock_transaction_with_mutator_set_hash; use crate::tests::shared::mock_genesis_global_state; @@ -810,7 +811,7 @@ pub(crate) mod mine_loop_tests { rng.gen(), alice_key.to_address().into(), ); - let (tx_by_preminer, _maybe_change_output) = alice + let (tx_from_alice, _maybe_change_output) = alice .lock_guard() .await .create_transaction_with_prover_capability( @@ -864,7 +865,9 @@ pub(crate) mod mine_loop_tests { { let mut alice_gsm = alice.lock_guard_mut().await; - alice_gsm.mempool_insert(tx_by_preminer.clone()).await; + alice_gsm + .mempool_insert(tx_from_alice.clone(), TransactionOrigin::Own) + .await; assert_eq!(1, alice_gsm.mempool.len()); } diff --git a/src/models/blockchain/block/mod.rs b/src/models/blockchain/block/mod.rs index ff27d0a16..1ed0861e2 100644 --- a/src/models/blockchain/block/mod.rs +++ b/src/models/blockchain/block/mod.rs @@ -902,6 +902,8 @@ impl Block { .collect_vec() } + /// Return the mutator set update induced by the previous block and the + /// new transaction. pub(crate) fn ms_update_from_predecessor_and_new_tx_kernel( predecessor_block: &Block, new_transaction_kernel: &TransactionKernel, diff --git a/src/models/state/mempool.rs b/src/models/state/mempool.rs index f6a502d12..282266bb9 100644 --- a/src/models/state/mempool.rs +++ b/src/models/state/mempool.rs @@ -17,6 +17,7 @@ use std::iter::Rev; use bytesize::ByteSize; use get_size::GetSize; +use itertools::Itertools; /// `FeeDensity` is a measure of 'Fee/Bytes' or 'reward per storage unit' for /// transactions. Different strategies are possible for selecting transactions /// to mine, but a simple one is to pick transactions in descending order of @@ -38,8 +39,13 @@ use num_rational::BigRational as FeeDensity; use num_traits::Zero; use priority_queue::double_priority_queue::iterators::IntoSortedIter; use priority_queue::DoublePriorityQueue; +use serde::Deserialize; +use serde::Serialize; use tasm_lib::triton_vm::proof::Proof; +use tracing::debug; use tracing::error; +use tracing::info; +use tracing::warn; use twenty_first::math::digest::Digest; use super::transaction_kernel_id::TransactionKernelId; @@ -82,6 +88,26 @@ pub enum MempoolEvent { UpdateTxMutatorSet(TransactionKernelId, Transaction), } +/// Used to mark origin of transaction. To determine if transaction was +/// initiated locally or not. +#[derive(Debug, GetSize, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) enum TransactionOrigin { + Foreign, + Own, +} + +impl TransactionOrigin { + fn is_own(self) -> bool { + self == Self::Own + } +} + +#[derive(Debug, GetSize, Clone, Serialize, Deserialize)] +pub(crate) struct MempoolTransaction { + transaction: Transaction, + origin: TransactionOrigin, +} + #[derive(Debug, GetSize)] pub struct Mempool { /// Maximum size this data structure may take up in memory. @@ -93,7 +119,7 @@ pub struct Mempool { /// Contains transactions, with a mapping from transaction ID to transaction. /// Maintain for constant lookup - tx_dictionary: HashMap, + tx_dictionary: HashMap, /// Allows the mempool to report transactions sorted by [`FeeDensity`] in /// both descending and ascending order. @@ -146,7 +172,7 @@ impl Mempool { proof_quality: TransactionProofQuality, ) -> bool { if let Some(tx) = self.tx_dictionary.get(&transaction_id) { - match tx.proof.proof_quality() { + match tx.transaction.proof.proof_quality() { Ok(mempool_proof_quality) => mempool_proof_quality >= proof_quality, Err(_) => { // Any proof quality is better than none. @@ -154,7 +180,10 @@ impl Mempool { // e.g. primitive witness in mempool and now the same transaction // with an associated proof is queried. That probably shouldn't // happen. - error!("Failed to read proof quality for tx in mempool"); + error!( + "Failed to read proof quality for tx in mempool. txid: {}", + transaction_id + ); false } } @@ -167,11 +196,17 @@ impl Mempool { /// fee-density if mempool contains any such transactions. Otherwise None. pub(crate) fn most_dense_proof_collection( &self, - ) -> Option<(&TransactionKernel, &ProofCollection)> { + ) -> Option<(&TransactionKernel, &ProofCollection, TransactionOrigin)> { for (txid, _fee_density) in self.get_sorted_iter() { let candidate = self.tx_dictionary.get(&txid).unwrap(); - if let TransactionProof::ProofCollection(proof_collection) = &candidate.proof { - return Some((&candidate.kernel, proof_collection)); + if let TransactionProof::ProofCollection(proof_collection) = + &candidate.transaction.proof + { + return Some(( + &candidate.transaction.kernel, + proof_collection, + candidate.origin, + )); } } @@ -180,16 +215,25 @@ impl Mempool { /// Return the two most dense single-proof transactions. Returns `None` if /// no such pair exists in the mempool. - pub(crate) fn most_dense_single_proof_pair(&self) -> Option<[(&TransactionKernel, &Proof); 2]> { + pub(crate) fn most_dense_single_proof_pair( + &self, + ) -> Option<([(&TransactionKernel, &Proof); 2], TransactionOrigin)> { let mut ret = vec![]; + let mut own_tx = false; for (txid, _fee_density) in self.get_sorted_iter() { let candidate = self.tx_dictionary.get(&txid).unwrap(); - if let TransactionProof::SingleProof(proof) = &candidate.proof { - ret.push((&candidate.kernel, proof)); + if let TransactionProof::SingleProof(proof) = &candidate.transaction.proof { + ret.push((&candidate.transaction.kernel, proof)); + own_tx = own_tx || candidate.origin.is_own(); } + let origin = match own_tx { + true => TransactionOrigin::Own, + false => TransactionOrigin::Foreign, + }; + if ret.len() == 2 { - return Some(ret.try_into().unwrap()); + return Some((ret.try_into().unwrap(), origin)); } } @@ -207,7 +251,9 @@ impl Mempool { /// /// Computes in O(1) from HashMap pub fn get(&self, transaction_id: TransactionKernelId) -> Option<&Transaction> { - self.tx_dictionary.get(&transaction_id) + self.tx_dictionary + .get(&transaction_id) + .map(|x| &x.transaction) } /// Returns the list of transactions already in the mempool that a @@ -217,7 +263,7 @@ impl Mempool { fn transaction_conflicts_with( &self, transaction: &Transaction, - ) -> Vec<(TransactionKernelId, Transaction)> { + ) -> Vec<(TransactionKernelId, &Transaction)> { // This check could be made a lot more efficient, for example with an invertible Bloom filter let tx_sbf_indices: HashSet<_> = transaction .kernel @@ -228,9 +274,9 @@ impl Mempool { let mut conflict_txs_in_mempool = vec![]; for (txid, tx) in self.tx_dictionary.iter() { - for mempool_tx_input in tx.kernel.inputs.iter() { + for mempool_tx_input in tx.transaction.kernel.inputs.iter() { if tx_sbf_indices.contains(&mempool_tx_input.absolute_indices.to_array()) { - conflict_txs_in_mempool.push((*txid, tx.to_owned())); + conflict_txs_in_mempool.push((*txid, &tx.transaction)); } } } @@ -245,7 +291,7 @@ impl Mempool { /// inserted. /// /// The caller must also ensure that the transaction does not have a timestamp - /// in the too distant future. + /// in the too distant future, as such a transaction cannot be mined. /// /// this method may return: /// n events: RemoveTx,AddTx. tx replaces a list of older txs with lower fee. @@ -255,7 +301,11 @@ impl Mempool { /// # Panics /// /// Panics if the transaction's proof is of the wrong type. - pub(super) fn insert(&mut self, transaction: Transaction) -> Vec { + pub(super) fn insert( + &mut self, + transaction: Transaction, + origin: TransactionOrigin, + ) -> Vec { let mut events = vec![]; match transaction.proof { @@ -274,9 +324,10 @@ impl Mempool { // merged. let conflicts = self.transaction_conflicts_with(&transaction); let min_fee_of_conflicts = conflicts.iter().map(|x| x.1.fee_density()).min(); + let conflicts = conflicts.into_iter().map(|x| x.0).collect_vec(); if let Some(min_fee_of_conflicting_tx) = min_fee_of_conflicts { if min_fee_of_conflicting_tx < transaction.fee_density() { - for (conflicting_txid, _) in conflicts { + for conflicting_txid in conflicts { if let Some(e) = self.remove(conflicting_txid) { events.push(e); } @@ -291,7 +342,12 @@ impl Mempool { let txid = transaction.kernel.txid(); self.queue.push(txid, transaction.fee_density()); - self.tx_dictionary.insert(txid, transaction.to_owned()); + + let as_mempool_transaction = MempoolTransaction { + transaction: transaction.clone(), + origin, + }; + self.tx_dictionary.insert(txid, as_mempool_transaction); events.push(MempoolEvent::AddTx(transaction)); assert_eq!( @@ -315,7 +371,7 @@ impl Mempool { self.tx_dictionary.remove(&transaction_id).map(|tx| { self.queue.remove(&transaction_id); debug_assert_eq!(self.tx_dictionary.len(), self.queue.len()); - MempoolEvent::RemoveTx(tx) + MempoolEvent::RemoveTx(tx.transaction) }) } @@ -339,6 +395,17 @@ impl Mempool { self.tx_dictionary.len() } + /// Return the number of transactions currently stored in the mempool that + /// were initiated locally. + /// + /// Computes in O(n) + pub(crate) fn num_own_txs(&self) -> usize { + self.tx_dictionary + .values() + .filter(|x| x.origin.is_own()) + .count() + } + /// check if `Mempool` is empty /// /// Computes in O(1) @@ -400,10 +467,10 @@ impl Mempool { #[allow(dead_code)] fn pop_max(&mut self) -> Option<(MempoolEvent, FeeDensity)> { if let Some((transaction_digest, fee_density)) = self.queue.pop_max() { - if let Some(transaction) = self.tx_dictionary.remove(&transaction_digest) { + if let Some(tx) = self.tx_dictionary.remove(&transaction_digest) { debug_assert_eq!(self.tx_dictionary.len(), self.queue.len()); - let event = MempoolEvent::RemoveTx(transaction); + let event = MempoolEvent::RemoveTx(tx.transaction); return Some((event, fee_density)); } @@ -417,10 +484,10 @@ impl Mempool { /// Computes in θ(lg N) fn pop_min(&mut self) -> Option<(MempoolEvent, FeeDensity)> { if let Some((transaction_digest, fee_density)) = self.queue.pop_min() { - if let Some(transaction) = self.tx_dictionary.remove(&transaction_digest) { + if let Some(tx) = self.tx_dictionary.remove(&transaction_digest) { debug_assert_eq!(self.tx_dictionary.len(), self.queue.len()); - let event = MempoolEvent::RemoveTx(transaction); + let event = MempoolEvent::RemoveTx(tx.transaction); return Some((event, fee_density)); } @@ -474,14 +541,16 @@ impl Mempool { } /// Remove from the mempool all transactions that become invalid because - /// of a newly received block. Also update all mutator set data for mempool - /// transactions that were not removed. + /// of a newly received block. Update all mutator set data for transactions + /// that are our own. If client acts as a composer, all transactions are + /// updated. pub(super) async fn update_with_block_and_predecessor( &mut self, new_block: &Block, predecessor_block: &Block, vm_job_queue: &TritonVmJobQueue, priority: TritonVmJobPriority, + composing: bool, ) -> Vec { let previous_mutator_set_accumulator = predecessor_block.body().mutator_set_accumulator.clone(); @@ -495,10 +564,10 @@ impl Mempool { self.clear(); } - // The general strategy is to check whether the SWBF index set of a given - // transaction in the mempool is disjoint (*i.e.*, not contained by) the - // SWBF indices coming from the block transaction. If they are not disjoint, - // then remove the transaction from the mempool. + // The general strategy is to check whether the SWBF index set of a + // given transaction in the mempool is disjoint from (*i.e.*, not + // contained by) SWBF indices coming from the block transaction. If they + // are not disjoint, then remove the transaction from the mempool. // Compute the union of all index sets generated by the block transaction. let swbf_index_set_union: HashSet<_> = new_block @@ -511,9 +580,9 @@ impl Mempool { .collect(); // The indices that the block transaction inserts are used to determine - // which mempool transactions contain UTXOs that were spent in this block. Any - // transaction that contains just *one* input-UTXO that was spent in - // this block is invalid + // which mempool transactions contain UTXOs that were spent in this + // block. Any transaction that contains just *one* input-UTXO that was + // spent in this block is now invalid. let keep = |(_transaction_id, tx): LookupItem| -> bool { let transaction_index_sets: HashSet<_> = tx .kernel @@ -539,11 +608,27 @@ impl Mempool { let mutator_set_update = Block::mutator_set_update_from_consecutive_pair(predecessor_block, new_block); - // Update the remaining transactions so their mutator set data is still valid - // But kick out those transactions that we were unable to update. + // Update policy: + // We update transaction if either of these conditions are true: + // a) We're composing + // b) We initiated this transaction. + + // If we cannot update the transaction, we kick it out regardless. let mut kick_outs = Vec::with_capacity(self.tx_dictionary.len()); for (tx_id, tx) in self.tx_dictionary.iter_mut() { + if !(composing || tx.origin.is_own()) { + debug!( + "Not updating transaction {tx_id} since it's not \ + initiated by us, and client is not composing." + ); + kick_outs.push(*tx_id); + events.push(MempoolEvent::RemoveTx(tx.transaction.clone())); + continue; + } + + // Attempt update if let Ok(new_tx) = tx + .transaction .clone() .new_with_updated_mutator_set_records( &previous_mutator_set_accumulator, @@ -553,12 +638,22 @@ impl Mempool { ) .await { - *tx = new_tx; - events.push(MempoolEvent::UpdateTxMutatorSet(*tx_id, (*tx).clone())); + tx.transaction = new_tx; + events.push(MempoolEvent::UpdateTxMutatorSet( + *tx_id, + tx.transaction.clone(), + )); } else { - error!("Failed to update transaction {tx_id}. Removing from mempool."); + info!("Failed to update transaction {tx_id}. Removing from mempool."); + if tx.origin.is_own() { + warn!( + "Removed transaction was marked as own transaction. \ + You probably have to create this transaction again." + ); + } + kick_outs.push(*tx_id); - events.push(MempoolEvent::RemoveTx(tx.clone())); + events.push(MempoolEvent::RemoveTx(tx.transaction.clone())); } } @@ -686,7 +781,7 @@ mod tests { let transaction_digests = txs.iter().map(|tx| tx.kernel.txid()).collect_vec(); assert!(!mempool.contains(transaction_digests[0])); assert!(!mempool.contains(transaction_digests[1])); - mempool.insert(txs[0].clone()); + mempool.insert(txs[0].clone(), TransactionOrigin::Foreign); assert!(mempool.contains(transaction_digests[0])); assert!(!mempool.contains(transaction_digests[1])); @@ -713,12 +808,16 @@ mod tests { } /// Create a mempool with n transactions. - async fn setup_mock_mempool(transactions_count: usize, network: Network) -> Mempool { + async fn setup_mock_mempool( + transactions_count: usize, + network: Network, + origin: TransactionOrigin, + ) -> Mempool { let genesis_block = Block::genesis_block(network); let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash()); let txs = make_plenty_mock_transaction_with_primitive_witness(transactions_count); for tx in txs { - mempool.insert(tx); + mempool.insert(tx, origin); } assert_eq!(transactions_count, mempool.len()); @@ -731,7 +830,7 @@ mod tests { async fn get_densest_transactions_no_tx_cap() { // Verify that transactions are returned ordered by fee density, with highest fee density first let num_txs = 10; - let mempool = setup_mock_mempool(num_txs, Network::Main).await; + let mempool = setup_mock_mempool(num_txs, Network::Main, TransactionOrigin::Foreign).await; let max_fee_density: FeeDensity = FeeDensity::new(BigInt::from(u128::MAX), BigInt::from(1)); let mut prev_fee_density = max_fee_density; @@ -750,7 +849,7 @@ mod tests { async fn get_densest_transactions_with_tx_cap() { // Verify that transactions are returned ordered by fee density, with highest fee density first let num_txs = 12; - let mempool = setup_mock_mempool(num_txs, Network::Main).await; + let mempool = setup_mock_mempool(num_txs, Network::Main, TransactionOrigin::Foreign).await; let max_fee_density: FeeDensity = FeeDensity::new(BigInt::from(u128::MAX), BigInt::from(1)); let mut prev_fee_density = max_fee_density; @@ -769,7 +868,7 @@ mod tests { #[tokio::test] async fn most_dense_proof_collection_test() { let network = Network::Main; - let mut mempool = setup_mock_mempool(0, network).await; + let mut mempool = setup_mock_mempool(0, network, TransactionOrigin::Foreign).await; let genesis_block = Block::genesis_block(network); let bob_wallet_secret = WalletSecret::devnet_wallet(); let bob_spending_key = bob_wallet_secret.nth_generation_spending_key_for_tests(0); @@ -798,7 +897,7 @@ mod tests { ); let tx_by_bob_txid = tx_by_bob.kernel.txid(); - mempool.insert(tx_by_bob); + mempool.insert(tx_by_bob, TransactionOrigin::Foreign); assert_eq!( mempool.most_dense_proof_collection().unwrap().0.txid(), tx_by_bob_txid @@ -809,7 +908,7 @@ mod tests { #[tokio::test] async fn get_sorted_iter() { // Verify that the function `get_sorted_iter` returns transactions sorted by fee density - let mempool = setup_mock_mempool(10, Network::Main).await; + let mempool = setup_mock_mempool(10, Network::Main, TransactionOrigin::Foreign).await; let max_fee_density: FeeDensity = FeeDensity::new(BigInt::from(u128::MAX), BigInt::from(1)); let mut prev_fee_density = max_fee_density; @@ -825,7 +924,7 @@ mod tests { #[tokio::test] async fn max_num_transactions_is_respected() { let num_txs = 12; - let mempool = setup_mock_mempool(num_txs, Network::Main).await; + let mempool = setup_mock_mempool(num_txs, Network::Main, TransactionOrigin::Foreign).await; for i in 0..num_txs { assert_eq!( i, @@ -852,13 +951,13 @@ mod tests { let old_txs = make_mock_txs_with_primitive_witness_with_timestamp(6, eight_days_ago); for tx in old_txs { - mempool.insert(tx); + mempool.insert(tx, TransactionOrigin::Foreign); } let new_txs = make_mock_txs_with_primitive_witness_with_timestamp(5, now); for tx in new_txs { - mempool.insert(tx); + mempool.insert(tx, TransactionOrigin::Foreign); } assert_eq!(mempool.len(), 11); @@ -954,7 +1053,7 @@ mod tests { // Add this transaction to a mempool let mut mempool = Mempool::new(ByteSize::gb(1), None, block_1.hash()); - mempool.insert(tx_by_bob.clone()); + mempool.insert(tx_by_bob.clone(), TransactionOrigin::Own); // Create another transaction that's valid to be included in block 2, but isn't actually // included by the miner. This transaction is inserted into the mempool, but since it's @@ -980,7 +1079,7 @@ mod tests { ) .await .unwrap(); - mempool.insert(tx_from_alice_original); + mempool.insert(tx_from_alice_original, TransactionOrigin::Own); { // Verify that `most_dense_single_proof_pair` returns expected value @@ -991,6 +1090,7 @@ mod tests { mempool .most_dense_single_proof_pair() .unwrap() + .0 .map(|x| x.0.txid()) .to_vec() ); @@ -1027,6 +1127,7 @@ mod tests { &block_1, &TritonVmJobQueue::dummy(), TritonVmJobPriority::default(), + true, ) .await; assert_eq!(1, mempool.len()); @@ -1058,6 +1159,7 @@ mod tests { &previous_block, &TritonVmJobQueue::dummy(), TritonVmJobPriority::default(), + true, ) .await; previous_block = next_block; @@ -1092,6 +1194,7 @@ mod tests { &previous_block, &TritonVmJobQueue::dummy(), TritonVmJobPriority::default(), + true, ) .await; @@ -1163,8 +1266,8 @@ mod tests { let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash()); let ((left, right), merged) = merge_tx_triplet().await; - mempool.insert(left); - mempool.insert(right); + mempool.insert(left, TransactionOrigin::Foreign); + mempool.insert(right, TransactionOrigin::Foreign); assert_eq!(2, mempool.len()); // Verify that `most_dense_single_proof_pair` returns expected value @@ -1175,11 +1278,12 @@ mod tests { mempool .most_dense_single_proof_pair() .unwrap() + .0 .map(|x| x.0.txid()) .to_vec() ); - mempool.insert(merged.clone()); + mempool.insert(merged.clone(), TransactionOrigin::Foreign); assert_eq!(1, mempool.len()); assert_eq!(&merged, mempool.get(merged.kernel.txid()).unwrap()); @@ -1246,7 +1350,11 @@ mod tests { .await .unwrap(); - alice.lock_guard_mut().await.mempool.insert(unmined_tx); + alice + .lock_guard_mut() + .await + .mempool + .insert(unmined_tx, TransactionOrigin::Own); // Add some blocks. The transaction must stay in the mempool, since it // is not being mined. @@ -1358,7 +1466,7 @@ mod tests { make_transaction_with_fee(NeptuneCoins::new(1), preminer.clone(), rng.gen()).await; { let mempool = &mut preminer.lock_guard_mut().await.mempool; - mempool.insert(tx_low_fee.clone()); + mempool.insert(tx_low_fee.clone(), TransactionOrigin::Foreign); assert_eq!(1, mempool.len()); assert_eq!(&tx_low_fee, mempool.get(tx_low_fee.kernel.txid()).unwrap()); } @@ -1369,7 +1477,7 @@ mod tests { make_transaction_with_fee(NeptuneCoins::new(10), preminer.clone(), rng.gen()).await; { let mempool = &mut preminer.lock_guard_mut().await.mempool; - mempool.insert(tx_high_fee.clone()); + mempool.insert(tx_high_fee.clone(), TransactionOrigin::Foreign); assert_eq!(1, mempool.len()); assert_eq!( &tx_high_fee, @@ -1383,7 +1491,7 @@ mod tests { let tx_medium_fee = make_transaction_with_fee(NeptuneCoins::new(4), preminer.clone(), rng.gen()).await; let mempool = &mut preminer.lock_guard_mut().await.mempool; - mempool.insert(tx_medium_fee.clone()); + mempool.insert(tx_medium_fee.clone(), TransactionOrigin::Foreign); assert_eq!(1, mempool.len()); assert_eq!( &tx_high_fee, @@ -1401,7 +1509,7 @@ mod tests { let txs = make_plenty_mock_transaction_with_primitive_witness(11); let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash()); for tx in txs { - mempool.insert(tx); + mempool.insert(tx, TransactionOrigin::Foreign); } assert!(mempool @@ -1421,7 +1529,7 @@ mod tests { let mut mempool = Mempool::new(ByteSize::gb(1), None, genesis_block.hash()); for tx in txs { - mempool.insert(tx); + mempool.insert(tx, TransactionOrigin::Foreign); } assert_eq!( @@ -1445,7 +1553,7 @@ mod tests { for i in 0..10 { let mut mempool = Mempool::new(ByteSize::gb(1), Some(i), genesis_block.hash()); for tx in txs.clone() { - mempool.insert(tx); + mempool.insert(tx, TransactionOrigin::Foreign); } assert_eq!( @@ -1471,7 +1579,8 @@ mod tests { // Verify that the `get_size` method on mempool returns sane results let network = Network::Main; let tx_count_small = 2; - let mempool_small = setup_mock_mempool(tx_count_small, network).await; + let mempool_small = + setup_mock_mempool(tx_count_small, network, TransactionOrigin::Foreign).await; let size_gs_small = mempool_small.get_size(); let size_serialized_small = bincode::serialize(&mempool_small.tx_dictionary) .unwrap() @@ -1487,7 +1596,8 @@ mod tests { ); let tx_count_big = 6; - let mempool_big = setup_mock_mempool(tx_count_big, network).await; + let mempool_big = + setup_mock_mempool(tx_count_big, network, TransactionOrigin::Foreign).await; let size_gs_big = mempool_big.get_size(); let size_serialized_big = bincode::serialize(&mempool_big.tx_dictionary) .unwrap() diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index a511f2d57..edf263843 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -22,6 +22,7 @@ use block_proposal::BlockProposal; use blockchain_state::BlockchainState; use itertools::Itertools; use mempool::Mempool; +use mempool::TransactionOrigin; use mining_status::MiningStatus; use networking_state::NetworkingState; use num_traits::CheckedSub; @@ -1478,6 +1479,7 @@ impl GlobalState { &tip_parent, vm_job_queue, TritonVmJobPriority::Highest, + myself.cli().compose, ) .await; @@ -1542,8 +1544,12 @@ impl GlobalState { } /// adds Tx to mempool and notifies wallet of change. - pub async fn mempool_insert(&mut self, transaction: Transaction) { - let events = self.mempool.insert(transaction); + pub(crate) async fn mempool_insert( + &mut self, + transaction: Transaction, + origin: TransactionOrigin, + ) { + let events = self.mempool.insert(transaction, origin); self.wallet_state.handle_mempool_events(events).await } diff --git a/src/models/state/wallet/wallet_state.rs b/src/models/state/wallet/wallet_state.rs index bad9f78a1..da4bc7767 100644 --- a/src/models/state/wallet/wallet_state.rs +++ b/src/models/state/wallet/wallet_state.rs @@ -1539,6 +1539,7 @@ mod tests { use crate::models::blockchain::transaction::transaction_output::UtxoNotificationMedium; use crate::models::state::tx_proving_capability::TxProvingCapability; use crate::models::state::wallet::address::ReceivingAddress; + use crate::models::state::TransactionOrigin; use crate::tests::shared::mine_block_to_wallet_invalid_block_proof; /// basic test for confirmed and unconfirmed balance. @@ -1618,7 +1619,7 @@ mod tests { global_state_lock .lock_guard_mut() .await - .mempool_insert(tx) + .mempool_insert(tx, TransactionOrigin::Own) .await; { diff --git a/src/peer_loop.rs b/src/peer_loop.rs index 0ac4b7d24..07ffb2ec5 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -1485,6 +1485,7 @@ mod peer_loop_tests { use crate::models::blockchain::transaction::transaction_output::UtxoNotificationMedium; use crate::models::blockchain::type_scripts::neptune_coins::NeptuneCoins; use crate::models::peer::transaction_notification::TransactionNotification; + use crate::models::state::mempool::TransactionOrigin; use crate::models::state::tx_proving_capability::TxProvingCapability; use crate::models::state::wallet::WalletSecret; use crate::tests::shared::get_dummy_peer_connection_data_genesis; @@ -2779,7 +2780,7 @@ mod peer_loop_tests { state_lock .lock_guard_mut() .await - .mempool_insert(transaction_1.clone()) + .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign) .await; assert!( !state_lock.lock_guard().await.mempool.is_empty(), @@ -2883,7 +2884,7 @@ mod peer_loop_tests { alice .lock_guard_mut() .await - .mempool_insert(own_tx.to_owned()) + .mempool_insert(own_tx.to_owned(), TransactionOrigin::Foreign) .await; let tx_notification: TransactionNotification = new_tx.try_into().unwrap(); diff --git a/src/rpc_server.rs b/src/rpc_server.rs index eb1cfc03a..49ab8091d 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -54,7 +54,8 @@ pub struct DashBoardOverviewDataFromClient { pub timelocked_balance: NeptuneCoins, pub available_unconfirmed_balance: NeptuneCoins, pub mempool_size: usize, - pub mempool_tx_count: usize, + pub mempool_total_tx_count: usize, + pub mempool_own_tx_count: usize, // `None` symbolizes failure in getting peer count pub peer_count: Option, @@ -712,7 +713,8 @@ impl RPC for NeptuneRPCServer { let wallet_status = state.get_wallet_status_for_tip().await; let syncing = state.net.syncing; let mempool_size = state.mempool.get_size(); - let mempool_tx_count = state.mempool.len(); + let mempool_total_tx_count = state.mempool.len(); + let mempool_own_tx_count = state.mempool.num_own_txs(); let cpu_temp = Self::cpu_temp_inner(); let unconfirmed_balance = state .wallet_state @@ -734,7 +736,8 @@ impl RPC for NeptuneRPCServer { timelocked_balance: wallet_status.synced_unspent_timelocked_amount(now), available_unconfirmed_balance: unconfirmed_balance, mempool_size, - mempool_tx_count, + mempool_total_tx_count, + mempool_own_tx_count, peer_count, mining_status, confirmations,