diff --git a/schemas/database/013_batches_and_transactions.sql b/schemas/database/013_batches_and_transactions.sql
deleted file mode 100644
index 7dc91e43..00000000
--- a/schemas/database/013_batches_and_transactions.sql
+++ /dev/null
@@ -1,24 +0,0 @@
--- Create ENUM for prover type
-CREATE TABLE batches
-(
-    id         BIGSERIAL UNIQUE PRIMARY KEY,
-    next_root  BYTEA NOT NULL UNIQUE,
-    prev_root  BYTEA UNIQUE,
-    created_at TIMESTAMPTZ NOT NULL,
-    batch_type VARCHAR(50) NOT NULL,
-    data       JSON NOT NULL,
-
-    FOREIGN KEY (prev_root) REFERENCES batches (next_root)
-);
-
-CREATE INDEX idx_batches_prev_root ON batches (prev_root);
-CREATE UNIQUE INDEX i_single_null_prev_root ON batches ((batches.prev_root IS NULL)) WHERE batches.prev_root IS NULL;
-
-CREATE TABLE transactions
-(
-    transaction_id  VARCHAR(256) NOT NULL UNIQUE PRIMARY KEY,
-    batch_next_root BYTEA NOT NULL UNIQUE,
-    created_at      TIMESTAMPTZ NOT NULL,
-
-    FOREIGN KEY (batch_next_root) REFERENCES batches (next_root)
-);
\ No newline at end of file
diff --git a/src/app.rs b/src/app.rs
index 2085a414..be1fcd15 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -15,7 +15,7 @@ use crate::database::{Database, DatabaseExt as _};
 use crate::ethereum::Ethereum;
 use crate::identity_tree::{
     CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState,
-    TreeUpdate, TreeVersionReadOps, TreeWithNextVersion, UnprocessedStatus,
+    TreeUpdate, TreeVersionReadOps, UnprocessedStatus,
 };
 use crate::prover::map::initialize_prover_maps;
 use crate::prover::{ProverConfig, ProverType};
@@ -261,9 +261,6 @@ impl App {
         let (processed, batching_builder) = processed_builder.seal_and_continue();
         let (batching, mut latest_builder) = batching_builder.seal_and_continue();
-
-        // We are duplicating updates here for some commitments that were batched but
-        // this is an idempotent operation
         let pending_items = self
             .database
             .get_commitments_by_status(ProcessedStatus::Pending)
             .await?;
@@ -272,15 +269,6 @@ impl App {
         for update in pending_items {
             latest_builder.update(&update);
         }
         let latest = latest_builder.seal();
-
-        let batch = self.database.get_latest_batch().await?;
-        if let Some(batch) = batch {
-            if batching.get_root() != batch.next_root {
-                batching.apply_updates_up_to(batch.next_root);
-            }
-            assert_eq!(batching.get_root(), batch.next_root);
-        }
-
         Ok(Some(TreeState::new(mined, processed, batching, latest)))
     }
@@ -367,14 +355,6 @@ impl App {
 
         let latest = latest_builder.seal();
 
-        let batch = self.database.get_latest_batch().await?;
-        if let Some(batch) = batch {
-            if batching.get_root() != batch.next_root {
-                batching.apply_updates_up_to(batch.next_root);
-            }
-            assert_eq!(batching.get_root(), batch.next_root);
-        }
-
         Ok(TreeState::new(mined, processed, batching, latest))
     }
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 37b22aa4..81cf9a9c 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -19,11 +19,9 @@ use tracing::{error, info, instrument, warn};
 
 use self::types::{CommitmentHistoryEntry, DeletionEntry, LatestDeletionEntry, RecoveryEntry};
 use crate::config::DatabaseConfig;
-use crate::database::types::{BatchEntry, BatchEntryData, BatchType, TransactionEntry};
 use crate::identity_tree::{
     Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus,
 };
-use crate::prover::identity::Identity;
 use crate::prover::{ProverConfig, ProverType};
 
 pub mod types;
@@ -842,241 +840,6 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> {
         let row_unprocessed = self.fetch_one(query_queued_deletion).await?;
 
         Ok(row_unprocessed.get::<i64, _>(0))
     }
-
-    async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> {
-        let query = sqlx::query(
-            r#"
-            INSERT INTO batches(
-                id,
-                next_root,
-                prev_root,
-                created_at,
-                batch_type,
-                data
-            ) VALUES (DEFAULT, $1, NULL, CURRENT_TIMESTAMP, $2, $3)
-            "#,
-        )
-        .bind(next_root)
-        .bind(BatchType::Insertion)
-        .bind(sqlx::types::Json::from(BatchEntryData {
-            identities: vec![],
-            indexes: vec![],
-        }));
-
-        self.execute(query).await?;
-        Ok(())
-    }
-
-    async fn insert_new_batch(
-        self,
-        next_root: &Hash,
-        prev_root: &Hash,
-        batch_type: BatchType,
-        identities: &[Identity],
-        indexes: &[usize],
-    ) -> Result<(), Error> {
-        let query = sqlx::query(
-            r#"
-            INSERT INTO batches(
-                id,
-                next_root,
-                prev_root,
-                created_at,
-                batch_type,
-                data
-            ) VALUES (DEFAULT, $1, $2, CURRENT_TIMESTAMP, $3, $4)
-            "#,
-        )
-        .bind(next_root)
-        .bind(prev_root)
-        .bind(batch_type)
-        .bind(sqlx::types::Json::from(BatchEntryData {
-            identities: identities.to_vec(),
-            indexes: indexes.to_vec(),
-        }));
-
-        self.execute(query).await?;
-        Ok(())
-    }
-
-    async fn get_next_batch(self, prev_root: &Hash) -> Result<Option<BatchEntry>, Error> {
-        let res = sqlx::query_as::<_, BatchEntry>(
-            r#"
-            SELECT
-                id,
-                next_root,
-                prev_root,
-                created_at,
-                batch_type,
-                data
-            FROM batches WHERE prev_root = $1
-            LIMIT 1
-            "#,
-        )
-        .bind(prev_root)
-        .fetch_optional(self)
-        .await?;
-
-        Ok(res)
-    }
-
-    async fn get_latest_batch(self) -> Result<Option<BatchEntry>, Error> {
-        let res = sqlx::query_as::<_, BatchEntry>(
-            r#"
-            SELECT
-                id,
-                next_root,
-                prev_root,
-                created_at,
-                batch_type,
-                data
-            FROM batches
-            ORDER BY id DESC
-            LIMIT 1
-            "#,
-        )
-        .fetch_optional(self)
-        .await?;
-
-        Ok(res)
-    }
-
-    async fn get_latest_batch_with_transaction(self) -> Result<Option<BatchEntry>, Error> {
-        let res = sqlx::query_as::<_, BatchEntry>(
-            r#"
-            SELECT
-                batches.id,
-                batches.next_root,
-                batches.prev_root,
-                batches.created_at,
-                batches.batch_type,
-                batches.data
-            FROM batches
-            LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root
-            WHERE transactions.batch_next_root IS NOT NULL AND batches.prev_root IS NOT NULL
-            ORDER BY batches.id DESC
-            LIMIT 1
-            "#,
-        )
-        .fetch_optional(self)
-        .await?;
-
-        Ok(res)
-    }
-
-    async fn get_next_batch_without_transaction(self) -> Result<Option<BatchEntry>, Error> {
-        let res = sqlx::query_as::<_, BatchEntry>(
-            r#"
-            SELECT
-                batches.id,
-                batches.next_root,
-                batches.prev_root,
-                batches.created_at,
-                batches.batch_type,
-                batches.data
-            FROM batches
-            LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root
-            WHERE transactions.batch_next_root IS NULL AND batches.prev_root IS NOT NULL
-            ORDER BY batches.id ASC
-            LIMIT 1
-            "#,
-        )
-        .fetch_optional(self)
-        .await?;
-
-        Ok(res)
-    }
-
-    async fn get_batch_head(self) -> Result<Option<BatchEntry>, Error> {
-        let res = sqlx::query_as::<_, BatchEntry>(
-            r#"
-            SELECT
-                id,
-                next_root,
-                prev_root,
-                created_at,
-                batch_type,
-                data
-            FROM batches WHERE prev_root IS NULL
-            LIMIT 1
-            "#,
-        )
-        .fetch_optional(self)
-        .await?;
-
-        Ok(res)
-    }
-
-    async fn get_all_batches_after(self, id: i64) -> Result<Vec<BatchEntry>, Error> {
-        let res = sqlx::query_as::<_, BatchEntry>(
-            r#"
-            SELECT
-                id,
-                next_root,
-                prev_root,
-                created_at,
-                batch_type,
-                data
-            FROM batches WHERE id >= $1 ORDER BY id ASC
-            "#,
-        )
-        .bind(id)
-        .fetch_all(self)
-        .await?;
-
-        Ok(res)
-    }
-
-    async fn root_in_batch_chain(self, root: &Hash) -> Result<bool, Error> {
-        let query = sqlx::query(
-            r#"SELECT exists(SELECT 1 FROM batches where prev_root = $1 OR next_root = $1)"#,
-        )
-        .bind(root);
-        let row_unprocessed = self.fetch_one(query).await?;
-        Ok(row_unprocessed.get::<bool, _>(0))
-    }
-
-    async fn insert_new_transaction(
-        self,
-        transaction_id: &String,
-        batch_next_root: &Hash,
-    ) -> Result<(), Error> {
-        let query = sqlx::query(
-            r#"
-            INSERT INTO transactions(
-                transaction_id,
-                batch_next_root,
-                created_at
-            ) VALUES ($1, $2, CURRENT_TIMESTAMP)
-            "#,
-        )
-        .bind(transaction_id)
-        .bind(batch_next_root);
-
-        self.execute(query).await?;
-        Ok(())
-    }
-
-    async fn get_transaction_for_batch(
-        self,
-        next_root: &Hash,
-    ) -> Result<Option<TransactionEntry>, Error> {
-        let res = sqlx::query_as::<_, TransactionEntry>(
-            r#"
-            SELECT
-                transaction_id,
-                batch_next_root,
-                created_at
-            FROM transactions WHERE batch_next_root = $1
-            LIMIT 1
-            "#,
-        )
-        .bind(next_root)
-        .fetch_optional(self)
-        .await?;
-
-        Ok(res)
-    }
 }
 
 #[derive(Debug, Error)]
@@ -1104,10 +867,8 @@ mod test {
 
     use super::Database;
     use crate::config::DatabaseConfig;
-    use crate::database::types::BatchType;
     use crate::database::DatabaseExt as _;
     use crate::identity_tree::{Hash, ProcessedStatus, Status, UnprocessedStatus};
-    use crate::prover::identity::Identity;
     use crate::prover::{ProverConfig, ProverType};
     use crate::utils::secret::SecretUrl;
@@ -2277,166 +2038,4 @@ mod test {
 
         Ok(())
     }
-
-    #[tokio::test]
-    async fn test_insert_batch() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let identities: Vec<_> = mock_identities(10)
-            .iter()
-            .map(|commitment| {
-                Identity::new(
-                    (*commitment).into(),
-                    mock_roots(10).iter().map(|root| (*root).into()).collect(),
-                )
-            })
-            .collect();
-        let roots = mock_roots(2);
-
-        db.insert_new_batch_head(&roots[0]).await?;
-        db.insert_new_batch(&roots[1], &roots[0], BatchType::Insertion, &identities, &[
-            0,
-        ])
-        .await?;
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_get_next_batch() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let identities: Vec<_> = mock_identities(10)
-            .iter()
-            .map(|commitment| {
-                Identity::new(
-                    (*commitment).into(),
-                    mock_roots(10).iter().map(|root| (*root).into()).collect(),
-                )
-            })
-            .collect();
-        let indexes = vec![0];
-        let roots = mock_roots(2);
-
-        db.insert_new_batch_head(&roots[0]).await?;
-        db.insert_new_batch(
-            &roots[1],
-            &roots[0],
-            BatchType::Insertion,
-            &identities,
-            &indexes,
-        )
-        .await?;
-
-        let next_batch = db.get_next_batch(&roots[0]).await?;
-
-        assert!(next_batch.is_some());
-
-        let next_batch = next_batch.unwrap();
-
-        assert_eq!(next_batch.prev_root.unwrap(), roots[0]);
-        assert_eq!(next_batch.next_root, roots[1]);
-        assert_eq!(next_batch.data.0.identities, identities);
-        assert_eq!(next_batch.data.0.indexes, indexes);
-
-        let next_batch = db.get_next_batch(&roots[1]).await?;
-
-        assert!(next_batch.is_none());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_get_next_batch_without_transaction() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let identities: Vec<_> = mock_identities(10)
-            .iter()
-            .map(|commitment| {
-                Identity::new(
-                    (*commitment).into(),
-                    mock_roots(10).iter().map(|root| (*root).into()).collect(),
-                )
-            })
-            .collect();
-        let indexes = vec![0];
-        let roots = mock_roots(2);
-        let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9");
-
-        db.insert_new_batch_head(&roots[0]).await?;
-        db.insert_new_batch(
-            &roots[1],
-            &roots[0],
-            BatchType::Insertion,
-            &identities,
-            &indexes,
-        )
-        .await?;
-
-        let next_batch = db.get_next_batch_without_transaction().await?;
-
-        assert!(next_batch.is_some());
-
-        let next_batch = next_batch.unwrap();
-
-        assert_eq!(next_batch.prev_root.unwrap(), roots[0]);
-        assert_eq!(next_batch.next_root, roots[1]);
-        assert_eq!(next_batch.data.0.identities, identities);
-        assert_eq!(next_batch.data.0.indexes, indexes);
-
-        db.insert_new_transaction(&transaction_id, &roots[1])
-            .await?;
-
-        let next_batch = db.get_next_batch_without_transaction().await?;
-
-        assert!(next_batch.is_none());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_get_batch_head() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let roots = mock_roots(1);
-
-        let batch_head = db.get_batch_head().await?;
-
-        assert!(batch_head.is_none());
-
-        db.insert_new_batch_head(&roots[0]).await?;
-
-        let batch_head = db.get_batch_head().await?;
-
-        assert!(batch_head.is_some());
-        let batch_head = batch_head.unwrap();
-
-        assert_eq!(batch_head.prev_root, None);
-        assert_eq!(batch_head.next_root, roots[0]);
-        assert!(
-            batch_head.data.0.identities.is_empty(),
-            "Should have empty identities."
-        );
-        assert!(
-            batch_head.data.0.indexes.is_empty(),
-            "Should have empty indexes."
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_insert_transaction() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let roots = mock_roots(1);
-        let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9");
-
-        db.insert_new_batch_head(&roots[0]).await?;
-
-        db.insert_new_transaction(&transaction_id, &roots[0])
-            .await?;
-
-        Ok(())
-    }
 }
diff --git a/src/database/types.rs b/src/database/types.rs
index 08239ef7..71b596b2 100644
--- a/src/database/types.rs
+++ b/src/database/types.rs
@@ -1,13 +1,7 @@
 use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize};
-use sqlx::database::{HasArguments, HasValueRef};
-use sqlx::encode::IsNull;
-use sqlx::error::BoxDynError;
 use sqlx::prelude::FromRow;
-use sqlx::{Decode, Encode, Postgres, Type};
 
 use crate::identity_tree::{Hash, Status, UnprocessedStatus};
-use crate::prover::identity::Identity;
 
 pub struct UnprocessedCommitment {
     pub commitment: Hash,
@@ -43,137 +37,3 @@ pub struct CommitmentHistoryEntry {
     pub held_back: bool,
     pub status: Status,
 }
-
-#[derive(Debug, Copy, Clone, sqlx::Type, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
-#[serde(rename_all = "camelCase")]
-#[sqlx(type_name = "VARCHAR", rename_all = "PascalCase")]
-pub enum BatchType {
-    #[default]
-    Insertion,
-    Deletion,
-}
-
-impl From<String> for BatchType {
-    fn from(s: String) -> Self {
-        match s.as_str() {
-            "Insertion" => BatchType::Insertion,
-            "Deletion" => BatchType::Deletion,
-            _ => BatchType::Insertion,
-        }
-    }
-}
-
-impl std::fmt::Display for BatchType {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
-            BatchType::Insertion => write!(f, "insertion"),
-            BatchType::Deletion => write!(f, "deletion"),
-        }
-    }
-}
-
-#[derive(Debug, Clone, FromRow)]
-pub struct BatchEntry {
-    pub id: i64,
-    pub next_root: Hash,
-    // In general prev_root is present all the time except the first row (head of the batches
-    // chain)
-    pub prev_root: Option<Hash>,
-    pub created_at: DateTime<Utc>,
-    pub batch_type: BatchType,
-    pub data: sqlx::types::Json<BatchEntryData>,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct BatchEntryData {
-    pub identities: Vec<Identity>,
-    pub indexes: Vec<usize>,
-}
-
-#[derive(Debug, Clone, FromRow)]
-pub struct TransactionEntry {
-    pub batch_next_root: Hash,
-    pub transaction_id: String,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Commitments(pub Vec<Hash>);
-
-impl Encode<'_, Postgres> for Commitments {
-    fn encode_by_ref(&self, buf: &mut <Postgres as HasArguments<'_>>::ArgumentBuffer) -> IsNull {
-        let commitments = &self
-            .0
-            .iter()
-            .map(|c| c.to_be_bytes()) // Why be not le?
-            .collect::<Vec<_>>();
-
-        <&Vec<[u8; 32]> as Encode<'_, Postgres>>::encode(commitments, buf)
-    }
-}
-
-impl Decode<'_, Postgres> for Commitments {
-    fn decode(value: <Postgres as HasValueRef<'_>>::ValueRef) -> Result<Self, BoxDynError> {
-        let value = <Vec<[u8; 32]> as Decode<'_, Postgres>>::decode(value)?;
-
-        let res = value.iter().map(|&v| Hash::from_be_bytes(v)).collect();
-
-        Ok(Commitments(res))
-    }
-}
-
-impl Type<Postgres> for Commitments {
-    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
-        <&Vec<&[u8]> as Type<Postgres>>::type_info()
-    }
-
-    fn compatible(ty: &<Postgres as sqlx::Database>::TypeInfo) -> bool {
-        <&Vec<&[u8]> as Type<Postgres>>::compatible(ty)
-    }
-}
-
-impl From<Vec<Hash>> for Commitments {
-    fn from(value: Vec<Hash>) -> Self {
-        Commitments(value)
-    }
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct LeafIndexes(pub Vec<usize>);
-
-impl Encode<'_, Postgres> for LeafIndexes {
-    fn encode_by_ref(&self, buf: &mut <Postgres as HasArguments<'_>>::ArgumentBuffer) -> IsNull {
-        let commitments = &self
-            .0
-            .iter()
-            .map(|&c| c as i64) // Why be not le?
-            .collect();
-
-        <&Vec<i64> as Encode<'_, Postgres>>::encode(commitments, buf)
-    }
-}
-
-impl Decode<'_, Postgres> for LeafIndexes {
-    fn decode(value: <Postgres as HasValueRef<'_>>::ValueRef) -> Result<Self, BoxDynError> {
-        let value = <Vec<i64> as Decode<'_, Postgres>>::decode(value)?;
-
-        let res = value.iter().map(|&v| v as usize).collect();
-
-        Ok(LeafIndexes(res))
-    }
-}
-
-impl Type<Postgres> for LeafIndexes {
-    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
-        <&Vec<i64> as Type<Postgres>>::type_info()
-    }
-
-    fn compatible(ty: &<Postgres as sqlx::Database>::TypeInfo) -> bool {
-        <&Vec<i64> as Type<Postgres>>::compatible(ty)
-    }
-}
-
-impl From<Vec<usize>> for LeafIndexes {
-    fn from(value: Vec<usize>) -> Self {
-        LeafIndexes(value)
-    }
-}
diff --git a/src/ethereum/mod.rs b/src/ethereum/mod.rs
index 8c4d5871..e960e5b7 100644
--- a/src/ethereum/mod.rs
+++ b/src/ethereum/mod.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use ethers::providers::Middleware;
 use ethers::types::transaction::eip2718::TypedTransaction;
 use ethers::types::Address;
 pub use read::ReadProvider;
diff --git a/src/prover/identity.rs b/src/prover/identity.rs
index 4c2d5f00..c0a56b3b 100644
--- a/src/prover/identity.rs
+++ b/src/prover/identity.rs
@@ -1,9 +1,8 @@
 use ethers::types::U256;
-use serde::{Deserialize, Serialize};
 
 /// A representation of an identity insertion into the merkle tree as used for
 /// the prover endpoint.
-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct Identity {
     /// The identity commitment value that is inserted into the merkle tree.
     pub commitment: U256,
diff --git a/src/task_monitor.rs b/src/task_monitor.rs
index 058c308d..1b4ec2ef 100644
--- a/src/task_monitor.rs
+++ b/src/task_monitor.rs
@@ -130,37 +130,14 @@ impl TaskMonitor {
         );
         handles.push(finalize_identities_handle);
 
-        let base_next_batch_notify = Arc::new(Notify::new());
-
-        // Create batches
-        let app = self.app.clone();
-        let next_batch_notify = base_next_batch_notify.clone();
-        let wake_up_notify = base_wake_up_notify.clone();
-
-        let create_batches = move || {
-            tasks::create_batches::create_batches(
-                app.clone(),
-                next_batch_notify.clone(),
-                wake_up_notify.clone(),
-            )
-        };
-        let create_batches_handle = crate::utils::spawn_monitored_with_backoff(
-            create_batches,
-            shutdown_sender.clone(),
-            PROCESS_IDENTITIES_BACKOFF,
-        );
-        handles.push(create_batches_handle);
-
-        // Process batches
+        // Process identities
         let app = self.app.clone();
-        let next_batch_notify = base_next_batch_notify.clone();
         let wake_up_notify = base_wake_up_notify.clone();
 
         let process_identities = move || {
-            tasks::process_batches::process_batches(
+            tasks::process_identities::process_identities(
                 app.clone(),
                 monitored_txs_sender.clone(),
-                next_batch_notify.clone(),
                 wake_up_notify.clone(),
             )
         };
diff --git a/src/task_monitor/tasks/mod.rs b/src/task_monitor/tasks/mod.rs
index c5f0b77e..c4ffb8f0 100644
--- a/src/task_monitor/tasks/mod.rs
+++ b/src/task_monitor/tasks/mod.rs
@@ -1,6 +1,5 @@
-pub mod create_batches;
 pub mod delete_identities;
 pub mod finalize_identities;
 pub mod insert_identities;
 pub mod monitor_txs;
-pub mod process_batches;
+pub mod process_identities;
diff --git a/src/task_monitor/tasks/process_batches.rs b/src/task_monitor/tasks/process_batches.rs
deleted file mode 100644
index 6a4bc411..00000000
--- a/src/task_monitor/tasks/process_batches.rs
+++ /dev/null
@@ -1,203 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-
-use ethers::types::U256;
-use tokio::sync::{mpsc, Notify};
-use tokio::{select, time};
-use tracing::instrument;
-
-use crate::app::App;
-use crate::contracts::IdentityManager;
-use crate::database::types::{BatchEntry, BatchType};
-use crate::database::DatabaseExt as _;
-use crate::ethereum::write::TransactionId;
-use crate::prover::Prover;
-use crate::utils::index_packing::pack_indices;
-
-pub async fn process_batches(
-    app: Arc<App>,
-    monitored_txs_sender: Arc<mpsc::Sender<TransactionId>>,
-    next_batch_notify: Arc<Notify>,
-    wake_up_notify: Arc<Notify>,
-) -> anyhow::Result<()> {
-    tracing::info!("Awaiting for a clean slate");
-    app.identity_manager.await_clean_slate().await?;
-
-    tracing::info!("Starting identity processor.");
-
-    // We start a timer and force it to perform one initial tick to avoid an
-    // immediate trigger.
-    let mut timer = time::interval(Duration::from_secs(5));
-
-    loop {
-        // We wait either for a timer tick or a full batch
-        select! {
-            _ = timer.tick() => {
-                tracing::info!("Identity batch insertion woken due to timeout");
-            }
-
-            () = next_batch_notify.notified() => {
-                tracing::trace!("Identity batch insertion woken due to next batch creation");
-            },
-
-            () = wake_up_notify.notified() => {
-                tracing::trace!("Identity batch insertion woken due to request");
-            },
-        }
-
-        let next_batch = app.database.get_next_batch_without_transaction().await?;
-        let Some(next_batch) = next_batch else {
-            continue;
-        };
-
-        let tx_id =
-            commit_identities(&app.identity_manager, &monitored_txs_sender, &next_batch).await?;
-
-        if let Some(tx_id) = tx_id {
-            app.database
-                .insert_new_transaction(&tx_id.0, &next_batch.next_root)
-                .await?;
-        }
-
-        // We want to check if there's a full batch available immediately
-        wake_up_notify.notify_one();
-    }
-}
-
-async fn commit_identities(
-    identity_manager: &IdentityManager,
-    monitored_txs_sender: &mpsc::Sender<TransactionId>,
-    batch: &BatchEntry,
-) -> anyhow::Result<Option<TransactionId>> {
-    // If the update is an insertion
-    let tx_id = if batch.batch_type == BatchType::Insertion {
-        let prover = identity_manager
-            .get_suitable_insertion_prover(batch.data.0.identities.len())
-            .await?;
-
-        tracing::info!(
-            num_updates = batch.data.0.identities.len(),
-            batch_size = prover.batch_size(),
-            "Insertion batch",
-        );
-
-        insert_identities(identity_manager, &prover, batch).await?
-    } else {
-        let prover = identity_manager
-            .get_suitable_deletion_prover(batch.data.0.identities.len())
-            .await?;
-
-        tracing::info!(
-            num_updates = batch.data.0.identities.len(),
-            batch_size = prover.batch_size(),
-            "Deletion batch"
-        );
-
-        delete_identities(identity_manager, &prover, batch).await?
-    };
-
-    if let Some(tx_id) = tx_id.clone() {
-        monitored_txs_sender.send(tx_id).await?;
-    }
-
-    Ok(tx_id)
-}
-
-#[instrument(level = "info", skip_all)]
-pub async fn insert_identities(
-    identity_manager: &IdentityManager,
-    prover: &Prover,
-    batch: &BatchEntry,
-) -> anyhow::Result<Option<TransactionId>> {
-    identity_manager.validate_merkle_proofs(&batch.data.0.identities)?;
-    let start_index = *batch.data.0.indexes.first().expect("Should exist.");
-    let pre_root: U256 = batch.prev_root.expect("Should exist.").into();
-    let post_root: U256 = batch.next_root.into();
-
-    // We prepare the proof before reserving a slot in the pending identities
-    let proof = IdentityManager::prepare_insertion_proof(
-        prover,
-        start_index,
-        pre_root,
-        &batch.data.0.identities,
-        post_root,
-    )
-    .await?;
-
-    tracing::info!(
-        start_index,
-        ?pre_root,
-        ?post_root,
-        "Submitting insertion batch"
-    );
-
-    // With all the data prepared we can submit the identities to the on-chain
-    // identity manager and wait for that transaction to be mined.
-    let transaction_id = identity_manager
-        .register_identities(
-            start_index,
-            pre_root,
-            post_root,
-            batch.data.0.identities.clone(),
-            proof,
-        )
-        .await
-        .map_err(|e| {
-            tracing::error!(?e, "Failed to insert identity to contract.");
-            e
-        })?;
-
-    tracing::info!(
-        start_index,
-        ?pre_root,
-        ?post_root,
-        ?transaction_id,
-        "Insertion batch submitted"
-    );
-
-    Ok(Some(transaction_id))
-}
-
-pub async fn delete_identities(
-    identity_manager: &IdentityManager,
-    prover: &Prover,
-    batch: &BatchEntry,
-) -> anyhow::Result<Option<TransactionId>> {
-    identity_manager.validate_merkle_proofs(&batch.data.0.identities)?;
-    let pre_root: U256 = batch.prev_root.expect("Should exist.").into();
-    let post_root: U256 = batch.next_root.into();
-    let deletion_indices: Vec<_> = batch.data.0.indexes.iter().map(|&v| v as u32).collect();
-
-    // We prepare the proof before reserving a slot in the pending identities
-    let proof = IdentityManager::prepare_deletion_proof(
-        prover,
-        pre_root,
-        deletion_indices.clone(),
-        batch.data.0.identities.clone(),
-        post_root,
-    )
-    .await?;
-
-    let packed_deletion_indices = pack_indices(&deletion_indices);
-
-    tracing::info!(?pre_root, ?post_root, "Submitting deletion batch");
-
-    // With all the data prepared we can submit the identities to the on-chain
-    // identity manager and wait for that transaction to be mined.
-    let transaction_id = identity_manager
-        .delete_identities(proof, packed_deletion_indices, pre_root, post_root)
-        .await
-        .map_err(|e| {
-            tracing::error!(?e, "Failed to insert identity to contract.");
-            e
-        })?;
-
-    tracing::info!(
-        ?pre_root,
-        ?post_root,
-        ?transaction_id,
-        "Deletion batch submitted"
-    );
-
-    Ok(Some(transaction_id))
-}
diff --git a/src/task_monitor/tasks/create_batches.rs b/src/task_monitor/tasks/process_identities.rs
similarity index 77%
rename from src/task_monitor/tasks/create_batches.rs
rename to src/task_monitor/tasks/process_identities.rs
index 64c9d297..6c5c8bcb 100644
--- a/src/task_monitor/tasks/create_batches.rs
+++ b/src/task_monitor/tasks/process_identities.rs
@@ -2,18 +2,18 @@ use std::sync::Arc;
 
 use anyhow::Context;
 use chrono::{DateTime, Utc};
-use ethers::prelude::U256;
+use ethers::types::U256;
 use ruint::Uint;
 use semaphore::merkle_tree::Proof;
 use semaphore::poseidon_tree::{Branch, PoseidonHash};
-use tokio::sync::Notify;
+use tokio::sync::{mpsc, Notify};
 use tokio::{select, time};
 use tracing::instrument;
 
 use crate::app::App;
 use crate::contracts::IdentityManager;
-use crate::database;
-use crate::database::{Database, DatabaseExt as _};
+use crate::database::DatabaseExt as _;
+use crate::ethereum::write::TransactionId;
 use crate::identity_tree::{
     AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion,
 };
@@ -21,21 +21,21 @@ use crate::prover::identity::Identity;
 use crate::prover::Prover;
 use crate::task_monitor::TaskMonitor;
 use crate::utils::batch_type::BatchType;
+use crate::utils::index_packing::pack_indices;
 
 /// The number of seconds either side of the timer tick to treat as enough to
 /// trigger a forced batch insertion.
 const DEBOUNCE_THRESHOLD_SECS: i64 = 1;
 
-pub async fn create_batches(
+pub async fn process_identities(
     app: Arc<App>,
-    next_batch_notify: Arc<Notify>,
+    monitored_txs_sender: Arc<mpsc::Sender<TransactionId>>,
     wake_up_notify: Arc<Notify>,
 ) -> anyhow::Result<()> {
     tracing::info!("Awaiting for a clean slate");
     app.identity_manager.await_clean_slate().await?;
 
-    tracing::info!("Starting batch creator.");
-    ensure_batch_chain_initialized(&app).await?;
+    tracing::info!("Starting identity processor.");
 
     // We start a timer and force it to perform one initial tick to avoid an
     // immediate trigger.
@@ -83,18 +83,12 @@ pub async fn create_batches(
             .batching_tree()
             .peek_next_updates(batch_size);
 
-        if updates.is_empty() {
-            tracing::trace!("No updates found. Waiting.");
-            continue;
-        }
-
        // If the batch is a deletion, process immediately without resetting the timer
         if batch_type.is_deletion() {
             commit_identities(
-                &app.database,
                 &app.identity_manager,
                 app.tree_state()?.batching_tree(),
-                &next_batch_notify,
+                &monitored_txs_sender,
                 &updates,
             )
             .await?;
@@ -113,10 +107,9 @@ pub async fn create_batches(
         // process the batch
         if updates.len() >= batch_size || batch_time_elapsed {
             commit_identities(
-                &app.database,
                 &app.identity_manager,
                 app.tree_state()?.batching_tree(),
-                &next_batch_notify,
+                &monitored_txs_sender,
                 &updates,
             )
             .await?;
@@ -149,10 +142,9 @@ pub async fn create_batches(
             // If the next batch is deletion, process the current insertion batch
             if next_batch_is_deletion {
                 commit_identities(
-                    &app.database,
                     &app.identity_manager,
                     app.tree_state()?.batching_tree(),
-                    &next_batch_notify,
+                    &monitored_txs_sender,
                     &updates,
                 )
                 .await?;
@@ -174,25 +166,14 @@ pub async fn create_batches(
     }
 }
 
-async fn ensure_batch_chain_initialized(app: &Arc<App>) -> anyhow::Result<()> {
-    let batch_head = app.database.get_batch_head().await?;
-    if batch_head.is_none() {
-        app.database
-            .insert_new_batch_head(&app.tree_state()?.batching_tree().get_root())
-            .await?;
-    }
-    Ok(())
-}
-
 async fn commit_identities(
-    database: &Database,
     identity_manager: &IdentityManager,
     batching_tree: &TreeVersion<Intermediate>,
-    next_batch_notify: &Arc<Notify>,
+    monitored_txs_sender: &mpsc::Sender<TransactionId>,
     updates: &[AppliedTreeUpdate],
 ) -> anyhow::Result<()> {
     // If the update is an insertion
-    if updates
+    let tx_id = if updates
         .first()
         .context("Updates should be > 1")?
         .update
@@ -209,7 +190,7 @@ async fn commit_identities(
             "Insertion batch",
         );
 
-        insert_identities(database, batching_tree, next_batch_notify, updates, &prover).await
+        insert_identities(identity_manager, batching_tree, updates, &prover).await?
     } else {
         let prover = identity_manager
             .get_suitable_deletion_prover(updates.len())
             .await?;
@@ -221,22 +202,27 @@ async fn commit_identities(
             "Deletion batch"
         );
 
-        delete_identities(database, batching_tree, next_batch_notify, updates, &prover).await
+        delete_identities(identity_manager, batching_tree, updates, &prover).await?
+    };
+
+    if let Some(tx_id) = tx_id {
+        monitored_txs_sender.send(tx_id).await?;
     }
+
+    Ok(())
 }
 
 #[instrument(level = "info", skip_all)]
 pub async fn insert_identities(
-    database: &Database,
+    identity_manager: &IdentityManager,
     batching_tree: &TreeVersion<Intermediate>,
-    next_batch_notify: &Arc<Notify>,
     updates: &[AppliedTreeUpdate],
     prover: &Prover,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<Option<TransactionId>> {
     assert_updates_are_consecutive(updates);
 
-    let pre_root = batching_tree.get_root();
-    let mut insertion_indices: Vec<_> = updates.iter().map(|f| f.update.leaf_index).collect();
+    let start_index = updates[0].update.leaf_index;
+    let pre_root: U256 = batching_tree.get_root().into();
 
     let mut commitments: Vec<U256> = updates
         .iter()
         .map(|update| update.update.element.into())
         .collect();
@@ -285,7 +271,6 @@ pub async fn insert_identities(
         for i in start_index..(start_index + padding) {
             let proof = latest_tree_from_updates.proof(i);
             merkle_proofs.push(proof);
-            insertion_indices.push(i);
         }
     }
@@ -302,42 +287,60 @@ pub async fn insert_identities(
 
     // With the updates applied we can grab the value of the tree's new root and
     // build our identities for sending to the identity manager.
-    let post_root = latest_tree_from_updates.root();
+    let post_root: U256 = latest_tree_from_updates.root().into();
     let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs);
-    let start_index = *insertion_indices.first().unwrap();
+
+    identity_manager.validate_merkle_proofs(&identity_commitments)?;
+
+    // We prepare the proof before reserving a slot in the pending identities
+    let proof = IdentityManager::prepare_insertion_proof(
+        prover,
+        start_index,
+        pre_root,
+        &identity_commitments,
+        post_root,
+    )
+    .await?;
 
     tracing::info!(
         start_index,
         ?pre_root,
         ?post_root,
-        "Submitting insertion batch to DB"
+        "Submitting insertion batch"
     );
 
-    // With all the data prepared we can submit the batch to database.
-    database
-        .insert_new_batch(
-            &post_root,
-            &pre_root,
-            database::types::BatchType::Insertion,
-            &identity_commitments,
-            &insertion_indices,
+    // With all the data prepared we can submit the identities to the on-chain
+    // identity manager and wait for that transaction to be mined.
+    let transaction_id = identity_manager
+        .register_identities(
+            start_index,
+            pre_root,
+            post_root,
+            identity_commitments,
+            proof,
         )
-        .await?;
+        .await
+        .map_err(|e| {
+            tracing::error!(?e, "Failed to insert identity to contract.");
+            e
+        })?;
 
     tracing::info!(
         start_index,
         ?pre_root,
         ?post_root,
-        "Insertion batch submitted to DB"
+        ?transaction_id,
+        "Insertion batch submitted"
     );
 
-    next_batch_notify.notify_one();
+    // Update the batching tree only after submitting the identities to the chain
+    batching_tree.apply_updates_up_to(post_root.into());
 
-    TaskMonitor::log_batch_size(updates.len());
+    tracing::info!(start_index, ?pre_root, ?post_root, "Tree updated");
 
-    batching_tree.apply_updates_up_to(post_root);
+    TaskMonitor::log_batch_size(updates.len());
 
-    Ok(())
+    Ok(Some(transaction_id))
 }
 
 fn assert_updates_are_consecutive(updates: &[AppliedTreeUpdate]) {
@@ -365,18 +368,21 @@
 }
 
 pub async fn delete_identities(
-    database: &Database,
+    identity_manager: &IdentityManager,
     batching_tree: &TreeVersion<Intermediate>,
-    next_batch_notify: &Arc<Notify>,
     updates: &[AppliedTreeUpdate],
     prover: &Prover,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<Option<TransactionId>> {
     // Grab the initial conditions before the updates are applied to the tree.
-    let pre_root = batching_tree.get_root();
+    let pre_root: U256 = batching_tree.get_root().into();
 
-    let mut deletion_indices: Vec<_> = updates.iter().map(|f| f.update.leaf_index).collect();
+    let mut deletion_indices = updates
+        .iter()
+        .map(|f| f.update.leaf_index as u32)
+        .collect::<Vec<u32>>();
 
-    let commitments = batching_tree.commitments_by_indices(deletion_indices.iter().copied());
+    let commitments =
+        batching_tree.commitments_by_indices(deletion_indices.iter().map(|x| *x as usize));
 
     let mut commitments: Vec<U256> = commitments.into_iter().map(U256::from).collect();
 
     let latest_tree_from_updates = updates
@@ -413,7 +419,7 @@ pub async fn delete_identities(
     // ensure that our batches match that size. We do this by padding deletion
     // indices with tree.depth() ^ 2. The deletion prover will skip the proof for
     // any deletion with an index greater than the max tree depth
-    let pad_index = 2_u32.pow(latest_tree_from_updates.depth() as u32) as usize;
+    let pad_index = 2_u32.pow(latest_tree_from_updates.depth() as u32);
 
     if commitment_count != batch_size {
         let padding = batch_size - commitment_count;
@@ -436,46 +442,50 @@ pub async fn delete_identities(
 
     // With the updates applied we can grab the value of the tree's new root and
     // build our identities for sending to the identity manager.
-    let post_root = latest_tree_from_updates.root();
+    let post_root: U256 = latest_tree_from_updates.root().into();
     let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs);
 
-    tracing::info!(?pre_root, ?post_root, "Submitting deletion batch to DB");
+    identity_manager.validate_merkle_proofs(&identity_commitments)?;
 
-    // With all the data prepared we can submit the batch to database.
-    database
-        .insert_new_batch(
-            &post_root,
-            &pre_root,
-            database::types::BatchType::Deletion,
-            &identity_commitments,
-            &deletion_indices,
-        )
-        .await?;
+    // We prepare the proof before reserving a slot in the pending identities
+    let proof = IdentityManager::prepare_deletion_proof(
+        prover,
+        pre_root,
+        deletion_indices.clone(),
+        identity_commitments,
+        post_root,
+    )
+    .await?;
 
-    tracing::info!(?pre_root, ?post_root, "Deletion batch submitted to DB");
+    let packed_deletion_indices = pack_indices(&deletion_indices);
 
-    next_batch_notify.notify_one();
+    tracing::info!(?pre_root, ?post_root, "Submitting deletion batch");
 
-    TaskMonitor::log_batch_size(updates.len());
+    // With all the data prepared we can submit the identities to the on-chain
+    // identity manager and wait for that transaction to be mined.
+    let transaction_id = identity_manager
+        .delete_identities(proof, packed_deletion_indices, pre_root, post_root)
+        .await
+        .map_err(|e| {
+            tracing::error!(?e, "Failed to insert identity to contract.");
+            e
+        })?;
 
-    batching_tree.apply_updates_up_to(post_root);
+    tracing::info!(
+        ?pre_root,
+        ?post_root,
+        ?transaction_id,
+        "Deletion batch submitted"
+    );
 
-    Ok(())
-}
+    // Update the batching tree only after submitting the identities to the chain
+    batching_tree.apply_updates_up_to(post_root.into());
 
-fn determine_batch_type(tree: &TreeVersion<Intermediate>) -> Option<BatchType> {
-    let next_update = tree.peek_next_updates(1);
-    if next_update.is_empty() {
-        return None;
-    }
+    tracing::info!(?pre_root, ?post_root, "Tree updated");
 
-    let batch_type = if next_update[0].update.element == Hash::ZERO {
-        BatchType::Deletion
-    } else {
-        BatchType::Insertion
-    };
+    TaskMonitor::log_batch_size(updates.len());
 
-    Some(batch_type)
+    Ok(Some(transaction_id))
 }
 
 fn zip_commitments_and_proofs(
@@ -498,3 +508,18 @@ fn zip_commitments_and_proofs(
         })
         .collect()
 }
+
+fn determine_batch_type(tree: &TreeVersion<Intermediate>) -> Option<BatchType> {
+    let next_update = tree.peek_next_updates(1);
+    if next_update.is_empty() {
+        return None;
+    }
+
+    let batch_type = if next_update[0].update.element == Hash::ZERO {
+        BatchType::Deletion
+    } else {
+        BatchType::Insertion
+    };
+
+    Some(batch_type)
+}
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 89e20c25..65a9f15c 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -747,7 +747,7 @@ pub async fn spawn_deps<'a, 'b, 'c>(
     ))
 }
 
-async fn spawn_db(docker: &Cli) -> anyhow::Result {
+async fn spawn_db(docker: &Cli) -> anyhow::Result> {
     let db_container = postgres_docker_utils::setup(docker).await.unwrap();
 
     Ok(db_container)
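
A note on the schema dropped in 013_batches_and_transactions.sql above: the pair of constraints on batches.prev_root is what forced the table into a single linked list. The UNIQUE constraint plus the self-referencing foreign key chain every batch to exactly one predecessor, while the partial unique index over the constant expression (prev_root IS NULL) admits at most one row with a NULL prev_root, i.e. at most one chain head. A minimal standalone sketch of that pattern (the chain table and its columns are illustrative only, not part of this codebase):

CREATE TABLE chain
(
    id        BIGSERIAL PRIMARY KEY,
    next_root BYTEA NOT NULL UNIQUE,
    -- NULL only for the head of the chain
    prev_root BYTEA UNIQUE REFERENCES chain (next_root)
);

-- Every qualifying row indexes the same value (true), so this unique
-- index allows at most one row WHERE prev_root IS NULL.
CREATE UNIQUE INDEX one_head ON chain ((prev_root IS NULL)) WHERE prev_root IS NULL;

INSERT INTO chain (next_root, prev_root) VALUES ('\x01', NULL);   -- ok: first head
INSERT INTO chain (next_root, prev_root) VALUES ('\x02', '\x01'); -- ok: extends the chain
INSERT INTO chain (next_root, prev_root) VALUES ('\x03', NULL);   -- fails: a second head violates one_head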