diff --git a/schemas/database/013_batches_and_transactions.sql b/schemas/database/013_batches_and_transactions.sql new file mode 100644 index 00000000..7dc91e43 --- /dev/null +++ b/schemas/database/013_batches_and_transactions.sql @@ -0,0 +1,24 @@ +-- Create ENUM for prover type +CREATE TABLE batches +( + id BIGSERIAL UNIQUE PRIMARY KEY, + next_root BYTEA NOT NULL UNIQUE, + prev_root BYTEA UNIQUE, + created_at TIMESTAMPTZ NOT NULL, + batch_type VARCHAR(50) NOT NULL, + data JSON NOT NULL, + + FOREIGN KEY (prev_root) REFERENCES batches (next_root) +); + +CREATE INDEX idx_batches_prev_root ON batches (prev_root); +CREATE UNIQUE INDEX i_single_null_prev_root ON batches ((batches.prev_root IS NULL)) WHERE batches.prev_root IS NULL; + +CREATE TABLE transactions +( + transaction_id VARCHAR(256) NOT NULL UNIQUE PRIMARY KEY, + batch_next_root BYTEA NOT NULL UNIQUE, + created_at TIMESTAMPTZ NOT NULL, + + FOREIGN KEY (batch_next_root) REFERENCES batches (next_root) +); \ No newline at end of file diff --git a/src/app.rs b/src/app.rs index be1fcd15..2085a414 100644 --- a/src/app.rs +++ b/src/app.rs @@ -15,7 +15,7 @@ use crate::database::{Database, DatabaseExt as _}; use crate::ethereum::Ethereum; use crate::identity_tree::{ CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState, - TreeUpdate, TreeVersionReadOps, UnprocessedStatus, + TreeUpdate, TreeVersionReadOps, TreeWithNextVersion, UnprocessedStatus, }; use crate::prover::map::initialize_prover_maps; use crate::prover::{ProverConfig, ProverType}; @@ -261,6 +261,9 @@ impl App { let (processed, batching_builder) = processed_builder.seal_and_continue(); let (batching, mut latest_builder) = batching_builder.seal_and_continue(); + + // We are duplicating updates here for some commitments that were batched but + // this is an idempotent operation let pending_items = self .database .get_commitments_by_status(ProcessedStatus::Pending) @@ -269,6 +272,15 @@ impl App { latest_builder.update(&update); } let latest = latest_builder.seal(); + + let batch = self.database.get_latest_batch().await?; + if let Some(batch) = batch { + if batching.get_root() != batch.next_root { + batching.apply_updates_up_to(batch.next_root); + } + assert_eq!(batching.get_root(), batch.next_root); + } + Ok(Some(TreeState::new(mined, processed, batching, latest))) } @@ -355,6 +367,14 @@ impl App { let latest = latest_builder.seal(); + let batch = self.database.get_latest_batch().await?; + if let Some(batch) = batch { + if batching.get_root() != batch.next_root { + batching.apply_updates_up_to(batch.next_root); + } + assert_eq!(batching.get_root(), batch.next_root); + } + Ok(TreeState::new(mined, processed, batching, latest)) } diff --git a/src/database/mod.rs b/src/database/mod.rs index 81cf9a9c..37b22aa4 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -19,9 +19,11 @@ use tracing::{error, info, instrument, warn}; use self::types::{CommitmentHistoryEntry, DeletionEntry, LatestDeletionEntry, RecoveryEntry}; use crate::config::DatabaseConfig; +use crate::database::types::{BatchEntry, BatchEntryData, BatchType, TransactionEntry}; use crate::identity_tree::{ Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus, }; +use crate::prover::identity::Identity; use crate::prover::{ProverConfig, ProverType}; pub mod types; @@ -840,6 +842,241 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { let row_unprocessed = self.fetch_one(query_queued_deletion).await?; Ok(row_unprocessed.get::(0)) } + + async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO batches( + id, + next_root, + prev_root, + created_at, + batch_type, + data + ) VALUES (DEFAULT, $1, NULL, CURRENT_TIMESTAMP, $2, $3) + "#, + ) + .bind(next_root) + .bind(BatchType::Insertion) + .bind(sqlx::types::Json::from(BatchEntryData { + identities: vec![], + indexes: vec![], + })); + + self.execute(query).await?; + Ok(()) + } + + async fn insert_new_batch( + self, + next_root: &Hash, + prev_root: &Hash, + batch_type: BatchType, + identities: &[Identity], + indexes: &[usize], + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO batches( + id, + next_root, + prev_root, + created_at, + batch_type, + data + ) VALUES (DEFAULT, $1, $2, CURRENT_TIMESTAMP, $3, $4) + "#, + ) + .bind(next_root) + .bind(prev_root) + .bind(batch_type) + .bind(sqlx::types::Json::from(BatchEntryData { + identities: identities.to_vec(), + indexes: indexes.to_vec(), + })); + + self.execute(query).await?; + Ok(()) + } + + async fn get_next_batch(self, prev_root: &Hash) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches WHERE prev_root = $1 + LIMIT 1 + "#, + ) + .bind(prev_root) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_latest_batch(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches + ORDER BY id DESC + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_latest_batch_with_transaction(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + batches.id, + batches.next_root, + batches.prev_root, + batches.created_at, + batches.batch_type, + batches.data + FROM batches + LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root + WHERE transactions.batch_next_root IS NOT NULL AND batches.prev_root IS NOT NULL + ORDER BY batches.id DESC + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_next_batch_without_transaction(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + batches.id, + batches.next_root, + batches.prev_root, + batches.created_at, + batches.batch_type, + batches.data + FROM batches + LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root + WHERE transactions.batch_next_root IS NULL AND batches.prev_root IS NOT NULL + ORDER BY batches.id ASC + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_batch_head(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches WHERE prev_root IS NULL + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_all_batches_after(self, id: i64) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches WHERE id >= $1 ORDER BY id ASC + "#, + ) + .bind(id) + .fetch_all(self) + .await?; + + Ok(res) + } + + async fn root_in_batch_chain(self, root: &Hash) -> Result { + let query = sqlx::query( + r#"SELECT exists(SELECT 1 FROM batches where prev_root = $1 OR next_root = $1)"#, + ) + .bind(root); + let row_unprocessed = self.fetch_one(query).await?; + Ok(row_unprocessed.get::(0)) + } + + async fn insert_new_transaction( + self, + transaction_id: &String, + batch_next_root: &Hash, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO transactions( + transaction_id, + batch_next_root, + created_at + ) VALUES ($1, $2, CURRENT_TIMESTAMP) + "#, + ) + .bind(transaction_id) + .bind(batch_next_root); + + self.execute(query).await?; + Ok(()) + } + + async fn get_transaction_for_batch( + self, + next_root: &Hash, + ) -> Result, Error> { + let res = sqlx::query_as::<_, TransactionEntry>( + r#" + SELECT + transaction_id, + batch_next_root, + created_at + FROM transactions WHERE batch_next_root = $1 + LIMIT 1 + "#, + ) + .bind(next_root) + .fetch_optional(self) + .await?; + + Ok(res) + } } #[derive(Debug, Error)] @@ -867,8 +1104,10 @@ mod test { use super::Database; use crate::config::DatabaseConfig; + use crate::database::types::BatchType; use crate::database::DatabaseExt as _; use crate::identity_tree::{Hash, ProcessedStatus, Status, UnprocessedStatus}; + use crate::prover::identity::Identity; use crate::prover::{ProverConfig, ProverType}; use crate::utils::secret::SecretUrl; @@ -2038,4 +2277,166 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_insert_batch() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities: Vec<_> = mock_identities(10) + .iter() + .map(|commitment| { + Identity::new( + (*commitment).into(), + mock_roots(10).iter().map(|root| (*root).into()).collect(), + ) + }) + .collect(); + let roots = mock_roots(2); + + db.insert_new_batch_head(&roots[0]).await?; + db.insert_new_batch(&roots[1], &roots[0], BatchType::Insertion, &identities, &[ + 0, + ]) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn test_get_next_batch() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities: Vec<_> = mock_identities(10) + .iter() + .map(|commitment| { + Identity::new( + (*commitment).into(), + mock_roots(10).iter().map(|root| (*root).into()).collect(), + ) + }) + .collect(); + let indexes = vec![0]; + let roots = mock_roots(2); + + db.insert_new_batch_head(&roots[0]).await?; + db.insert_new_batch( + &roots[1], + &roots[0], + BatchType::Insertion, + &identities, + &indexes, + ) + .await?; + + let next_batch = db.get_next_batch(&roots[0]).await?; + + assert!(next_batch.is_some()); + + let next_batch = next_batch.unwrap(); + + assert_eq!(next_batch.prev_root.unwrap(), roots[0]); + assert_eq!(next_batch.next_root, roots[1]); + assert_eq!(next_batch.data.0.identities, identities); + assert_eq!(next_batch.data.0.indexes, indexes); + + let next_batch = db.get_next_batch(&roots[1]).await?; + + assert!(next_batch.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_next_batch_without_transaction() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities: Vec<_> = mock_identities(10) + .iter() + .map(|commitment| { + Identity::new( + (*commitment).into(), + mock_roots(10).iter().map(|root| (*root).into()).collect(), + ) + }) + .collect(); + let indexes = vec![0]; + let roots = mock_roots(2); + let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9"); + + db.insert_new_batch_head(&roots[0]).await?; + db.insert_new_batch( + &roots[1], + &roots[0], + BatchType::Insertion, + &identities, + &indexes, + ) + .await?; + + let next_batch = db.get_next_batch_without_transaction().await?; + + assert!(next_batch.is_some()); + + let next_batch = next_batch.unwrap(); + + assert_eq!(next_batch.prev_root.unwrap(), roots[0]); + assert_eq!(next_batch.next_root, roots[1]); + assert_eq!(next_batch.data.0.identities, identities); + assert_eq!(next_batch.data.0.indexes, indexes); + + db.insert_new_transaction(&transaction_id, &roots[1]) + .await?; + + let next_batch = db.get_next_batch_without_transaction().await?; + + assert!(next_batch.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_batch_head() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let roots = mock_roots(1); + + let batch_head = db.get_batch_head().await?; + + assert!(batch_head.is_none()); + + db.insert_new_batch_head(&roots[0]).await?; + + let batch_head = db.get_batch_head().await?; + + assert!(batch_head.is_some()); + let batch_head = batch_head.unwrap(); + + assert_eq!(batch_head.prev_root, None); + assert_eq!(batch_head.next_root, roots[0]); + assert!( + batch_head.data.0.identities.is_empty(), + "Should have empty identities." + ); + assert!( + batch_head.data.0.indexes.is_empty(), + "Should have empty indexes." + ); + + Ok(()) + } + + #[tokio::test] + async fn test_insert_transaction() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let roots = mock_roots(1); + let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9"); + + db.insert_new_batch_head(&roots[0]).await?; + + db.insert_new_transaction(&transaction_id, &roots[0]) + .await?; + + Ok(()) + } } diff --git a/src/database/types.rs b/src/database/types.rs index 71b596b2..08239ef7 100644 --- a/src/database/types.rs +++ b/src/database/types.rs @@ -1,7 +1,13 @@ use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::database::{HasArguments, HasValueRef}; +use sqlx::encode::IsNull; +use sqlx::error::BoxDynError; use sqlx::prelude::FromRow; +use sqlx::{Decode, Encode, Postgres, Type}; use crate::identity_tree::{Hash, Status, UnprocessedStatus}; +use crate::prover::identity::Identity; pub struct UnprocessedCommitment { pub commitment: Hash, @@ -37,3 +43,137 @@ pub struct CommitmentHistoryEntry { pub held_back: bool, pub status: Status, } + +#[derive(Debug, Copy, Clone, sqlx::Type, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +#[sqlx(type_name = "VARCHAR", rename_all = "PascalCase")] +pub enum BatchType { + #[default] + Insertion, + Deletion, +} + +impl From for BatchType { + fn from(s: String) -> Self { + match s.as_str() { + "Insertion" => BatchType::Insertion, + "Deletion" => BatchType::Deletion, + _ => BatchType::Insertion, + } + } +} + +impl std::fmt::Display for BatchType { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + BatchType::Insertion => write!(f, "insertion"), + BatchType::Deletion => write!(f, "deletion"), + } + } +} + +#[derive(Debug, Clone, FromRow)] +pub struct BatchEntry { + pub id: i64, + pub next_root: Hash, + // In general prev_root is present all the time except the first row (head of the batches + // chain) + pub prev_root: Option, + pub created_at: DateTime, + pub batch_type: BatchType, + pub data: sqlx::types::Json, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct BatchEntryData { + pub identities: Vec, + pub indexes: Vec, +} + +#[derive(Debug, Clone, FromRow)] +pub struct TransactionEntry { + pub batch_next_root: Hash, + pub transaction_id: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Commitments(pub Vec); + +impl Encode<'_, Postgres> for Commitments { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> IsNull { + let commitments = &self + .0 + .iter() + .map(|c| c.to_be_bytes()) // Why be not le? + .collect::>(); + + <&Vec<[u8; 32]> as Encode>::encode(commitments, buf) + } +} + +impl Decode<'_, Postgres> for Commitments { + fn decode(value: >::ValueRef) -> Result { + let value = as Decode>::decode(value)?; + + let res = value.iter().map(|&v| Hash::from_be_bytes(v)).collect(); + + Ok(Commitments(res)) + } +} + +impl Type for Commitments { + fn type_info() -> ::TypeInfo { + <&Vec<&[u8]> as Type>::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + <&Vec<&[u8]> as Type>::compatible(ty) + } +} + +impl From> for Commitments { + fn from(value: Vec) -> Self { + Commitments(value) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LeafIndexes(pub Vec); + +impl Encode<'_, Postgres> for LeafIndexes { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> IsNull { + let commitments = &self + .0 + .iter() + .map(|&c| c as i64) // Why be not le? + .collect(); + + <&Vec as Encode>::encode(commitments, buf) + } +} + +impl Decode<'_, Postgres> for LeafIndexes { + fn decode(value: >::ValueRef) -> Result { + let value = as Decode>::decode(value)?; + + let res = value.iter().map(|&v| v as usize).collect(); + + Ok(LeafIndexes(res)) + } +} + +impl Type for LeafIndexes { + fn type_info() -> ::TypeInfo { + <&Vec as Type>::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + <&Vec as Type>::compatible(ty) + } +} + +impl From> for LeafIndexes { + fn from(value: Vec) -> Self { + LeafIndexes(value) + } +} diff --git a/src/ethereum/mod.rs b/src/ethereum/mod.rs index e960e5b7..8c4d5871 100644 --- a/src/ethereum/mod.rs +++ b/src/ethereum/mod.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use ethers::providers::Middleware; use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::Address; pub use read::ReadProvider; diff --git a/src/prover/identity.rs b/src/prover/identity.rs index c0a56b3b..4c2d5f00 100644 --- a/src/prover/identity.rs +++ b/src/prover/identity.rs @@ -1,8 +1,9 @@ use ethers::types::U256; +use serde::{Deserialize, Serialize}; /// A representation of an identity insertion into the merkle tree as used for /// the prover endpoint. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Identity { /// The identity commitment value that is inserted into the merkle tree. pub commitment: U256, diff --git a/src/task_monitor.rs b/src/task_monitor.rs index 1b4ec2ef..058c308d 100644 --- a/src/task_monitor.rs +++ b/src/task_monitor.rs @@ -130,14 +130,37 @@ impl TaskMonitor { ); handles.push(finalize_identities_handle); - // Process identities + let base_next_batch_notify = Arc::new(Notify::new()); + + // Create batches + let app = self.app.clone(); + let next_batch_notify = base_next_batch_notify.clone(); + let wake_up_notify = base_wake_up_notify.clone(); + + let create_batches = move || { + tasks::create_batches::create_batches( + app.clone(), + next_batch_notify.clone(), + wake_up_notify.clone(), + ) + }; + let create_batches_handle = crate::utils::spawn_monitored_with_backoff( + create_batches, + shutdown_sender.clone(), + PROCESS_IDENTITIES_BACKOFF, + ); + handles.push(create_batches_handle); + + // Process batches let app = self.app.clone(); + let next_batch_notify = base_next_batch_notify.clone(); let wake_up_notify = base_wake_up_notify.clone(); let process_identities = move || { - tasks::process_identities::process_identities( + tasks::process_batches::process_batches( app.clone(), monitored_txs_sender.clone(), + next_batch_notify.clone(), wake_up_notify.clone(), ) }; diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/create_batches.rs similarity index 77% rename from src/task_monitor/tasks/process_identities.rs rename to src/task_monitor/tasks/create_batches.rs index 6c5c8bcb..64c9d297 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/create_batches.rs @@ -2,18 +2,18 @@ use std::sync::Arc; use anyhow::Context; use chrono::{DateTime, Utc}; -use ethers::types::U256; +use ethers::prelude::U256; use ruint::Uint; use semaphore::merkle_tree::Proof; use semaphore::poseidon_tree::{Branch, PoseidonHash}; -use tokio::sync::{mpsc, Notify}; +use tokio::sync::Notify; use tokio::{select, time}; use tracing::instrument; use crate::app::App; use crate::contracts::IdentityManager; -use crate::database::DatabaseExt as _; -use crate::ethereum::write::TransactionId; +use crate::database; +use crate::database::{Database, DatabaseExt as _}; use crate::identity_tree::{ AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion, }; @@ -21,21 +21,21 @@ use crate::prover::identity::Identity; use crate::prover::Prover; use crate::task_monitor::TaskMonitor; use crate::utils::batch_type::BatchType; -use crate::utils::index_packing::pack_indices; /// The number of seconds either side of the timer tick to treat as enough to /// trigger a forced batch insertion. const DEBOUNCE_THRESHOLD_SECS: i64 = 1; -pub async fn process_identities( +pub async fn create_batches( app: Arc, - monitored_txs_sender: Arc>, + next_batch_notify: Arc, wake_up_notify: Arc, ) -> anyhow::Result<()> { tracing::info!("Awaiting for a clean slate"); app.identity_manager.await_clean_slate().await?; - tracing::info!("Starting identity processor."); + tracing::info!("Starting batch creator."); + ensure_batch_chain_initialized(&app).await?; // We start a timer and force it to perform one initial tick to avoid an // immediate trigger. @@ -83,12 +83,18 @@ pub async fn process_identities( .batching_tree() .peek_next_updates(batch_size); + if updates.is_empty() { + tracing::trace!("No updates found. Waiting."); + continue; + } + // If the batch is a deletion, process immediately without resetting the timer if batch_type.is_deletion() { commit_identities( + &app.database, &app.identity_manager, app.tree_state()?.batching_tree(), - &monitored_txs_sender, + &next_batch_notify, &updates, ) .await?; @@ -107,9 +113,10 @@ pub async fn process_identities( // process the batch if updates.len() >= batch_size || batch_time_elapsed { commit_identities( + &app.database, &app.identity_manager, app.tree_state()?.batching_tree(), - &monitored_txs_sender, + &next_batch_notify, &updates, ) .await?; @@ -142,9 +149,10 @@ pub async fn process_identities( // If the next batch is deletion, process the current insertion batch if next_batch_is_deletion { commit_identities( + &app.database, &app.identity_manager, app.tree_state()?.batching_tree(), - &monitored_txs_sender, + &next_batch_notify, &updates, ) .await?; @@ -166,14 +174,25 @@ pub async fn process_identities( } } +async fn ensure_batch_chain_initialized(app: &Arc) -> anyhow::Result<()> { + let batch_head = app.database.get_batch_head().await?; + if batch_head.is_none() { + app.database + .insert_new_batch_head(&app.tree_state()?.batching_tree().get_root()) + .await?; + } + Ok(()) +} + async fn commit_identities( + database: &Database, identity_manager: &IdentityManager, batching_tree: &TreeVersion, - monitored_txs_sender: &mpsc::Sender, + next_batch_notify: &Arc, updates: &[AppliedTreeUpdate], ) -> anyhow::Result<()> { // If the update is an insertion - let tx_id = if updates + if updates .first() .context("Updates should be > 1")? .update @@ -190,7 +209,7 @@ async fn commit_identities( "Insertion batch", ); - insert_identities(identity_manager, batching_tree, updates, &prover).await? + insert_identities(database, batching_tree, next_batch_notify, updates, &prover).await } else { let prover = identity_manager .get_suitable_deletion_prover(updates.len()) @@ -202,27 +221,22 @@ async fn commit_identities( "Deletion batch" ); - delete_identities(identity_manager, batching_tree, updates, &prover).await? - }; - - if let Some(tx_id) = tx_id { - monitored_txs_sender.send(tx_id).await?; + delete_identities(database, batching_tree, next_batch_notify, updates, &prover).await } - - Ok(()) } #[instrument(level = "info", skip_all)] pub async fn insert_identities( - identity_manager: &IdentityManager, + database: &Database, batching_tree: &TreeVersion, + next_batch_notify: &Arc, updates: &[AppliedTreeUpdate], prover: &Prover, -) -> anyhow::Result> { +) -> anyhow::Result<()> { assert_updates_are_consecutive(updates); - let start_index = updates[0].update.leaf_index; - let pre_root: U256 = batching_tree.get_root().into(); + let pre_root = batching_tree.get_root(); + let mut insertion_indices: Vec<_> = updates.iter().map(|f| f.update.leaf_index).collect(); let mut commitments: Vec = updates .iter() .map(|update| update.update.element.into()) @@ -271,6 +285,7 @@ pub async fn insert_identities( for i in start_index..(start_index + padding) { let proof = latest_tree_from_updates.proof(i); merkle_proofs.push(proof); + insertion_indices.push(i); } } @@ -287,60 +302,42 @@ pub async fn insert_identities( // With the updates applied we can grab the value of the tree's new root and // build our identities for sending to the identity manager. - let post_root: U256 = latest_tree_from_updates.root().into(); + let post_root = latest_tree_from_updates.root(); let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs); - - identity_manager.validate_merkle_proofs(&identity_commitments)?; - - // We prepare the proof before reserving a slot in the pending identities - let proof = IdentityManager::prepare_insertion_proof( - prover, - start_index, - pre_root, - &identity_commitments, - post_root, - ) - .await?; + let start_index = *insertion_indices.first().unwrap(); tracing::info!( start_index, ?pre_root, ?post_root, - "Submitting insertion batch" + "Submitting insertion batch to DB" ); - // With all the data prepared we can submit the identities to the on-chain - // identity manager and wait for that transaction to be mined. - let transaction_id = identity_manager - .register_identities( - start_index, - pre_root, - post_root, - identity_commitments, - proof, + // With all the data prepared we can submit the batch to database. + database + .insert_new_batch( + &post_root, + &pre_root, + database::types::BatchType::Insertion, + &identity_commitments, + &insertion_indices, ) - .await - .map_err(|e| { - tracing::error!(?e, "Failed to insert identity to contract."); - e - })?; + .await?; tracing::info!( start_index, ?pre_root, ?post_root, - ?transaction_id, - "Insertion batch submitted" + "Insertion batch submitted to DB" ); - // Update the batching tree only after submitting the identities to the chain - batching_tree.apply_updates_up_to(post_root.into()); - - tracing::info!(start_index, ?pre_root, ?post_root, "Tree updated"); + next_batch_notify.notify_one(); TaskMonitor::log_batch_size(updates.len()); - Ok(Some(transaction_id)) + batching_tree.apply_updates_up_to(post_root); + + Ok(()) } fn assert_updates_are_consecutive(updates: &[AppliedTreeUpdate]) { @@ -368,21 +365,18 @@ fn assert_updates_are_consecutive(updates: &[AppliedTreeUpdate]) { } pub async fn delete_identities( - identity_manager: &IdentityManager, + database: &Database, batching_tree: &TreeVersion, + next_batch_notify: &Arc, updates: &[AppliedTreeUpdate], prover: &Prover, -) -> anyhow::Result> { +) -> anyhow::Result<()> { // Grab the initial conditions before the updates are applied to the tree. - let pre_root: U256 = batching_tree.get_root().into(); + let pre_root = batching_tree.get_root(); - let mut deletion_indices = updates - .iter() - .map(|f| f.update.leaf_index as u32) - .collect::>(); + let mut deletion_indices: Vec<_> = updates.iter().map(|f| f.update.leaf_index).collect(); - let commitments = - batching_tree.commitments_by_indices(deletion_indices.iter().map(|x| *x as usize)); + let commitments = batching_tree.commitments_by_indices(deletion_indices.iter().copied()); let mut commitments: Vec = commitments.into_iter().map(U256::from).collect(); let latest_tree_from_updates = updates @@ -419,7 +413,7 @@ pub async fn delete_identities( // ensure that our batches match that size. We do this by padding deletion // indices with tree.depth() ^ 2. The deletion prover will skip the proof for // any deletion with an index greater than the max tree depth - let pad_index = 2_u32.pow(latest_tree_from_updates.depth() as u32); + let pad_index = 2_u32.pow(latest_tree_from_updates.depth() as u32) as usize; if commitment_count != batch_size { let padding = batch_size - commitment_count; @@ -442,50 +436,46 @@ pub async fn delete_identities( // With the updates applied we can grab the value of the tree's new root and // build our identities for sending to the identity manager. - let post_root: U256 = latest_tree_from_updates.root().into(); + let post_root = latest_tree_from_updates.root(); let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs); - identity_manager.validate_merkle_proofs(&identity_commitments)?; + tracing::info!(?pre_root, ?post_root, "Submitting deletion batch to DB"); - // We prepare the proof before reserving a slot in the pending identities - let proof = IdentityManager::prepare_deletion_proof( - prover, - pre_root, - deletion_indices.clone(), - identity_commitments, - post_root, - ) - .await?; + // With all the data prepared we can submit the batch to database. + database + .insert_new_batch( + &post_root, + &pre_root, + database::types::BatchType::Deletion, + &identity_commitments, + &deletion_indices, + ) + .await?; - let packed_deletion_indices = pack_indices(&deletion_indices); + tracing::info!(?pre_root, ?post_root, "Deletion batch submitted to DB"); - tracing::info!(?pre_root, ?post_root, "Submitting deletion batch"); + next_batch_notify.notify_one(); - // With all the data prepared we can submit the identities to the on-chain - // identity manager and wait for that transaction to be mined. - let transaction_id = identity_manager - .delete_identities(proof, packed_deletion_indices, pre_root, post_root) - .await - .map_err(|e| { - tracing::error!(?e, "Failed to insert identity to contract."); - e - })?; + TaskMonitor::log_batch_size(updates.len()); - tracing::info!( - ?pre_root, - ?post_root, - ?transaction_id, - "Deletion batch submitted" - ); + batching_tree.apply_updates_up_to(post_root); - // Update the batching tree only after submitting the identities to the chain - batching_tree.apply_updates_up_to(post_root.into()); + Ok(()) +} - tracing::info!(?pre_root, ?post_root, "Tree updated"); +fn determine_batch_type(tree: &TreeVersion) -> Option { + let next_update = tree.peek_next_updates(1); + if next_update.is_empty() { + return None; + } - TaskMonitor::log_batch_size(updates.len()); + let batch_type = if next_update[0].update.element == Hash::ZERO { + BatchType::Deletion + } else { + BatchType::Insertion + }; - Ok(Some(transaction_id)) + Some(batch_type) } fn zip_commitments_and_proofs( @@ -508,18 +498,3 @@ fn zip_commitments_and_proofs( }) .collect() } - -fn determine_batch_type(tree: &TreeVersion) -> Option { - let next_update = tree.peek_next_updates(1); - if next_update.is_empty() { - return None; - } - - let batch_type = if next_update[0].update.element == Hash::ZERO { - BatchType::Deletion - } else { - BatchType::Insertion - }; - - Some(batch_type) -} diff --git a/src/task_monitor/tasks/mod.rs b/src/task_monitor/tasks/mod.rs index c4ffb8f0..c5f0b77e 100644 --- a/src/task_monitor/tasks/mod.rs +++ b/src/task_monitor/tasks/mod.rs @@ -1,5 +1,6 @@ +pub mod create_batches; pub mod delete_identities; pub mod finalize_identities; pub mod insert_identities; pub mod monitor_txs; -pub mod process_identities; +pub mod process_batches; diff --git a/src/task_monitor/tasks/process_batches.rs b/src/task_monitor/tasks/process_batches.rs new file mode 100644 index 00000000..6a4bc411 --- /dev/null +++ b/src/task_monitor/tasks/process_batches.rs @@ -0,0 +1,203 @@ +use std::sync::Arc; +use std::time::Duration; + +use ethers::types::U256; +use tokio::sync::{mpsc, Notify}; +use tokio::{select, time}; +use tracing::instrument; + +use crate::app::App; +use crate::contracts::IdentityManager; +use crate::database::types::{BatchEntry, BatchType}; +use crate::database::DatabaseExt as _; +use crate::ethereum::write::TransactionId; +use crate::prover::Prover; +use crate::utils::index_packing::pack_indices; + +pub async fn process_batches( + app: Arc, + monitored_txs_sender: Arc>, + next_batch_notify: Arc, + wake_up_notify: Arc, +) -> anyhow::Result<()> { + tracing::info!("Awaiting for a clean slate"); + app.identity_manager.await_clean_slate().await?; + + tracing::info!("Starting identity processor."); + + // We start a timer and force it to perform one initial tick to avoid an + // immediate trigger. + let mut timer = time::interval(Duration::from_secs(5)); + + loop { + // We wait either for a timer tick or a full batch + select! { + _ = timer.tick() => { + tracing::info!("Identity batch insertion woken due to timeout"); + } + + () = next_batch_notify.notified() => { + tracing::trace!("Identity batch insertion woken due to next batch creation"); + }, + + () = wake_up_notify.notified() => { + tracing::trace!("Identity batch insertion woken due to request"); + }, + } + + let next_batch = app.database.get_next_batch_without_transaction().await?; + let Some(next_batch) = next_batch else { + continue; + }; + + let tx_id = + commit_identities(&app.identity_manager, &monitored_txs_sender, &next_batch).await?; + + if let Some(tx_id) = tx_id { + app.database + .insert_new_transaction(&tx_id.0, &next_batch.next_root) + .await?; + } + + // We want to check if there's a full batch available immediately + wake_up_notify.notify_one(); + } +} + +async fn commit_identities( + identity_manager: &IdentityManager, + monitored_txs_sender: &mpsc::Sender, + batch: &BatchEntry, +) -> anyhow::Result> { + // If the update is an insertion + let tx_id = if batch.batch_type == BatchType::Insertion { + let prover = identity_manager + .get_suitable_insertion_prover(batch.data.0.identities.len()) + .await?; + + tracing::info!( + num_updates = batch.data.0.identities.len(), + batch_size = prover.batch_size(), + "Insertion batch", + ); + + insert_identities(identity_manager, &prover, batch).await? + } else { + let prover = identity_manager + .get_suitable_deletion_prover(batch.data.0.identities.len()) + .await?; + + tracing::info!( + num_updates = batch.data.0.identities.len(), + batch_size = prover.batch_size(), + "Deletion batch" + ); + + delete_identities(identity_manager, &prover, batch).await? + }; + + if let Some(tx_id) = tx_id.clone() { + monitored_txs_sender.send(tx_id).await?; + } + + Ok(tx_id) +} + +#[instrument(level = "info", skip_all)] +pub async fn insert_identities( + identity_manager: &IdentityManager, + prover: &Prover, + batch: &BatchEntry, +) -> anyhow::Result> { + identity_manager.validate_merkle_proofs(&batch.data.0.identities)?; + let start_index = *batch.data.0.indexes.first().expect("Should exist."); + let pre_root: U256 = batch.prev_root.expect("Should exist.").into(); + let post_root: U256 = batch.next_root.into(); + + // We prepare the proof before reserving a slot in the pending identities + let proof = IdentityManager::prepare_insertion_proof( + prover, + start_index, + pre_root, + &batch.data.0.identities, + post_root, + ) + .await?; + + tracing::info!( + start_index, + ?pre_root, + ?post_root, + "Submitting insertion batch" + ); + + // With all the data prepared we can submit the identities to the on-chain + // identity manager and wait for that transaction to be mined. + let transaction_id = identity_manager + .register_identities( + start_index, + pre_root, + post_root, + batch.data.0.identities.clone(), + proof, + ) + .await + .map_err(|e| { + tracing::error!(?e, "Failed to insert identity to contract."); + e + })?; + + tracing::info!( + start_index, + ?pre_root, + ?post_root, + ?transaction_id, + "Insertion batch submitted" + ); + + Ok(Some(transaction_id)) +} + +pub async fn delete_identities( + identity_manager: &IdentityManager, + prover: &Prover, + batch: &BatchEntry, +) -> anyhow::Result> { + identity_manager.validate_merkle_proofs(&batch.data.0.identities)?; + let pre_root: U256 = batch.prev_root.expect("Should exist.").into(); + let post_root: U256 = batch.next_root.into(); + let deletion_indices: Vec<_> = batch.data.0.indexes.iter().map(|&v| v as u32).collect(); + + // We prepare the proof before reserving a slot in the pending identities + let proof = IdentityManager::prepare_deletion_proof( + prover, + pre_root, + deletion_indices.clone(), + batch.data.0.identities.clone(), + post_root, + ) + .await?; + + let packed_deletion_indices = pack_indices(&deletion_indices); + + tracing::info!(?pre_root, ?post_root, "Submitting deletion batch"); + + // With all the data prepared we can submit the identities to the on-chain + // identity manager and wait for that transaction to be mined. + let transaction_id = identity_manager + .delete_identities(proof, packed_deletion_indices, pre_root, post_root) + .await + .map_err(|e| { + tracing::error!(?e, "Failed to insert identity to contract."); + e + })?; + + tracing::info!( + ?pre_root, + ?post_root, + ?transaction_id, + "Deletion batch submitted" + ); + + Ok(Some(transaction_id)) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 65a9f15c..89e20c25 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -747,7 +747,7 @@ pub async fn spawn_deps<'a, 'b, 'c>( )) } -async fn spawn_db(docker: &Cli) -> anyhow::Result> { +async fn spawn_db(docker: &Cli) -> anyhow::Result { let db_container = postgres_docker_utils::setup(docker).await.unwrap(); Ok(db_container)