Skip to content

Commit

Permalink
mempool: Track which transaction are own
Browse files Browse the repository at this point in the history
This is done to determine which transactions that should be updated when
a new block arrives.

Update is an expensive STARK proof to produce, so we opt for the
following policy:
- own transactions are updated, if possible.
- If client is composing, all transactions are updated, as we otherwise
  cannot include the mempool transactions in the new block.

Also note that, if client is composing, it *has* to wait for the update
to finish before starting a new composition. Otherwise, the transactions
will never be mined.
  • Loading branch information
Sword-Smith committed Nov 8, 2024
1 parent 06be015 commit 8f86609
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 97 deletions.
8 changes: 5 additions & 3 deletions src/bin/dashboard_src/overview_screen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ pub struct OverviewData {
archive_coverage: Option<f64>,

mempool_size: Option<ByteSize>,
mempool_tx_count: Option<u32>,
mempool_total_tx_count: Option<u32>,
mempool_own_tx_count: Option<u32>,

listen_address: Option<SocketAddr>,
peer_count: Option<usize>,
Expand Down Expand Up @@ -144,7 +145,8 @@ impl OverviewScreen {
own_overview_data.tip_digest = Some(resp.tip_digest);
own_overview_data.block_header = Some(resp.tip_header);
own_overview_data.mempool_size = Some(ByteSize::b(resp.mempool_size.try_into().unwrap()));
own_overview_data.mempool_tx_count = Some(resp.mempool_tx_count.try_into().unwrap());
own_overview_data.mempool_total_tx_count = Some(resp.mempool_total_tx_count.try_into().unwrap());
own_overview_data.mempool_own_tx_count = Some(resp.mempool_own_tx_count.try_into().unwrap());
own_overview_data.peer_count=resp.peer_count;
own_overview_data.authenticated_peer_count=Some(0);
own_overview_data.syncing=resp.syncing;
Expand Down Expand Up @@ -398,7 +400,7 @@ impl Widget for OverviewScreen {
lines.push(format!("size: {}", dashifnotset!(data.mempool_size)));
lines.push(format!(
"tx count: {}",
dashifnotset!(data.mempool_tx_count)
dashifnotset!(data.mempool_total_tx_count)
));
Self::report(&lines, "Mempool")
.style(style)
Expand Down
8 changes: 6 additions & 2 deletions src/job_queue/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::task::TaskTracker;
Expand Down Expand Up @@ -159,10 +160,12 @@ impl<P: Ord + Send + Sync + 'static> JobQueue<P> {

#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;

use tracing_test::traced_test;

use super::*;

#[tokio::test(flavor = "multi_thread")]
#[traced_test]
async fn run_sync_jobs_by_priority() -> anyhow::Result<()> {
Expand Down Expand Up @@ -223,9 +226,10 @@ mod tests {
}

mod workers {
use super::*;
use std::any::Any;

use super::*;

#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub enum DoubleJobPriority {
Low = 1,
Expand Down
22 changes: 13 additions & 9 deletions src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tracing::warn;

use crate::connect_to_peers::answer_peer_wrapper;
use crate::connect_to_peers::call_peer_wrapper;
use crate::job_queue::triton_vm::TritonVmJobPriority;
use crate::models::blockchain::block::block_header::BlockHeader;
use crate::models::blockchain::block::block_height::BlockHeight;
use crate::models::blockchain::block::difficulty_control::ProofOfWork;
Expand All @@ -45,6 +44,7 @@ use crate::models::peer::HandshakeData;
use crate::models::peer::PeerInfo;
use crate::models::peer::PeerSynchronizationState;
use crate::models::state::block_proposal::BlockProposal;
use crate::models::state::mempool::TransactionOrigin;
use crate::models::state::tx_proving_capability::TxProvingCapability;
use crate::models::state::GlobalState;
use crate::models::state::GlobalStateLock;
Expand Down Expand Up @@ -620,7 +620,10 @@ impl MainLoopHandler {

// Insert into mempool
global_state_mut
.mempool_insert(pt2m_transaction.transaction.to_owned())
.mempool_insert(
pt2m_transaction.transaction.to_owned(),
TransactionOrigin::Foreign,
)
.await;

// send notification to peers
Expand Down Expand Up @@ -962,7 +965,7 @@ impl MainLoopHandler {
// Check if it's time to run the proof-upgrader, and if we're capable
// of upgrading a transaction proof.
let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval();
let upgrade_candidate = {
let (upgrade_candidate, tx_origin) = {
let global_state = self.global_state_lock.lock_guard().await;
let now = self.now();
if !attempt_upgrade(&global_state, now, tx_upgrade_interval, main_loop_state)? {
Expand All @@ -973,12 +976,13 @@ impl MainLoopHandler {
debug!("Attempting to run transaction-proof-upgrade");

// Find a candidate for proof upgrade
let Some(upgrade_candidate) = get_upgrade_task_from_mempool(&global_state) else {
let Some((upgrade_candidate, tx_origin)) = get_upgrade_task_from_mempool(&global_state)
else {
debug!("Found no transaction-proof to upgrade");
return Ok(());
};

upgrade_candidate
(upgrade_candidate, tx_origin)
};

info!(
Expand All @@ -1002,7 +1006,7 @@ impl MainLoopHandler {
upgrade_candidate
.handle_upgrade(
&vm_job_queue,
TritonVmJobPriority::Low,
tx_origin,
perform_ms_update_if_needed,
global_state_lock_clone,
main_to_peer_broadcast_tx_clone,
Expand Down Expand Up @@ -1279,7 +1283,7 @@ impl MainLoopHandler {
self.global_state_lock
.lock_guard_mut()
.await
.mempool_insert(*transaction.clone())
.mempool_insert(*transaction.clone(), TransactionOrigin::Own)
.await;

// Is this a transaction we can share with peers? If so, share
Expand Down Expand Up @@ -1317,7 +1321,7 @@ impl MainLoopHandler {
upgrade_job
.handle_upgrade(
&vm_job_queue,
TritonVmJobPriority::High,
TransactionOrigin::Own,
true,
global_state_lock_clone,
main_to_peer_broadcast_tx_clone,
Expand Down Expand Up @@ -1553,7 +1557,7 @@ mod tests {
.global_state_lock
.lock_guard_mut()
.await
.mempool_insert(proof_collection_tx.clone())
.mempool_insert(proof_collection_tx.clone(), TransactionOrigin::Foreign)
.await;

assert!(
Expand Down
31 changes: 21 additions & 10 deletions src/main_loop/proof_upgrader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tasm_lib::triton_vm::proof::Proof;
use tracing::error;
use tracing::info;

use super::TransactionOrigin;
use crate::job_queue::triton_vm::TritonVmJobPriority;
use crate::job_queue::triton_vm::TritonVmJobQueue;
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
Expand Down Expand Up @@ -150,13 +151,18 @@ impl UpgradeJob {
pub(crate) async fn handle_upgrade(
self,
triton_vm_job_queue: &TritonVmJobQueue,
priority: TritonVmJobPriority,
tx_origin: TransactionOrigin,
perform_ms_update_if_needed: bool,
mut global_state_lock: GlobalStateLock,
main_to_peer_channel: tokio::sync::broadcast::Sender<MainToPeerTask>,
) {
let mut upgrade_job = self;

let job_priority = match tx_origin {
TransactionOrigin::Foreign => TritonVmJobPriority::Lowest,
TransactionOrigin::Own => TritonVmJobPriority::High,
};

// process in a loop. in case a new block comes in while processing
// the current tx, then we can move on to the next, and so on.
loop {
Expand All @@ -170,7 +176,7 @@ impl UpgradeJob {
let affected_txids = upgrade_job.affected_txids();
let mutator_set_for_tx = upgrade_job.mutator_set();

let upgraded = match upgrade_job.upgrade(triton_vm_job_queue, priority).await {
let upgraded = match upgrade_job.upgrade(triton_vm_job_queue, job_priority).await {
Ok(upgraded_tx) => {
info!(
"Successfully upgraded transaction {}",
Expand Down Expand Up @@ -208,7 +214,7 @@ impl UpgradeJob {
))
.unwrap();

global_state.mempool_insert(upgraded).await;
global_state.mempool_insert(upgraded, tx_origin).await;

info!("Successfully handled proof upgrade.");
return;
Expand Down Expand Up @@ -357,12 +363,15 @@ impl UpgradeJob {
}

/// Return an [UpgradeJob] that describes work that can be done to upgrade the
/// proof-quality of a transaction found in mempool.
pub(super) fn get_upgrade_task_from_mempool(global_state: &GlobalState) -> Option<UpgradeJob> {
/// proof-quality of a transaction found in mempool. Also indicates whether the
/// upgrade job affects one of our own transaction, or a foreign transaction.
pub(super) fn get_upgrade_task_from_mempool(
global_state: &GlobalState,
) -> Option<(UpgradeJob, TransactionOrigin)> {
// Do we have any `ProofCollection`s?
let tip = global_state.chain.light_state().body();

if let Some((kernel, proof)) = global_state.mempool.most_dense_proof_collection() {
if let Some((kernel, proof, tx_origin)) = global_state.mempool.most_dense_proof_collection() {
let upgrade_decision = UpgradeJob::ProofCollectionToSingleProof {
kernel: kernel.to_owned(),
proof: proof.to_owned(),
Expand All @@ -374,12 +383,14 @@ pub(super) fn get_upgrade_task_from_mempool(global_state: &GlobalState) -> Optio
return None;
}

return Some(upgrade_decision);
return Some((upgrade_decision, tx_origin));
}

// Can we merge two single proofs?
if let Some([(left_kernel, left_single_proof), (right_kernel, right_single_proof)]) =
global_state.mempool.most_dense_single_proof_pair()
if let Some((
[(left_kernel, left_single_proof), (right_kernel, right_single_proof)],
tx_origin,
)) = global_state.mempool.most_dense_single_proof_pair()
{
let mut rng: StdRng = SeedableRng::from_seed(global_state.shuffle_seed());
let upgrade_decision = UpgradeJob::Merge {
Expand All @@ -398,7 +409,7 @@ pub(super) fn get_upgrade_task_from_mempool(global_state: &GlobalState) -> Optio
return None;
}

return Some(upgrade_decision);
return Some((upgrade_decision, tx_origin));
}

None
Expand Down
7 changes: 5 additions & 2 deletions src/mine_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ pub(crate) mod mine_loop_tests {
use crate::job_queue::triton_vm::TritonVmJobQueue;
use crate::models::blockchain::type_scripts::neptune_coins::NeptuneCoins;
use crate::models::proof_abstractions::timestamp::Timestamp;
use crate::models::state::mempool::TransactionOrigin;
use crate::tests::shared::dummy_expected_utxo;
use crate::tests::shared::make_mock_transaction_with_mutator_set_hash;
use crate::tests::shared::mock_genesis_global_state;
Expand Down Expand Up @@ -810,7 +811,7 @@ pub(crate) mod mine_loop_tests {
rng.gen(),
alice_key.to_address().into(),
);
let (tx_by_preminer, _maybe_change_output) = alice
let (tx_from_alice, _maybe_change_output) = alice
.lock_guard()
.await
.create_transaction_with_prover_capability(
Expand Down Expand Up @@ -864,7 +865,9 @@ pub(crate) mod mine_loop_tests {

{
let mut alice_gsm = alice.lock_guard_mut().await;
alice_gsm.mempool_insert(tx_by_preminer.clone()).await;
alice_gsm
.mempool_insert(tx_from_alice.clone(), TransactionOrigin::Own)
.await;
assert_eq!(1, alice_gsm.mempool.len());
}

Expand Down
2 changes: 2 additions & 0 deletions src/models/blockchain/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,8 @@ impl Block {
.collect_vec()
}

/// Return the mutator set update induced by the previous block and the
/// new transaction.
pub(crate) fn ms_update_from_predecessor_and_new_tx_kernel(
predecessor_block: &Block,
new_transaction_kernel: &TransactionKernel,
Expand Down
Loading

0 comments on commit 8f86609

Please sign in to comment.