diff --git a/src/contracts/abi.rs b/src/contracts/abi.rs
index 0353373b..32f7e7aa 100644
--- a/src/contracts/abi.rs
+++ b/src/contracts/abi.rs
@@ -2,13 +2,41 @@ use ethers::prelude::abigen;
 
+/// The kind of change reported by a `TreeChanged` event emitted by the
+/// `IdentityManager` contract. Maps to the following enum in the contract
+/// code:
+///
+/// ```sol
+/// enum TreeChange {
+///     Insertion,
+///     Deletion,
+///     Update
+/// }
+/// ```
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TreeChangeKind {
+    Insertion,
+    Deletion,
+    Update,
+}
+
+impl From<u8> for TreeChangeKind {
+    fn from(value: u8) -> Self {
+        match value {
+            0 => Self::Insertion,
+            1 => Self::Deletion,
+            2 => Self::Update,
+            _ => panic!("Invalid value for TreeChangeKind: {}", value),
+        }
+    }
+}
+
 abigen!(
     WorldId,
     r#"[
         struct RootInfo { uint256 root; uint128 supersededTimestamp; bool isValid }
         event TreeChanged(uint256 indexed preRoot, uint8 indexed kind, uint256 indexed postRoot)
         function registerIdentities(uint256[8] calldata insertionProof, uint256 preRoot, uint32 startIndex, uint256[] calldata identityCommitments, uint256 postRoot) public virtual
-        function deleteIdentities(uint256[8] calldata deletionProof, uint32 batchSize, bytes calldata packedDeletionIndices, uint256 preRoot, uint256 postRoot) public virtual
+        function deleteIdentities(uint256[8] calldata deletionProof, uint32 batchSize, bytes calldata packedDeletionIndices, uint256 preRoot, uint256 postRoot) public virtual
         function latestRoot() public view virtual returns (uint256 root)
         function owner() public view virtual returns (address)
         function queryRoot(uint256 root) public view virtual returns (RootInfo memory)
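Note that `From<u8>` panics on any discriminant above 2, so a malformed or future-version `TreeChanged` event would bring the process down. If that trade-off ever needs revisiting, a fallible conversion is the usual alternative; a minimal sketch (hypothetical, not part of this patch, with the enum repeated only to keep the example self-contained):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TreeChangeKind {
    Insertion,
    Deletion,
    Update,
}

impl TryFrom<u8> for TreeChangeKind {
    // Return the raw value as the error so callers can log it.
    type Error = u8;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::Insertion),
            1 => Ok(Self::Deletion),
            2 => Ok(Self::Update),
            other => Err(other),
        }
    }
}
```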
diff --git a/src/contracts/mod.rs b/src/contracts/mod.rs
index 03cc0430..aa0719ab 100644
--- a/src/contracts/mod.rs
+++ b/src/contracts/mod.rs
@@ -5,15 +5,15 @@ pub mod scanner;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use clap::Parser;
 use ethers::providers::Middleware;
-use ethers::types::{Address, U256};
+use ethers::types::{Address, H256, U256};
 use semaphore::Field;
 use tokio::sync::RwLockReadGuard;
 use tracing::{error, info, instrument, warn};
 
-use self::abi::{BridgedWorldId, WorldId};
+use self::abi::{BridgedWorldId, DeleteIdentitiesCall, WorldId};
 use crate::ethereum::write::TransactionId;
 use crate::ethereum::{Ethereum, ReadProvider};
 use crate::prover::identity::Identity;
@@ -21,6 +21,7 @@ use crate::prover::map::{DeletionProverMap, InsertionProverMap, ReadOnlyInsertio
 use crate::prover::{Proof, Prover, ProverConfiguration, ProverType, ReadOnlyProver};
 use crate::serde_utils::JsonStrWrapper;
 use crate::server::error::Error as ServerError;
+use crate::utils::index_packing::unpack_indices;
 
 /// Configuration options for the component responsible for interacting with the
 /// contract.
@@ -360,6 +361,48 @@ impl IdentityManager {
         Ok(latest_root)
     }
 
+    /// Fetches the deletion indices from a `deleteIdentities`
+    /// transaction by its tx hash
+    #[instrument(level = "debug", skip_all)]
+    pub async fn fetch_deletion_indices_from_tx(
+        &self,
+        tx_hash: H256,
+    ) -> anyhow::Result<Vec<usize>> {
+        let provider = self.ethereum.provider();
+
+        let tx = provider
+            .get_transaction(tx_hash)
+            .await?
+            .context("Missing tx")?;
+
+        use ethers::abi::AbiDecode;
+        let delete_identities = DeleteIdentitiesCall::decode(&tx.input)?;
+
+        let packed_deletion_indices: &[u8] = delete_identities.packed_deletion_indices.as_ref();
+        let indices = unpack_indices(packed_deletion_indices);
+
+        tracing::debug!("Unpacked deletion indices: {indices:?}");
+
+        let padding_index = 2u32.pow(self.tree_depth as u32);
+
+        Ok(indices
+            .into_iter()
+            .filter(|idx| *idx != padding_index)
+            .map(|x| x as usize)
+            .collect())
+    }
+
+    #[instrument(level = "debug", skip_all)]
+    pub async fn is_root_mined(&self, root: U256) -> anyhow::Result<bool> {
+        let (root_on_mainnet, ..) = self.abi.query_root(root).call().await?;
+
+        if root_on_mainnet.is_zero() {
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
+
     #[instrument(level = "debug", skip_all)]
     pub async fn is_root_mined_multi_chain(&self, root: U256) -> anyhow::Result<bool> {
         let (root_on_mainnet, ..) = self.abi.query_root(root).call().await?;
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 3d3f957b..a94d92bb 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -793,7 +793,6 @@ mod test {
     use postgres_docker_utils::DockerContainerGuard;
     use ruint::Uint;
     use semaphore::Field;
-    use sqlx::Row;
 
     use super::{Database, Options};
     use crate::identity_tree::{Hash, Status};
diff --git a/src/identity_tree.rs b/src/identity_tree.rs
index fc99df00..3533c3a0 100644
--- a/src/identity_tree.rs
+++ b/src/identity_tree.rs
@@ -516,6 +516,23 @@ impl TreeVersion {
     }
 }
 
+impl<T> TreeVersion<T>
+where
+    T: Version,
+{
+    pub fn commitments_by_indices(&self, indices: &[usize]) -> Vec<Hash> {
+        let tree = self.get_data();
+
+        let mut commitments = vec![];
+
+        for idx in indices {
+            commitments.push(tree.tree.get_leaf(*idx));
+        }
+
+        commitments
+    }
+}
+
 /// Public API for working with versions that have a successor. Such versions
 /// only allow peeking and applying updates from the successor.
 pub trait TreeWithNextVersion {
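Taken together with `fetch_deletion_indices_from_tx` above, this helper lets the finalizer recover exactly which commitments a mined `deleteIdentities` transaction removed, with no local queue state. A usage sketch (illustrative only; `deleted_commitments` is a hypothetical wrapper, but the two methods and the types are the ones added in this diff):

```rust
use ethers::types::H256;

use crate::contracts::IdentityManager;
use crate::identity_tree::{Hash, Intermediate, TreeVersion};

// Sketch of the flow used by update_eligible_recoveries later in this diff.
async fn deleted_commitments(
    identity_manager: &IdentityManager,
    processed_tree: &TreeVersion<Intermediate>,
    tx_hash: H256,
) -> anyhow::Result<Vec<Hash>> {
    // Padding indices (2^tree_depth) are already filtered out here.
    let indices = identity_manager
        .fetch_deletion_indices_from_tx(tx_hash)
        .await?;

    // Must run before apply_updates_up_to(post_root); afterwards the
    // leaves have been zeroed and the commitments are gone.
    Ok(processed_tree.commitments_by_indices(&indices))
}
```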
diff --git a/src/task_monitor.rs b/src/task_monitor.rs
index e0a33c9b..25f130d8 100644
--- a/src/task_monitor.rs
+++ b/src/task_monitor.rs
@@ -3,7 +3,6 @@ use std::time::Duration;
 
 use anyhow::Result as AnyhowResult;
 use clap::Parser;
-use ethers::types::U256;
 use once_cell::sync::Lazy;
 use prometheus::{linear_buckets, register_gauge, register_histogram, Gauge, Histogram};
 use tokio::sync::{broadcast, Notify, RwLock};
@@ -13,19 +12,15 @@ use tracing::{info, instrument, warn};
 use self::tasks::delete_identities::DeleteIdentities;
 use self::tasks::finalize_identities::FinalizeRoots;
 use self::tasks::insert_identities::InsertIdentities;
-use self::tasks::mine_identities::MineIdentities;
 use self::tasks::process_identities::ProcessIdentities;
 use crate::contracts::SharedIdentityManager;
 use crate::database::Database;
-use crate::ethereum::write::TransactionId;
 use crate::identity_tree::TreeState;
-use crate::utils::async_queue::AsyncQueue;
 
 pub mod tasks;
 
 const PROCESS_IDENTITIES_BACKOFF: Duration = Duration::from_secs(5);
 const FINALIZE_IDENTITIES_BACKOFF: Duration = Duration::from_secs(5);
-const MINE_IDENTITIES_BACKOFF: Duration = Duration::from_secs(5);
 const INSERT_IDENTITIES_BACKOFF: Duration = Duration::from_secs(5);
 const DELETE_IDENTITIES_BACKOFF: Duration = Duration::from_secs(5);
 
@@ -34,60 +29,6 @@ struct RunningInstance {
     shutdown_sender: broadcast::Sender<()>,
 }
 
-#[derive(Debug, Clone)]
-pub struct PendingBatchInsertion {
-    transaction_id: TransactionId,
-    pre_root: U256,
-    post_root: U256,
-    start_index: usize,
-}
-
-impl PendingBatchInsertion {
-    pub fn new(
-        transaction_id: TransactionId,
-        pre_root: U256,
-        post_root: U256,
-        start_index: usize,
-    ) -> Self {
-        Self {
-            transaction_id,
-            pre_root,
-            post_root,
-            start_index,
-        }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct PendingBatchDeletion {
-    transaction_id: TransactionId,
-    pre_root: U256,
-    commitments: Vec<U256>,
-    post_root: U256,
-}
-
-impl PendingBatchDeletion {
-    pub fn new(
-        transaction_id: TransactionId,
-        pre_root: U256,
-        commitments: Vec<U256>,
-        post_root: U256,
-    ) -> Self {
-        Self {
-            transaction_id,
-            pre_root,
-            commitments,
-            post_root,
-        }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub enum PendingBatchSubmission {
-    Insertion(PendingBatchInsertion),
-    Deletion(PendingBatchDeletion),
-}
-
 static PENDING_IDENTITIES: Lazy<Gauge> = Lazy::new(|| {
     register_gauge!("pending_identities", "Identities not submitted on-chain").unwrap()
 });
@@ -152,11 +93,6 @@ pub struct Options {
     #[clap(long, env, default_value = "100")]
     pub insert_identities_capacity: usize,
 
-    /// How many transactions can be sent "at once" to the blockchain via the
-    /// write provider.
-    #[clap(long, env, default_value = "1")]
-    pub pending_identities_capacity: usize,
-
     /// The maximum number of windows to scan for finalization logs
     #[clap(long, env, default_value = "100")]
     pub scanning_window_size: u64,
@@ -177,12 +113,11 @@ pub struct TaskMonitor {
     /// when shutdown is called we want to be able to gracefully
     /// await the join handles - which requires ownership of the handle and by
     /// extension the instance.
-    instance: RwLock<Option<RunningInstance>>,
-    database: Arc<Database>,
-    identity_manager: SharedIdentityManager,
-    tree_state: TreeState,
-    batch_insert_timeout_secs: u64,
-    pending_identities_capacity: usize,
+    instance: RwLock<Option<RunningInstance>>,
+    database: Arc<Database>,
+    identity_manager: SharedIdentityManager,
+    tree_state: TreeState,
+    batch_insert_timeout_secs: u64,
 
     // Finalization params
     scanning_window_size: u64,
@@ -202,7 +137,6 @@ impl TaskMonitor {
     ) -> Self {
         let Options {
             batch_timeout_seconds,
-            pending_identities_capacity,
             scanning_window_size,
             time_between_scans_seconds,
             batch_deletion_timeout_seconds: _,
@@ -216,7 +150,6 @@ impl TaskMonitor {
             identity_manager: contracts,
             tree_state,
             batch_insert_timeout_secs: batch_timeout_seconds,
-            pending_identities_capacity,
             scanning_window_size,
             time_between_scans: Duration::from_secs(time_between_scans_seconds),
             batch_deletion_timeout_seconds: options.batch_deletion_timeout_seconds,
@@ -240,8 +173,6 @@ impl TaskMonitor {
         // in the database
         wake_up_notify.notify_one();
 
-        let pending_batch_submissions_queue = AsyncQueue::new(self.pending_identities_capacity);
-
         let mut handles = Vec::new();
 
         // Finalize identities task
@@ -262,29 +193,12 @@ impl TaskMonitor {
 
         handles.push(finalize_identities_handle);
 
-        // Mine identities task
-        let mine_identities = MineIdentities::new(
-            self.database.clone(),
-            self.identity_manager.clone(),
-            self.tree_state.get_processed_tree(),
-            pending_batch_submissions_queue.clone(),
-        );
-
-        let mine_identities_handle = crate::utils::spawn_monitored_with_backoff(
-            move || mine_identities.clone().run(),
-            shutdown_sender.clone(),
-            MINE_IDENTITIES_BACKOFF,
-        );
-
-        handles.push(mine_identities_handle);
-
         // Process identities task
         let process_identities = ProcessIdentities::new(
             self.database.clone(),
             self.identity_manager.clone(),
             self.tree_state.get_batching_tree(),
             self.batch_insert_timeout_secs,
-            pending_batch_submissions_queue,
             wake_up_notify.clone(),
         );
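For context, `spawn_monitored_with_backoff` (from `src/utils.rs`, unchanged by this PR) is what keeps the remaining tasks alive after the mine-identities task is removed. A simplified illustration of that pattern, not the crate's actual implementation:

```rust
use std::future::Future;
use std::time::Duration;

use tokio::sync::broadcast;
use tokio::task::JoinHandle;

// Illustrative only: re-spawn a failing task after a fixed backoff
// until it exits cleanly or a shutdown signal arrives.
pub fn spawn_with_backoff<S, F>(
    mut task_factory: S,
    shutdown_sender: broadcast::Sender<()>,
    backoff: Duration,
) -> JoinHandle<()>
where
    S: FnMut() -> F + Send + 'static,
    F: Future<Output = anyhow::Result<()>> + Send,
{
    let mut shutdown = shutdown_sender.subscribe();

    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = shutdown.recv() => break,
                result = task_factory() => {
                    if result.is_ok() {
                        break; // Task finished cleanly; do not restart.
                    }
                    // Task failed: wait out the backoff, then restart it.
                    tokio::time::sleep(backoff).await;
                }
            }
        }
    })
}
```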
diff --git a/src/task_monitor/tasks/finalize_identities.rs b/src/task_monitor/tasks/finalize_identities.rs
index 92837d88..71716e6c 100644
--- a/src/task_monitor/tasks/finalize_identities.rs
+++ b/src/task_monitor/tasks/finalize_identities.rs
@@ -2,18 +2,20 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::Result as AnyhowResult;
+use anyhow::{Context, Result as AnyhowResult};
+use chrono::{DateTime, Utc};
 use ethers::abi::RawLog;
 use ethers::contract::EthEvent;
 use ethers::providers::Middleware;
 use ethers::types::{Address, Log, Topic, ValueOrArray, U256};
 use tracing::{info, instrument};
 
-use crate::contracts::abi::{BridgedWorldId, RootAddedFilter, TreeChangedFilter};
+use crate::contracts::abi::{BridgedWorldId, RootAddedFilter, TreeChangeKind, TreeChangedFilter};
 use crate::contracts::scanner::BlockScanner;
 use crate::contracts::{IdentityManager, SharedIdentityManager};
 use crate::database::Database;
 use crate::identity_tree::{Canonical, Intermediate, TreeVersion, TreeWithNextVersion};
+use crate::task_monitor::TaskMonitor;
 
 pub struct FinalizeRoots {
     database: Arc<Database>,
@@ -76,37 +78,26 @@ async fn finalize_roots_loop(
     let mainnet_address = mainnet_abi.address();
 
     loop {
-        let all_roots = fetch_logs(
-            &mut mainnet_scanner,
-            &mut secondary_scanners,
-            mainnet_address,
-        )
-        .await?;
+        let mainnet_logs = fetch_mainnet_logs(&mut mainnet_scanner, mainnet_address).await?;
 
-        finalize_roots(
-            database,
-            identity_manager,
-            processed_tree,
-            finalized_tree,
-            all_roots,
-        )
-        .await?;
+        finalize_mainnet_roots(database, identity_manager, processed_tree, &mainnet_logs).await?;
+
+        let mut roots = extract_roots_from_mainnet_logs(mainnet_logs);
+        roots.extend(fetch_secondary_logs(&mut secondary_scanners).await?);
+
+        finalize_secondary_roots(database, identity_manager, finalized_tree, roots).await?;
 
         tokio::time::sleep(time_between_scans).await;
     }
 }
 
-#[instrument(level = "info", skip_all)]
-async fn fetch_logs<A, B>(
-    mainnet_scanner: &mut BlockScanner<A>,
-    secondary_scanners: &mut HashMap<Address, BlockScanner<B>>,
+async fn fetch_mainnet_logs<M>(
+    mainnet_scanner: &mut BlockScanner<M>,
     mainnet_address: Address,
-) -> anyhow::Result<Vec<U256>>
+) -> anyhow::Result<Vec<Log>>
 where
-    A: Middleware,
-    <A as Middleware>::Error: 'static,
-    B: Middleware,
-    <B as Middleware>::Error: 'static,
+    M: Middleware,
+    <M as Middleware>::Error: 'static,
 {
     let mainnet_topics = [
         Some(Topic::from(TreeChangedFilter::signature())),
         None,
@@ -115,6 +106,22 @@ where
         None,
     ];
 
+    let mainnet_address = Some(ValueOrArray::Value(mainnet_address));
+
+    let mainnet_logs = mainnet_scanner
+        .next(mainnet_address, mainnet_topics.clone())
+        .await?;
+
+    Ok(mainnet_logs)
+}
+
+async fn fetch_secondary_logs<M>(
+    secondary_scanners: &mut HashMap<Address, BlockScanner<M>>,
+) -> anyhow::Result<Vec<U256>>
+where
+    M: Middleware,
+    <M as Middleware>::Error: 'static,
+{
     let bridged_topics = [
         Some(Topic::from(RootAddedFilter::signature())),
         None,
@@ -122,11 +129,6 @@ where
         None,
     ];
 
-    let mainnet_address = Some(ValueOrArray::Value(mainnet_address));
-
-    let mainnet_logs = mainnet_scanner
-        .next(mainnet_address, mainnet_topics.clone())
-        .await?;
-
     let mut secondary_logs = vec![];
 
     for (address, scanner) in secondary_scanners {
@@ -137,42 +139,74 @@ where
         secondary_logs.extend(logs);
     }
 
-    let mut roots = extract_root_from_mainnet_logs(&mainnet_logs);
-    roots.extend(extract_roots_from_secondary_logs(&secondary_logs));
+    let roots = extract_roots_from_secondary_logs(&secondary_logs);
 
     Ok(roots)
 }
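For readers unfamiliar with the raw topic arrays above: position 0 matches the event signature hash and positions 1-3 match the indexed arguments, with `None` acting as a wildcard. Expressed directly against an ethers provider rather than through `BlockScanner`, the mainnet query looks roughly like this (sketch only; the endpoint URL and block range are placeholders):

```rust
use ethers::contract::EthEvent;
use ethers::providers::{Http, Middleware, Provider};
use ethers::types::{Address, Filter, Log};

use crate::contracts::abi::TreeChangedFilter;

// Rough equivalent of one BlockScanner::next() window; real block-range
// bookkeeping is elided.
async fn tree_changed_logs(address: Address) -> anyhow::Result<Vec<Log>> {
    let provider = Provider::<Http>::try_from("http://localhost:8545")?;

    // topic0 is keccak256("TreeChanged(uint256,uint8,uint256)"), which the
    // derived filter type exposes as signature().
    let filter = Filter::new()
        .address(address)
        .topic0(TreeChangedFilter::signature())
        .from_block(0u64)
        .to_block(100u64);

    Ok(provider.get_logs(&filter).await?)
}
```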
 
 #[instrument(level = "info", skip_all)]
-async fn finalize_roots(
+async fn finalize_mainnet_roots(
     database: &Database,
     identity_manager: &IdentityManager,
     processed_tree: &TreeVersion<Intermediate>,
-    finalized_tree: &TreeVersion<Canonical>,
-    all_roots: Vec<U256>,
+    logs: &[Log],
 ) -> Result<(), anyhow::Error> {
-    for root in all_roots {
-        info!(?root, "Finalizing root");
+    for log in logs {
+        let Some(event) = raw_log_to_tree_changed(log) else {
+            continue;
+        };
 
-        let is_root_finalized = identity_manager.is_root_mined_multi_chain(root).await?;
+        let pre_root = event.pre_root;
+        let post_root = event.post_root;
+        let kind = TreeChangeKind::from(event.kind);
 
-        if is_root_finalized {
-            // What can sometimes happen is that this finalize roots function is faster
-            // than the mine_identities task. In which case we'll try to finalize a given
-            // root, but it's not yet present in the processed tree.
-            //
-            // In that case we can safely apply updates to the processed tree as well.
-            processed_tree.apply_updates_up_to(root.into());
+        info!(?pre_root, ?post_root, ?kind, "Mining batch");
 
-            // We also need to run this update to mark the root as processed
-            // and apply a mined_at timestamp
-            database.mark_root_as_processed(&root.into()).await?;
+        // Double check
+        if !identity_manager.is_root_mined(post_root).await? {
+            continue;
+        }
 
-            finalized_tree.apply_updates_up_to(root.into());
-            database.mark_root_as_mined(&root.into()).await?;
+        database.mark_root_as_processed(&post_root.into()).await?;
 
-            info!(?root, "Root finalized");
+        info!(?pre_root, ?post_root, ?kind, "Batch mined");
+
+        if kind == TreeChangeKind::Deletion {
+            // NOTE: We must do this before updating the tree
+            // because we fetch commitments from the processed tree
+            // before they are deleted
+            update_eligible_recoveries(database, identity_manager, processed_tree, &log).await?;
         }
+
+        let updates_count = processed_tree.apply_updates_up_to(post_root.into());
+
+        info!(updates_count, ?pre_root, ?post_root, "Mined tree updated");
+
+        TaskMonitor::log_identities_queues(database).await?;
+    }
+
+    Ok(())
+}
+
+#[instrument(level = "info", skip_all)]
+async fn finalize_secondary_roots(
+    database: &Database,
+    identity_manager: &IdentityManager,
+    finalized_tree: &TreeVersion<Canonical>,
+    roots: Vec<U256>,
+) -> Result<(), anyhow::Error> {
+    for root in roots {
+        info!(?root, "Finalizing root");
+
+        // Check if mined on all L2s
+        if !identity_manager.is_root_mined_multi_chain(root).await? {
+            continue;
+        }
+
+        database.mark_root_as_mined(&root.into()).await?;
+        finalized_tree.apply_updates_up_to(root.into());
+
+        info!(?root, "Root finalized");
     }
 
     Ok(())
 }
@@ -200,19 +234,26 @@ where
     Ok(secondary_scanners)
 }
 
-fn extract_root_from_mainnet_logs(logs: &[Log]) -> Vec<U256> {
+fn extract_roots_from_mainnet_logs(mainnet_logs: Vec<Log>) -> Vec<U256> {
     let mut roots = vec![];
+
+    for log in mainnet_logs {
+        let Some(event) = raw_log_to_tree_changed(&log) else {
+            continue;
+        };
 
-    for log in logs {
-        let raw_log = RawLog::from((log.topics.clone(), log.data.to_vec()));
-        if let Ok(event) = TreeChangedFilter::decode_log(&raw_log) {
-            roots.push(event.post_root);
-        }
-    }
+        let post_root = event.post_root;
+        roots.push(post_root);
+    }
 
     roots
 }
 
+fn raw_log_to_tree_changed(log: &Log) -> Option<TreeChangedFilter> {
+    let raw_log = RawLog::from((log.topics.clone(), log.data.to_vec()));
+
+    TreeChangedFilter::decode_log(&raw_log).ok()
+}
+
 fn extract_roots_from_secondary_logs(logs: &[Log]) -> Vec<U256> {
     let mut roots = vec![];
 
@@ -225,3 +266,57 @@ fn extract_roots_from_secondary_logs(logs: &[Log]) -> Vec<U256> {
 
     roots
 }
+
+use crate::identity_tree::Hash;
+
+async fn update_eligible_recoveries(
+    database: &Database,
+    identity_manager: &IdentityManager,
+    processed_tree: &TreeVersion<Intermediate>,
+    log: &Log,
+) -> anyhow::Result<()> {
+    let tx_hash = log.transaction_hash.context("Missing tx hash")?;
+    let indices = identity_manager
+        .fetch_deletion_indices_from_tx(tx_hash)
+        .await
+        .context("Could not fetch deletion indices from tx")?;
+
+    let commitments = processed_tree.commitments_by_indices(&indices);
+    let commitments: Vec<U256> = commitments.into_iter().map(|c| c.into()).collect();
+
+    // Check if any deleted commitments correspond with entries in the
+    // recoveries table and insert the new commitment into the unprocessed
+    // identities table with the proper eligibility timestamp
+    let recoveries = database
+        .get_recoveries()
+        .await?
+        .iter()
+        .map(|f| (f.existing_commitment, f.new_commitment))
+        .collect::<HashMap<_, _>>();
+
+    // Fetch the root history expiry time on chain
+    let root_history_expiry = identity_manager.root_history_expiry().await?;
+
+    // Use the root history expiry to calculate the eligibility timestamp for
+    // the new insertion
+    let eligibility_timestamp = DateTime::from_utc(
+        chrono::NaiveDateTime::from_timestamp_opt(
+            Utc::now().timestamp() + root_history_expiry.as_u64() as i64,
+            0,
+        )
+        .context("Could not convert eligibility timestamp to NaiveDateTime")?,
+        Utc,
+    );
+
+    // For each deletion, if there is a corresponding recovery, insert a new
+    // identity with the specified eligibility timestamp
+    for prev_commitment in commitments {
+        if let Some(new_commitment) = recoveries.get(&prev_commitment.into()) {
+            database
+                .insert_new_identity(*new_commitment, eligibility_timestamp)
+                .await?;
+        }
+    }
+
+    Ok(())
+}
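A side note on the eligibility-timestamp arithmetic above: it round-trips through a unix timestamp via `NaiveDateTime`. chrono can express the same computation more directly; a hedged alternative sketch (assuming the expiry fits in an `i64`, and differing only in sub-second precision):

```rust
use chrono::{DateTime, Duration, Utc};

// root_history_expiry is the on-chain expiry window in seconds (a U256
// in the real code; assumed here to already fit in an i64).
fn eligibility_timestamp(root_history_expiry_secs: i64) -> DateTime<Utc> {
    Utc::now() + Duration::seconds(root_history_expiry_secs)
}
```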
diff --git a/src/task_monitor/tasks/mine_identities.rs b/src/task_monitor/tasks/mine_identities.rs
deleted file mode 100644
index b4468c96..00000000
--- a/src/task_monitor/tasks/mine_identities.rs
+++ /dev/null
@@ -1,219 +0,0 @@
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use anyhow::{Context, Result as AnyhowResult};
-use chrono::{DateTime, Utc};
-use tracing::{info, instrument};
-
-use crate::contracts::{IdentityManager, SharedIdentityManager};
-use crate::database::Database;
-use crate::identity_tree::{Hash, Intermediate, TreeVersion, TreeWithNextVersion};
-use crate::task_monitor::{
-    PendingBatchDeletion, PendingBatchInsertion, PendingBatchSubmission, TaskMonitor,
-};
-use crate::utils::async_queue::AsyncQueue;
-
-pub struct MineIdentities {
-    database: Arc<Database>,
-    identity_manager: SharedIdentityManager,
-    mined_tree: TreeVersion<Intermediate>,
-    pending_batch_submissions_queue: AsyncQueue<PendingBatchSubmission>,
-}
-
-impl MineIdentities {
-    pub fn new(
-        database: Arc<Database>,
-        identity_manager: SharedIdentityManager,
-        mined_tree: TreeVersion<Intermediate>,
-        pending_batch_submissions_queue: AsyncQueue<PendingBatchSubmission>,
-    ) -> Arc<Self> {
-        Arc::new(Self {
-            database,
-            identity_manager,
-            mined_tree,
-            pending_batch_submissions_queue,
-        })
-    }
-
-    pub async fn run(self: Arc<Self>) -> anyhow::Result<()> {
-        mine_identities_loop(
-            &self.database,
-            &self.identity_manager,
-            &self.mined_tree,
-            &self.pending_batch_submissions_queue,
-        )
-        .await
-    }
-}
-
-async fn mine_identities_loop(
-    database: &Database,
-    identity_manager: &IdentityManager,
-    mined_tree: &TreeVersion<Intermediate>,
-    pending_batch_submissions_queue: &AsyncQueue<PendingBatchSubmission>,
-) -> AnyhowResult<()> {
-    loop {
-        let pending_identity = pending_batch_submissions_queue.pop().await;
-
-        match pending_identity.read().await {
-            PendingBatchSubmission::Insertion(pending_identity_insertion) => {
-                mine_insertions(
-                    pending_identity_insertion,
-                    database,
-                    identity_manager,
-                    mined_tree,
-                )
-                .await?;
-            }
-            PendingBatchSubmission::Deletion(pending_identity_deletion) => {
-                mine_deletions(
-                    pending_identity_deletion,
-                    database,
-                    identity_manager,
-                    mined_tree,
-                )
-                .await?;
-            }
-        }
-
-        pending_identity.commit().await;
-    }
-}
-
-#[instrument(level = "info", skip_all)]
-async fn mine_insertions(
-    pending_identity: PendingBatchInsertion,
-    database: &Database,
-    identity_manager: &IdentityManager,
-    mined_tree: &TreeVersion<Intermediate>,
-) -> AnyhowResult<()> {
-    let PendingBatchInsertion {
-        transaction_id,
-        pre_root,
-        post_root,
-        start_index,
-    } = pending_identity;
-
-    info!(
-        start_index,
-        ?pre_root,
-        ?post_root,
-        ?transaction_id,
-        "Mining batch"
-    );
-
-    if !identity_manager
-        .mine_identities(transaction_id.clone())
-        .await?
- { - panic!( - "Transaction {} failed on chain - sequencer will crash and restart", - transaction_id - ); - } - - // With this done, all that remains is to mark them as submitted to the - // blockchain in the source-of-truth database, and also update the mined tree to - // agree with the database and chain. - database.mark_root_as_processed(&post_root.into()).await?; - - info!(start_index, ?pre_root, ?post_root, "Batch mined"); - - let updates_count = mined_tree.apply_updates_up_to(post_root.into()); - - info!( - start_index, - updates_count, - ?pre_root, - ?post_root, - "Mined tree updated" - ); - - TaskMonitor::log_identities_queues(database).await?; - - Ok(()) -} - -#[instrument(level = "info", skip_all)] -async fn mine_deletions( - pending_identity_deletion: PendingBatchDeletion, - database: &Database, - identity_manager: &IdentityManager, - mined_tree: &TreeVersion, -) -> AnyhowResult<()> { - let PendingBatchDeletion { - transaction_id, - pre_root, - post_root, - commitments, - } = pending_identity_deletion; - - info!( - ?pre_root, - ?post_root, - ?transaction_id, - "Mining deletion batch" - ); - - if !identity_manager - .mine_identities(transaction_id.clone()) - .await? - { - panic!( - "Transaction {} failed on chain - sequencer will crash and restart", - transaction_id - ); - } - - // With this done, all that remains is to mark them as submitted to the - // blockchain in the source-of-truth database, and also update the mined tree to - // agree with the database and chain. - database.mark_root_as_processed(&post_root.into()).await?; - - // Update the latest deletion - database.update_latest_deletion(Utc::now()).await?; - - info!(?pre_root, ?post_root, "Deletion batch mined"); - - let updates_count = mined_tree.apply_updates_up_to(post_root.into()); - - info!(updates_count, ?pre_root, ?post_root, "Mined tree updated"); - - // Check if any deleted commitments correspond with entries in the - // recoveries table and insert the new commitment into the unprocessed - // identities table with the proper eligibility timestamp - let recoveries = database - .get_recoveries() - .await? 
- .iter() - .map(|f| (f.existing_commitment, f.new_commitment)) - .collect::>(); - - // Fetch the root history expiry time on chain - let root_history_expiry = identity_manager.root_history_expiry().await?; - - // Use the root history expiry to calcuate the eligibility timestamp for the new - // insertion - let eligibility_timestamp = DateTime::from_utc( - chrono::NaiveDateTime::from_timestamp_opt( - Utc::now().timestamp() + root_history_expiry.as_u64() as i64, - 0, - ) - .context("Could not convert eligibility timestamp to NaiveDateTime")?, - Utc, - ); - - // For each deletion, if there is a corresponding recovery, insert a new - // identity with the specified eligibility timestamp - for prev_commitment in commitments { - if let Some(new_commitment) = recoveries.get(&prev_commitment.into()) { - database - .insert_new_identity(*new_commitment, eligibility_timestamp) - .await?; - } - } - - TaskMonitor::log_identities_queues(database).await?; - - Ok(()) -} diff --git a/src/task_monitor/tasks/mod.rs b/src/task_monitor/tasks/mod.rs index 8db72a66..b3f3569b 100644 --- a/src/task_monitor/tasks/mod.rs +++ b/src/task_monitor/tasks/mod.rs @@ -1,5 +1,4 @@ pub mod delete_identities; pub mod finalize_identities; pub mod insert_identities; -pub mod mine_identities; pub mod process_identities; diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index f11b0ab4..ecd13b8f 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -3,8 +3,6 @@ use std::time::{Duration, SystemTime}; use anyhow::{Context, Result as AnyhowResult}; use ethers::types::U256; -use once_cell::sync::Lazy; -use prometheus::{register_histogram, Histogram}; use ruint::Uint; use semaphore::merkle_tree::Proof; use semaphore::poseidon_tree::Branch; @@ -19,30 +17,19 @@ use crate::identity_tree::{ }; use crate::prover::identity::Identity; use crate::prover::{Prover, ReadOnlyProver}; -use crate::task_monitor::{ - PendingBatchDeletion, PendingBatchInsertion, PendingBatchSubmission, TaskMonitor, -}; -use crate::utils::async_queue::AsyncQueue; +use crate::task_monitor::TaskMonitor; +use crate::utils::index_packing::pack_indices; /// The number of seconds either side of the timer tick to treat as enough to /// trigger a forced batch insertion. 
const DEBOUNCE_THRESHOLD_SECS: u64 = 1;
 
-static PENDING_IDENTITIES_CHANNEL_CAPACITY: Lazy<Histogram> = Lazy::new(|| {
-    register_histogram!(
-        "pending_identities_channel_capacity",
-        "Pending identities channel capacity"
-    )
-    .unwrap()
-});
-
 pub struct ProcessIdentities {
-    database: Arc<Database>,
-    identity_manager: SharedIdentityManager,
-    batching_tree: TreeVersion<Intermediate>,
+    database: Arc<Database>,
+    identity_manager: SharedIdentityManager,
+    batching_tree: TreeVersion<Intermediate>,
     batch_insert_timeout_secs: u64,
-    pending_batch_submissions_queue: AsyncQueue<PendingBatchSubmission>,
-    wake_up_notify: Arc<Notify>,
+    wake_up_notify: Arc<Notify>,
 }
 
 impl ProcessIdentities {
@@ -51,7 +38,6 @@ impl ProcessIdentities {
         identity_manager: SharedIdentityManager,
         batching_tree: TreeVersion<Intermediate>,
         batch_insert_timeout_secs: u64,
-        pending_batch_submissions_queue: AsyncQueue<PendingBatchSubmission>,
         wake_up_notify: Arc<Notify>,
     ) -> Arc<Self> {
         Arc::new(Self {
@@ -59,7 +45,6 @@ impl ProcessIdentities {
             identity_manager,
             batching_tree,
             batch_insert_timeout_secs,
-            pending_batch_submissions_queue,
             wake_up_notify,
         })
     }
@@ -70,7 +55,6 @@ impl ProcessIdentities {
             &self.identity_manager,
             &self.batching_tree,
             &self.wake_up_notify,
-            &self.pending_batch_submissions_queue,
             self.batch_insert_timeout_secs,
         )
         .await
@@ -82,7 +66,6 @@ async fn process_identities(
     identity_manager: &IdentityManager,
     batching_tree: &TreeVersion<Intermediate>,
     wake_up_notify: &Notify,
-    pending_batch_submissions_queue: &AsyncQueue<PendingBatchSubmission>,
    timeout_secs: u64,
 ) -> AnyhowResult<()> {
     info!("Awaiting for a clean slate");
@@ -127,7 +110,6 @@ async fn process_identities(
                     database,
                     identity_manager,
                     batching_tree,
-                    pending_batch_submissions_queue,
                     &updates,
                 ).await?;
 
@@ -178,7 +160,6 @@ async fn process_identities(
                     database,
                     identity_manager,
                     batching_tree,
-                    pending_batch_submissions_queue,
                     &updates,
                 ).await?;
 
@@ -199,7 +180,6 @@ async fn commit_identities(
     database: &Database,
     identity_manager: &IdentityManager,
     batching_tree: &TreeVersion<Intermediate>,
-    pending_batch_submissions_queue: &AsyncQueue<PendingBatchSubmission>,
     updates: &[AppliedTreeUpdate],
 ) -> AnyhowResult<()> {
     // If the update is an insertion
@@ -220,15 +200,7 @@ async fn commit_identities(
             prover.batch_size()
         );
 
-        insert_identities(
-            database,
-            identity_manager,
-            batching_tree,
-            pending_batch_submissions_queue,
-            updates,
-            prover,
-        )
-        .await?;
+        insert_identities(database, identity_manager, batching_tree, updates, prover).await?;
     } else {
         let prover = identity_manager
             .get_suitable_deletion_prover(updates.len())
@@ -240,15 +212,7 @@ async fn commit_identities(
             prover.batch_size()
         );
 
-        delete_identities(
-            database,
-            identity_manager,
-            batching_tree,
-            pending_batch_submissions_queue,
-            updates,
-            prover,
-        )
-        .await?;
+        delete_identities(database, identity_manager, batching_tree, updates, prover).await?;
     }
 
     Ok(())
@@ -259,7 +223,6 @@ pub async fn insert_identities(
     database: &Database,
     identity_manager: &IdentityManager,
     batching_tree: &TreeVersion<Intermediate>,
-    pending_batch_submissions_queue: &AsyncQueue<PendingBatchSubmission>,
     updates: &[AppliedTreeUpdate],
     prover: ReadOnlyProver<'_, Prover>,
 ) -> AnyhowResult<()> {
@@ -388,14 +351,6 @@ pub async fn insert_identities(
     )
     .await?;
 
-    #[allow(clippy::cast_precision_loss)]
-    PENDING_IDENTITIES_CHANNEL_CAPACITY.observe(pending_batch_submissions_queue.len().await as f64);
-
-    // This queue's capacity provides us with a natural back-pressure mechanism
-    // to ensure that we don't overwhelm the identity manager with too many
-    // identities to mine.
- let permit = pending_batch_submissions_queue.reserve().await; - info!( start_index, ?pre_root, @@ -427,13 +382,6 @@ pub async fn insert_identities( "Insertion batch submitted" ); - // The transaction will be awaited on asynchronously - permit - .send(PendingBatchSubmission::Insertion( - PendingBatchInsertion::new(transaction_id, pre_root, post_root, start_index), - )) - .await; - // Update the batching tree only after submitting the identities to the chain batching_tree.apply_updates_up_to(post_root.into()); @@ -448,7 +396,6 @@ pub async fn delete_identities( database: &Database, identity_manager: &IdentityManager, batching_tree: &TreeVersion, - pending_batch_submissions_queue: &AsyncQueue, updates: &[AppliedTreeUpdate], prover: ReadOnlyProver<'_, Prover>, ) -> AnyhowResult<()> { @@ -555,10 +502,7 @@ pub async fn delete_identities( identity_manager.validate_merkle_proofs(&identity_commitments)?; - let mut packed_deletion_indices = vec![]; - for &index in &deletion_indices { - packed_deletion_indices.extend_from_slice(&index.to_be_bytes()); - } + let packed_deletion_indices = pack_indices(&deletion_indices); // We prepare the proof before reserving a slot in the pending identities let proof = IdentityManager::prepare_deletion_proof( @@ -570,14 +514,6 @@ pub async fn delete_identities( ) .await?; - #[allow(clippy::cast_precision_loss)] - PENDING_IDENTITIES_CHANNEL_CAPACITY.observe(pending_batch_submissions_queue.len().await as f64); - - // This queue's capacity provides us with a natural back-pressure mechanism - // to ensure that we don't overwhelm the identity manager with too many - // identities to mine. - let permit = pending_batch_submissions_queue.reserve().await; - info!(?pre_root, ?post_root, "Submitting deletion batch"); // With all the data prepared we can submit the identities to the on-chain @@ -603,16 +539,6 @@ pub async fn delete_identities( "Deletion batch submitted" ); - // The transaction will be awaited on asynchronously - permit - .send(PendingBatchSubmission::Deletion(PendingBatchDeletion::new( - transaction_id, - pre_root, - commitments, - post_root, - ))) - .await; - // Update the batching tree only after submitting the identities to the chain batching_tree.apply_updates_up_to(post_root.into()); diff --git a/src/utils.rs b/src/utils.rs index cd5cb489..0734e973 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -8,7 +8,7 @@ use tokio::sync::broadcast; use tokio::task::JoinHandle; use tracing::{error, info}; -pub mod async_queue; +pub mod index_packing; pub trait Any { fn any(self) -> AnyhowResult; diff --git a/src/utils/async_queue.rs b/src/utils/async_queue.rs deleted file mode 100644 index b87c3605..00000000 --- a/src/utils/async_queue.rs +++ /dev/null @@ -1,314 +0,0 @@ -use std::collections::VecDeque; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -use tokio::sync::{Mutex, Notify}; - -#[derive(Clone)] -pub struct AsyncQueue { - inner: Arc>, -} - -struct AsyncQueueInner { - items: Mutex>, - capacity: usize, - push_notify: Notify, - pop_notify: Notify, - pop_guard_exists: AtomicBool, - push_guard_exists: AtomicBool, -} - -impl AsyncQueue { - pub fn new(capacity: usize) -> Self { - AsyncQueue { - inner: Arc::new(AsyncQueueInner { - capacity, - items: Mutex::new(VecDeque::with_capacity(capacity)), - push_notify: Notify::new(), - pop_notify: Notify::new(), - pop_guard_exists: AtomicBool::new(false), - push_guard_exists: AtomicBool::new(false), - }), - } - } - - pub async fn len(&self) -> usize { - self.inner.items.lock().await.len() - } - - /// 
Creates a guard that will block any other producer from submitting to - /// this queue - /// - /// Blocks until the queue has space for a new item. - /// - /// If there exists another guard at that time - blocks. - /// - /// Doesn't yet send the item, instead the caller should call - /// `guard.send(value)` to send the item. - pub async fn reserve(&self) -> AsyncPushGuard<'_, T> { - loop { - let items = self.inner.items.lock().await; - - let another_guard_exists = self.inner.push_guard_exists.load(Ordering::SeqCst); - - if !another_guard_exists && items.len() < self.inner.capacity { - self.inner.push_guard_exists.store(true, Ordering::SeqCst); - return AsyncPushGuard { queue: self }; - } - - drop(items); - - // Either could trigger the pop guard to be available - tokio::select! { - () = self.inner.push_notify.notified() => {} - () = self.inner.pop_notify.notified() => {} - } - } - } - - /// Pushes an item to the queue - /// - /// Blocks until the queue has space for a new item - pub async fn _push(&self, item: T) { - loop { - let mut items = self.inner.items.lock().await; - - if items.len() < self.inner.capacity { - items.push_back(item); - - self.inner.push_notify.notify_one(); - - return; - } - - drop(items); - - self.inner.pop_notify.notified(); - } - } - - pub async fn pop(&self) -> AsyncPopGuard { - loop { - let no_other_guards_exist = !self.inner.pop_guard_exists.load(Ordering::SeqCst); - let queue_is_not_empty = self.inner.items.lock().await.front().is_some(); - - if no_other_guards_exist && queue_is_not_empty { - self.inner.pop_guard_exists.store(true, Ordering::SeqCst); - - return AsyncPopGuard { queue: self }; - } - - // Either could trigger the pop guard to be available - tokio::select! { - () = self.inner.push_notify.notified() => {} - () = self.inner.pop_notify.notified() => {} - } - } - } -} - -pub struct AsyncPopGuard<'a, T> { - queue: &'a AsyncQueue, -} - -impl<'a, T> AsyncPopGuard<'a, T> -where - T: Clone, -{ - pub async fn read(&self) -> T { - let items = self.queue.inner.items.lock().await; - items.front().unwrap().clone() - } - - pub async fn commit(self) { - let mut items = self.queue.inner.items.lock().await; - self.queue.inner.pop_notify.notify_one(); - items.pop_front(); - } -} - -impl<'a, T> Drop for AsyncPopGuard<'a, T> { - fn drop(&mut self) { - self.queue - .inner - .pop_guard_exists - .store(false, Ordering::Relaxed); - } -} - -pub struct AsyncPushGuard<'a, T> { - queue: &'a AsyncQueue, -} - -impl<'a, T> AsyncPushGuard<'a, T> { - pub async fn send(self, value: T) { - let mut items = self.queue.inner.items.lock().await; - items.push_back(value); - self.queue.inner.push_notify.notify_one(); - } -} - -impl<'a, T> Drop for AsyncPushGuard<'a, T> { - fn drop(&mut self) { - self.queue - .inner - .push_guard_exists - .store(false, Ordering::Relaxed); - } -} - -#[cfg(test)] -mod tests { - use tokio::time::{timeout, Duration}; - - use super::*; - - #[tokio::test] - async fn pop_on_empty_queue() { - let queue: AsyncQueue = AsyncQueue::new(2); - - let pop_guard = timeout(Duration::from_secs_f32(0.5), queue.pop()).await; - - assert!(pop_guard.is_err(), "Pop on empty queue should timeout"); - } - - #[tokio::test] - async fn read_and_commit_single_item() { - let queue: AsyncQueue = AsyncQueue::new(2); - - queue._push(1).await; - - let pop_guard = queue.pop().await; - - queue._push(2).await; - - assert_eq!(pop_guard.read().await, 1); - - pop_guard.commit().await; - - let pop_guard = queue.pop().await; - - assert_eq!(pop_guard.read().await, 2); - } - - #[tokio::test] - async fn 
drop_without_commit_does_not_remove_item() {
-        let queue: AsyncQueue<i32> = AsyncQueue::new(2);
-
-        queue._push(1).await;
-
-        let pop_guard = queue.pop().await;
-
-        queue._push(2).await;
-
-        assert_eq!(pop_guard.read().await, 1);
-
-        // Drop without committing
-        drop(pop_guard);
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 1);
-    }
-
-    #[tokio::test]
-    async fn only_a_single_pop_guard_can_exist() {
-        let queue: AsyncQueue<i32> = AsyncQueue::new(2);
-
-        queue._push(1).await;
-
-        let first_guard = queue.pop().await;
-        assert_eq!(first_guard.read().await, 1);
-
-        let second_guard = timeout(Duration::from_secs_f32(0.5), queue.pop()).await;
-
-        assert!(second_guard.is_err(), "Pop on empty queue should timeout");
-
-        drop(first_guard);
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 1);
-    }
-
-    #[tokio::test]
-    async fn pushing_over_capacity_blocks() {
-        let queue: AsyncQueue<i32> = AsyncQueue::new(2);
-
-        queue._push(1).await;
-        queue._push(2).await;
-
-        let result = timeout(Duration::from_secs_f32(0.5), queue._push(3)).await;
-
-        assert!(result.is_err(), "Push on full queue should timeout");
-    }
-
-    #[tokio::test]
-    async fn reserve_blocks_until_queue_has_space() {
-        let queue: AsyncQueue<i32> = AsyncQueue::new(2);
-
-        queue._push(1).await;
-        queue._push(2).await;
-
-        let reserve_guard = timeout(Duration::from_secs_f32(0.5), queue.reserve()).await;
-        assert!(
-            reserve_guard.is_err(),
-            "Reserve on full queue should timeout"
-        );
-
-        queue.pop().await.commit().await;
-
-        let reserve_guard = queue.reserve().await;
-        reserve_guard.send(3).await;
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 2);
-
-        pop_guard.commit().await;
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 3);
-    }
-
-    #[tokio::test]
-    async fn queue_is_fifo() {
-        let queue: AsyncQueue<i32> = AsyncQueue::new(2);
-
-        queue._push(1).await;
-        queue._push(2).await;
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 1);
-        pop_guard.commit().await;
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 2);
-    }
-
-    #[tokio::test]
-    async fn only_one_push_guard_can_exist() {
-        let queue: AsyncQueue<i32> = AsyncQueue::new(2);
-
-        let push_guard = queue.reserve().await;
-
-        let result = timeout(Duration::from_secs_f32(0.5), queue.reserve()).await;
-
-        assert!(
-            result.is_err(),
-            "Reserve when another guard exists should timeout"
-        );
-
-        drop(push_guard);
-
-        let push_guard = queue.reserve().await;
-        push_guard.send(1).await;
-
-        let push_guard = queue.reserve().await;
-        push_guard.send(2).await;
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 1);
-        pop_guard.commit().await;
-
-        let pop_guard = queue.pop().await;
-        assert_eq!(pop_guard.read().await, 2);
-    }
-}
diff --git a/src/utils/index_packing.rs b/src/utils/index_packing.rs
new file mode 100644
index 00000000..7873a40b
--- /dev/null
+++ b/src/utils/index_packing.rs
@@ -0,0 +1,39 @@
+pub fn pack_indices(indices: &[u32]) -> Vec<u8> {
+    let mut packed = Vec::with_capacity(indices.len() * 4);
+
+    for index in indices {
+        packed.extend_from_slice(&index.to_be_bytes());
+    }
+
+    packed
+}
+
+pub fn unpack_indices(packed: &[u8]) -> Vec<u32> {
+    let mut indices = Vec::with_capacity(packed.len() / 4);
+
+    for packed_index in packed.chunks_exact(4) {
+        let index = u32::from_be_bytes(packed_index.try_into().expect("Invalid index length"));
+
+        indices.push(index);
+    }
+
+    indices
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn
test_pack_indices() { + let indices = vec![1, 2, 3, 4, 5, 6, 7, 8]; + + let packed = pack_indices(&indices); + + assert_eq!(packed.len(), 32); + + let unpacked = unpack_indices(&packed); + + assert_eq!(unpacked, indices); + } +}
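A closing observation on the new module: `unpack_indices` returns padding entries (the out-of-range index `2^tree_depth` used to pad on-chain deletion batches) verbatim, so filtering them is the caller's job, as `fetch_deletion_indices_from_tx` does. A sketch of that contract as an extra test (hypothetical, not part of the patch; a tree depth of 3 is assumed for brevity):

```rust
#[test]
fn unpack_returns_padding_verbatim() {
    // Hypothetical tree depth of 3 => padding index is 2^3 = 8.
    let tree_depth = 3u32;
    let padding_index = 2u32.pow(tree_depth);

    // A deletion batch of two real indices, padded to size four.
    let packed = pack_indices(&[1, 5, padding_index, padding_index]);
    assert_eq!(packed.len(), 16); // four u32s, big-endian, 4 bytes each

    // unpack_indices keeps the padding entries...
    let unpacked = unpack_indices(&packed);
    assert_eq!(unpacked, vec![1, 5, padding_index, padding_index]);

    // ...so the caller filters them, as fetch_deletion_indices_from_tx does.
    let real: Vec<u32> = unpacked
        .into_iter()
        .filter(|idx| *idx != padding_index)
        .collect();
    assert_eq!(real, vec![1, 5]);
}
```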