Skip to content

Commit

Permalink
Bring back tx monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Oct 13, 2023
1 parent 96ffb71 commit 52b306a
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/contracts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl IdentityManager {
}

#[instrument(level = "debug", skip(self))]
pub async fn mine_identities(&self, transaction_id: TransactionId) -> anyhow::Result<bool> {
pub async fn mine_transaction(&self, transaction_id: TransactionId) -> anyhow::Result<bool> {
let result = self.ethereum.mine_transaction(transaction_id).await?;

Ok(result)
Expand All @@ -345,7 +345,7 @@ impl IdentityManager {
for pending_identity_tx in pending_identities {
// Ignores the result of each transaction - we only care about a clean slate in
// terms of pending transactions
drop(self.mine_identities(pending_identity_tx).await);
drop(self.mine_transaction(pending_identity_tx).await);
}

Ok(())
Expand Down
39 changes: 27 additions & 12 deletions src/task_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use anyhow::Result as AnyhowResult;
use clap::Parser;
use once_cell::sync::Lazy;
use prometheus::{linear_buckets, register_gauge, register_histogram, Gauge, Histogram};
use tokio::sync::{broadcast, Notify, RwLock};
use tokio::sync::{broadcast, mpsc, Notify, RwLock};
use tokio::task::JoinHandle;
use tracing::{info, instrument, warn};

use self::tasks::delete_identities::DeleteIdentities;
use self::tasks::finalize_identities::FinalizeRoots;
use self::tasks::insert_identities::InsertIdentities;
use self::tasks::monitor_txs::MonitorTxs;
use self::tasks::process_identities::ProcessIdentities;
use crate::contracts::SharedIdentityManager;
use crate::database::Database;
Expand Down Expand Up @@ -98,19 +99,17 @@ pub struct Options {
#[clap(long, env, default_value = "0")]
pub max_epoch_duration_seconds: u64,

/// How many identities can be held in the API insertion queue at any given
/// time Past this limit the API request will block until the queue has
/// space for the insertion.
#[clap(long, env, default_value = "100")]
pub insert_identities_capacity: usize,

/// The maximum number of windows to scan for finalization logs
#[clap(long, env, default_value = "100")]
pub scanning_window_size: u64,

/// The number of seconds to wait between fetching logs
#[clap(long, env, default_value = "30")]
pub time_between_scans_seconds: u64,

/// The number of txs in the channel that we'll be monitoring
#[clap(long, env, default_value = "100")]
pub monitored_txs_capacity: usize,
}

/// A worker that commits identities to the blockchain.
Expand Down Expand Up @@ -138,6 +137,7 @@ pub struct TaskMonitor {
batch_deletion_timeout_seconds: i64,
// TODO: docs
min_batch_deletion_size: usize,
monitored_txs_capacity: usize,
}

impl TaskMonitor {
Expand All @@ -151,10 +151,10 @@ impl TaskMonitor {
batch_timeout_seconds,
scanning_window_size,
time_between_scans_seconds,
batch_deletion_timeout_seconds: _,
min_batch_deletion_size: _,
insert_identities_capacity: _,
max_epoch_duration_seconds,
monitored_txs_capacity,
batch_deletion_timeout_seconds,
min_batch_deletion_size,
} = *options;

Self {
Expand All @@ -165,9 +165,10 @@ impl TaskMonitor {
batch_insert_timeout_secs: batch_timeout_seconds,
scanning_window_size,
time_between_scans: Duration::from_secs(time_between_scans_seconds),
batch_deletion_timeout_seconds: options.batch_deletion_timeout_seconds,
min_batch_deletion_size: options.min_batch_deletion_size,
batch_deletion_timeout_seconds,
min_batch_deletion_size,
max_epoch_duration: Duration::from_secs(max_epoch_duration_seconds),
monitored_txs_capacity,
}
}

Expand All @@ -182,6 +183,9 @@ impl TaskMonitor {
// but for symmetry's sake we create it for every task with `.subscribe()`
let (shutdown_sender, _) = broadcast::channel(1);

let (monitored_txs_sender, monitored_txs_receiver) =
mpsc::channel(self.monitored_txs_capacity);

let wake_up_notify = Arc::new(Notify::new());
// Immediately notify so we can start processing if we have pending identities
// in the database
Expand Down Expand Up @@ -214,6 +218,7 @@ impl TaskMonitor {
self.identity_manager.clone(),
self.tree_state.get_batching_tree(),
self.batch_insert_timeout_secs,
monitored_txs_sender,
wake_up_notify.clone(),
);

Expand All @@ -225,6 +230,16 @@ impl TaskMonitor {

handles.push(process_identities_handle);

let monitor_txs = MonitorTxs::new(self.identity_manager.clone(), monitored_txs_receiver);

let monitor_txs_handle = crate::utils::spawn_monitored_with_backoff(
move || monitor_txs.clone().run(),
shutdown_sender.clone(),
PROCESS_IDENTITIES_BACKOFF,
);

handles.push(monitor_txs_handle);

// Insert identities task
let insert_identities = InsertIdentities::new(
self.database.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/task_monitor/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod delete_identities;
pub mod finalize_identities;
pub mod insert_identities;
pub mod monitor_txs;
pub mod process_identities;
45 changes: 45 additions & 0 deletions src/task_monitor/tasks/monitor_txs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::sync::Arc;

use anyhow::Result as AnyhowResult;
use tokio::sync::{mpsc, Mutex};

use crate::contracts::{IdentityManager, SharedIdentityManager};
use crate::ethereum::write::TransactionId;

pub struct MonitorTxs {
identity_manager: SharedIdentityManager,
monitored_txs_receiver: Arc<Mutex<mpsc::Receiver<TransactionId>>>,
}

impl MonitorTxs {
pub fn new(
identity_manager: SharedIdentityManager,
monitored_txs_receiver: mpsc::Receiver<TransactionId>,
) -> Arc<Self> {
Arc::new(Self {
identity_manager,
monitored_txs_receiver: Arc::new(Mutex::new(monitored_txs_receiver)),
})
}

pub async fn run(self: Arc<Self>) -> anyhow::Result<()> {
monitor_txs_loop(&self.identity_manager, &self.monitored_txs_receiver).await?;

Ok(())
}
}

async fn monitor_txs_loop(
identity_manager: &IdentityManager,
monitored_txs_receiver: &Mutex<mpsc::Receiver<TransactionId>>,
) -> AnyhowResult<()> {
let mut monitored_txs_receiver = monitored_txs_receiver.lock().await;

while let Some(tx) = monitored_txs_receiver.recv().await {
if !identity_manager.mine_transaction(tx.clone()).await? {
panic!("Failed to mine transaction: {}", tx);

Check warning on line 40 in src/task_monitor/tasks/monitor_txs.rs

View workflow job for this annotation

GitHub Actions / clippy

variables can be used directly in the `format!` string

warning: variables can be used directly in the `format!` string --> src/task_monitor/tasks/monitor_txs.rs:40:13 | 40 | panic!("Failed to mine transaction: {}", tx); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#uninlined_format_args help: change this to | 40 - panic!("Failed to mine transaction: {}", tx); 40 + panic!("Failed to mine transaction: {tx}"); |

Check warning on line 40 in src/task_monitor/tasks/monitor_txs.rs

View workflow job for this annotation

GitHub Actions / clippy

variables can be used directly in the `format!` string

warning: variables can be used directly in the `format!` string --> src/task_monitor/tasks/monitor_txs.rs:40:13 | 40 | panic!("Failed to mine transaction: {}", tx); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#uninlined_format_args help: change this to | 40 - panic!("Failed to mine transaction: {}", tx); 40 + panic!("Failed to mine transaction: {tx}"); |

Check warning on line 40 in src/task_monitor/tasks/monitor_txs.rs

View workflow job for this annotation

GitHub Actions / clippy

variables can be used directly in the `format!` string

warning: variables can be used directly in the `format!` string --> src/task_monitor/tasks/monitor_txs.rs:40:13 | 40 | panic!("Failed to mine transaction: {}", tx); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#uninlined_format_args help: change this to | 40 - panic!("Failed to mine transaction: {}", tx); 40 + panic!("Failed to mine transaction: {tx}"); |
}

Check warning on line 41 in src/task_monitor/tasks/monitor_txs.rs

View workflow job for this annotation

GitHub Actions / clippy

only a `panic!` in `if`-then statement

warning: only a `panic!` in `if`-then statement --> src/task_monitor/tasks/monitor_txs.rs:39:9 | 39 | / if !identity_manager.mine_transaction(tx.clone()).await? { 40 | | panic!("Failed to mine transaction: {}", tx); 41 | | } | |_________^ help: try instead: `assert!((identity_manager.mine_transaction(tx.clone()).await?), "Failed to mine transaction: {}", tx);` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#manual_assert = note: `#[warn(clippy::manual_assert)]` implied by `#[warn(clippy::pedantic)]`

Check warning on line 41 in src/task_monitor/tasks/monitor_txs.rs

View workflow job for this annotation

GitHub Actions / clippy

only a `panic!` in `if`-then statement

warning: only a `panic!` in `if`-then statement --> src/task_monitor/tasks/monitor_txs.rs:39:9 | 39 | / if !identity_manager.mine_transaction(tx.clone()).await? { 40 | | panic!("Failed to mine transaction: {}", tx); 41 | | } | |_________^ help: try instead: `assert!((identity_manager.mine_transaction(tx.clone()).await?), "Failed to mine transaction: {}", tx);` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#manual_assert = note: `#[warn(clippy::manual_assert)]` implied by `#[warn(clippy::pedantic)]`

Check warning on line 41 in src/task_monitor/tasks/monitor_txs.rs

View workflow job for this annotation

GitHub Actions / clippy

only a `panic!` in `if`-then statement

warning: only a `panic!` in `if`-then statement --> src/task_monitor/tasks/monitor_txs.rs:39:9 | 39 | / if !identity_manager.mine_transaction(tx.clone()).await? { 40 | | panic!("Failed to mine transaction: {}", tx); 41 | | } | |_________^ help: try instead: `assert!((identity_manager.mine_transaction(tx.clone()).await?), "Failed to mine transaction: {}", tx);` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#manual_assert = note: `#[warn(clippy::manual_assert)]` implied by `#[warn(clippy::pedantic)]`
}

Ok(())
}
33 changes: 23 additions & 10 deletions src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use ethers::types::U256;
use ruint::Uint;
use semaphore::merkle_tree::Proof;
use semaphore::poseidon_tree::Branch;
use tokio::sync::Notify;
use tokio::sync::{mpsc, Notify};
use tokio::{select, time};
use tracing::{debug, error, info, instrument, warn};

use crate::contracts::{IdentityManager, SharedIdentityManager};
use crate::database::Database;
use crate::ethereum::write::TransactionId;
use crate::identity_tree::{
AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion,
};
Expand All @@ -30,6 +31,7 @@ pub struct ProcessIdentities {
identity_manager: SharedIdentityManager,
batching_tree: TreeVersion<Intermediate>,
batch_insert_timeout_secs: u64,
monitored_txs_sender: mpsc::Sender<TransactionId>,
wake_up_notify: Arc<Notify>,
}

Expand All @@ -39,13 +41,15 @@ impl ProcessIdentities {
identity_manager: SharedIdentityManager,
batching_tree: TreeVersion<Intermediate>,
batch_insert_timeout_secs: u64,
monitored_txs_sender: mpsc::Sender<TransactionId>,
wake_up_notify: Arc<Notify>,
) -> Arc<Self> {
Arc::new(Self {
database,
identity_manager,
batching_tree,
batch_insert_timeout_secs,
monitored_txs_sender,
wake_up_notify,
})
}
Expand All @@ -55,6 +59,7 @@ impl ProcessIdentities {
&self.database,
&self.identity_manager,
&self.batching_tree,
&self.monitored_txs_sender,
&self.wake_up_notify,
self.batch_insert_timeout_secs,
)
Expand All @@ -66,6 +71,7 @@ async fn process_identities(
database: &Database,
identity_manager: &IdentityManager,
batching_tree: &TreeVersion<Intermediate>,
monitored_txs_sender: &mpsc::Sender<TransactionId>,
wake_up_notify: &Notify,
timeout_secs: u64,
) -> AnyhowResult<()> {
Expand Down Expand Up @@ -121,6 +127,7 @@ async fn process_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

Expand Down Expand Up @@ -180,6 +187,7 @@ async fn process_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

Expand All @@ -201,10 +209,11 @@ async fn commit_identities(
database: &Database,
identity_manager: &IdentityManager,
batching_tree: &TreeVersion<Intermediate>,
monitored_txs_sender: &mpsc::Sender<TransactionId>,
updates: &[AppliedTreeUpdate],
) -> AnyhowResult<()> {
// If the update is an insertion
if updates
let tx_id = if updates
.first()
.context("Updates should be > 1")?
.update
Expand All @@ -221,7 +230,7 @@ async fn commit_identities(
prover.batch_size()
);

insert_identities(database, identity_manager, batching_tree, updates, prover).await?;
insert_identities(database, identity_manager, batching_tree, updates, prover).await?
} else {
let prover = identity_manager
.get_suitable_deletion_prover(updates.len())
Expand All @@ -233,7 +242,11 @@ async fn commit_identities(
prover.batch_size()
);

delete_identities(database, identity_manager, batching_tree, updates, prover).await?;
delete_identities(database, identity_manager, batching_tree, updates, prover).await?
};

Check warning on line 246 in src/task_monitor/tasks/process_identities.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary `!=` operation

warning: unnecessary `!=` operation --> src/task_monitor/tasks/process_identities.rs:216:17 | 216 | let tx_id = if updates | _________________^ 217 | | .first() 218 | | .context("Updates should be > 1")? 219 | | .update ... | 245 | | delete_identities(database, identity_manager, batching_tree, updates, prover).await? 246 | | }; | |_____^ | = help: change to `==` and swap the blocks of the `if`/`else` = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#if_not_else = note: `#[warn(clippy::if_not_else)]` implied by `#[warn(clippy::pedantic)]`

Check warning on line 246 in src/task_monitor/tasks/process_identities.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary `!=` operation

warning: unnecessary `!=` operation --> src/task_monitor/tasks/process_identities.rs:216:17 | 216 | let tx_id = if updates | _________________^ 217 | | .first() 218 | | .context("Updates should be > 1")? 219 | | .update ... | 245 | | delete_identities(database, identity_manager, batching_tree, updates, prover).await? 246 | | }; | |_____^ | = help: change to `==` and swap the blocks of the `if`/`else` = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#if_not_else = note: `#[warn(clippy::if_not_else)]` implied by `#[warn(clippy::pedantic)]`

Check warning on line 246 in src/task_monitor/tasks/process_identities.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary `!=` operation

warning: unnecessary `!=` operation --> src/task_monitor/tasks/process_identities.rs:216:17 | 216 | let tx_id = if updates | _________________^ 217 | | .first() 218 | | .context("Updates should be > 1")? 219 | | .update ... | 245 | | delete_identities(database, identity_manager, batching_tree, updates, prover).await? 246 | | }; | |_____^ | = help: change to `==` and swap the blocks of the `if`/`else` = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#if_not_else = note: `#[warn(clippy::if_not_else)]` implied by `#[warn(clippy::pedantic)]`

if let Some(tx_id) = tx_id {
monitored_txs_sender.send(tx_id).await?;
}

Ok(())
Expand All @@ -246,12 +259,12 @@ pub async fn insert_identities(
batching_tree: &TreeVersion<Intermediate>,
updates: &[AppliedTreeUpdate],
prover: ReadOnlyProver<'_, Prover>,
) -> AnyhowResult<()> {
) -> AnyhowResult<Option<TransactionId>> {
TaskMonitor::log_identities_queues(database).await?;

if updates.is_empty() {
warn!("Identity commit requested with zero identities. Continuing.");
return Ok(());
return Ok(None);
}

debug!("Starting identity commit for {} identities.", updates.len());
Expand Down Expand Up @@ -422,7 +435,7 @@ pub async fn insert_identities(

TaskMonitor::log_batch_size(updates.len());

Ok(())
Ok(Some(transaction_id))
}

pub async fn delete_identities(
Expand All @@ -431,12 +444,12 @@ pub async fn delete_identities(
batching_tree: &TreeVersion<Intermediate>,
updates: &[AppliedTreeUpdate],
prover: ReadOnlyProver<'_, Prover>,
) -> AnyhowResult<()> {
) -> AnyhowResult<Option<TransactionId>> {
TaskMonitor::log_identities_queues(database).await?;

if updates.is_empty() {
warn!("Identity commit requested with zero identities. Continuing.");
return Ok(());
return Ok(None);
}

debug!("Starting identity commit for {} identities.", updates.len());
Expand Down Expand Up @@ -569,5 +582,5 @@ pub async fn delete_identities(

TaskMonitor::log_batch_size(updates.len());

Ok(())
Ok(Some(transaction_id))
}

0 comments on commit 52b306a

Please sign in to comment.