From 428daf1e20e6e528382055abd6ae71cfdd9e059e Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Thu, 17 Oct 2024 15:45:56 +0200 Subject: [PATCH] WIP --- Readme.md | 11 +- schemas/database/016_remove_recovery.down.sql | 13 + schemas/database/016_remove_recovery.up.sql | 11 + schemas/openapi.yaml | 29 -- src/app.rs | 100 ++---- src/database/methods.rs | 145 ++------- src/database/mod.rs | 286 +++++++----------- src/database/types.rs | 11 +- src/identity/processor.rs | 185 +++++------ src/identity_tree/initializer.rs | 2 +- src/identity_tree/mod.rs | 44 ++- src/server/mod.rs | 21 +- src/task_monitor/tasks/create_batches.rs | 2 +- src/task_monitor/tasks/delete_identities.rs | 2 +- src/task_monitor/tasks/insert_identities.rs | 84 ++--- tests/common/mod.rs | 54 +--- tests/recover_identities.rs | 211 ------------- tests/unreduced_identity.rs | 18 -- 18 files changed, 346 insertions(+), 883 deletions(-) create mode 100644 schemas/database/016_remove_recovery.down.sql create mode 100644 schemas/database/016_remove_recovery.up.sql delete mode 100644 tests/recover_identities.rs diff --git a/Readme.md b/Readme.md index 4e0ce338..ec0104c6 100644 --- a/Readme.md +++ b/Readme.md @@ -37,17 +37,14 @@ Sequencer has 6 API routes. indeed in the tree. The inclusion proof is then returned to the API caller. 3. `/deleteIdentity` - Takes an identity commitment hash, ensures that it exists and hasn't been deleted yet. This identity is then scheduled for deletion. -4. `/recoverIdentity` - Takes two identity commitment hashes. The first must exist and will be scheduled for deletion - and the other will be inserted as a replacement after the first identity has been deleted and a set amount of time ( - depends on configuration parameters) has passed. -5. `/verifySemaphoreProof` - This call takes root, signal hash, nullifier hash, external nullifier hash and a proof. +4. `/verifySemaphoreProof` - This call takes root, signal hash, nullifier hash, external nullifier hash and a proof. The proving key is fetched based on the depth index, and verification key as well. The list of prime fields is created based on request input mentioned before, and then we proceed to verify the proof. Sequencer uses groth16 zk-SNARK implementation. The API call returns the proof as a response. -6. `/addBatchSize` - Adds a prover with specific batch size to a list of provers. -7. `/removeBatchSize` - Removes the prover based on batch size. -8. `/listBatchSizes` - Lists all provers that are added to the Sequencer. +5. `/addBatchSize` - Adds a prover with specific batch size to a list of provers. +6. `/removeBatchSize` - Removes the prover based on batch size. +7. `/listBatchSizes` - Lists all provers that are added to the Sequencer. 
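For illustration, a minimal Rust sketch of calling the `/insertIdentity` route described above. The crate choices (reqwest, tokio, serde_json), the JSON field name and the local port are assumptions for illustration and are not taken from this document; the commitment value follows the 64-character upper-case hex pattern used for commitments in the API schema.

```rust
// Hedged sketch: field name "identityCommitment" and port 8080 are assumed.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // A 64-character upper-case hex commitment, matching the documented pattern.
    let commitment = "0A".repeat(32);

    let response = client
        .post("http://localhost:8080/insertIdentity")
        .json(&json!({ "identityCommitment": commitment }))
        .send()
        .await?;

    println!("insertIdentity -> {}", response.status());
    Ok(())
}
```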
## Getting Started diff --git a/schemas/database/016_remove_recovery.down.sql b/schemas/database/016_remove_recovery.down.sql new file mode 100644 index 00000000..68221433 --- /dev/null +++ b/schemas/database/016_remove_recovery.down.sql @@ -0,0 +1,13 @@ +CREATE TABLE recoveries ( + existing_commitment BYTEA NOT NULL UNIQUE, + new_commitment BYTEA NOT NULL UNIQUE +); + +ALTER TABLE unprocessed_identities + ADD COLUMN eligibility TIMESTAMPTZ, + ADD COLUMN status VARCHAR(50) NOT NULL, + ADD COLUMN processed_at TIMESTAMPTZ, + ADD COLUMN error_message TEXT; + +ALTER TABLE unprocessed_identities + DROP CONSTRAINT unique_commitment; diff --git a/schemas/database/016_remove_recovery.up.sql b/schemas/database/016_remove_recovery.up.sql new file mode 100644 index 00000000..4d0ea58a --- /dev/null +++ b/schemas/database/016_remove_recovery.up.sql @@ -0,0 +1,11 @@ +DROP TABLE recoveries; + +ALTER TABLE unprocessed_identities + DROP COLUMN eligibility, + DROP COLUMN status, + DROP COLUMN processed_at, + DROP COLUMN error_message; + +ALTER TABLE unprocessed_identities + ADD CONSTRAINT unique_commitment UNIQUE (commitment); + diff --git a/schemas/openapi.yaml b/schemas/openapi.yaml index 43a2f951..67b181c4 100644 --- a/schemas/openapi.yaml +++ b/schemas/openapi.yaml @@ -62,26 +62,6 @@ paths: schema: description: 'Identity could not be queued for deletion' type: 'string' - /recoverIdentity: - post: - summary: 'Queues a recovery request, deleting the previous identity specified and inserting the new one. - New insertions must wait a specified time delay before being included in the merkle tree' - requestBody: - required: true - content: - application/json: - schema: - $ref: '#/components/schemas/RecoveryRequest' - responses: - '202': - description: 'Identity has been successfully queued for recovery' - '400': - description: 'Invalid request' - content: - application/json: - schema: - description: 'Identity could not be queued for recovery' - type: 'string' /inclusionProof: post: summary: 'Get Merkle inclusion proof' @@ -152,15 +132,6 @@ paths: components: schemas: - RecoveryRequest: - type: object - properties: - previousIdentityCommitment: - type: string - pattern: '^[A-F0-9]{64}$' - newIdentityCommitment: - type: string - pattern: '^[A-F0-9]{64}$' IdentityCommitment: type: object properties: diff --git a/src/app.rs b/src/app.rs index 5dc577d8..54b5fbad 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,24 +4,22 @@ use std::sync::{Arc, OnceLock}; use chrono::{Duration, Utc}; use ruint::Uint; use semaphore::protocol::verify_proof; -use sqlx::{Postgres, Transaction}; use tracing::{info, instrument, warn}; use crate::config::Config; use crate::contracts::IdentityManager; use crate::database::methods::DbMethods as _; -use crate::database::Database; +use crate::database::{Database, IsolationLevel}; use crate::ethereum::Ethereum; use crate::identity::processor::{ IdentityProcessor, OffChainIdentityProcessor, OnChainIdentityProcessor, }; use crate::identity::validator::IdentityValidator; use crate::identity_tree::initializer::TreeInitializer; -use crate::identity_tree::{Hash, InclusionProof, RootItem, TreeState, TreeVersionReadOps}; +use crate::identity_tree::{Hash, InclusionProof, RootItem, TreeState, TreeVersionOps}; use crate::prover::map::initialize_prover_maps; use crate::prover::repository::ProverRepository; use crate::prover::{ProverConfig, ProverType}; -use crate::retry_tx; use crate::server::data::{ InclusionProofResponse, ListBatchSizesResponse, VerifySemaphoreProofQuery, VerifySemaphoreProofRequest, 
VerifySemaphoreProofResponse, @@ -157,22 +155,19 @@ impl App { // TODO: ensure that the id is not in the tree or in unprocessed identities - if self.database.identity_exists(commitment).await? { + let mut tx = self + .database + .begin_tx(IsolationLevel::ReadCommitted) + .await?; + + if tx.identity_exists(commitment).await? { return Err(ServerError::DuplicateCommitment); } - self.database - .insert_new_identity(commitment, Utc::now()) - .await?; + tx.insert_unprocessed_identity(commitment, Utc::now()).await?; - Ok(()) - } + tx.commit().await?; - pub async fn delete_identity_tx(&self, commitment: &Hash) -> Result<(), ServerError> { - retry_tx!(self.database.pool, tx, { - self.delete_identity(&mut tx, commitment).await - }) - .await?; Ok(()) } @@ -182,12 +177,13 @@ impl App { /// /// Will return `Err` if identity is already queued, not in the tree, or the /// queue malfunctions. - #[instrument(level = "debug", skip(self, tx))] - pub async fn delete_identity( - &self, - tx: &mut Transaction<'_, Postgres>, - commitment: &Hash, - ) -> Result<(), ServerError> { + #[instrument(level = "debug", skip(self))] + pub async fn delete_identity(&self, commitment: &Hash) -> Result<(), ServerError> { + let mut tx = self + .database + .begin_tx(IsolationLevel::RepeatableRead) + .await?; + // Ensure that deletion provers exist if !self.prover_repository.has_deletion_provers().await { warn!( @@ -213,11 +209,6 @@ impl App { return Err(ServerError::IdentityAlreadyDeleted); } - // Check if the id is already queued for deletion - if tx.identity_is_queued_for_deletion(commitment).await? { - return Err(ServerError::IdentityQueuedForDeletion); - } - // Check if there are any deletions, if not, set the latest deletion timestamp // to now to ensure that the new deletion is processed by the next deletion // interval @@ -225,64 +216,11 @@ impl App { tx.update_latest_deletion(Utc::now()).await?; } - // If the id has not been deleted, insert into the deletions table tx.insert_new_deletion(leaf_index, commitment).await?; - Ok(()) - } - - /// Queues a recovery of an identity. - /// - /// i.e. deletion and reinsertion after a set period of time. - /// - /// # Errors - /// - /// Will return `Err` if identity is already queued for deletion, not in the - /// tree, or the queue malfunctions. - #[instrument(level = "debug", skip(self))] - pub async fn recover_identity( - &self, - existing_commitment: &Hash, - new_commitment: &Hash, - ) -> Result<(), ServerError> { - retry_tx!(self.database.pool, tx, { - if self.identity_validator.is_initial_leaf(new_commitment) { - warn!( - ?new_commitment, - "Attempt to insert initial leaf in recovery." - ); - return Err(ServerError::InvalidCommitment); - } + tx.commit().await?; - if !self.prover_repository.has_insertion_provers().await { - warn!( - ?new_commitment, - "Identity Manager has no provers. Add provers with /addBatchSize request." - ); - return Err(ServerError::NoProversOnIdInsert); - } - - if !self.identity_validator.is_reduced(*new_commitment) { - warn!( - ?new_commitment, - "The new identity commitment is not reduced." - ); - return Err(ServerError::UnreducedCommitment); - } - - if tx.identity_exists(*new_commitment).await? 
{ - return Err(ServerError::DuplicateCommitment); - } - - // Delete the existing id and insert the commitments into the recovery table - self.delete_identity(&mut tx, existing_commitment).await?; - - tx.insert_new_recovery(existing_commitment, new_commitment) - .await?; - - Ok(()) - }) - .await + Ok(()) } fn merge_env_provers( diff --git a/src/database/methods.rs b/src/database/methods.rs index e51c2073..d4f0d8c1 100644 --- a/src/database/methods.rs +++ b/src/database/methods.rs @@ -475,50 +475,23 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized { } #[instrument(skip(self), level = "debug")] - async fn insert_new_identity( - self, - identity: Hash, - eligibility_timestamp: sqlx::types::chrono::DateTime, - ) -> Result { + async fn insert_unprocessed_identity(self, identity: Hash) -> Result { let mut conn = self.acquire().await?; sqlx::query( r#" - INSERT INTO unprocessed_identities (commitment, status, created_at, eligibility) - VALUES ($1, $2, CURRENT_TIMESTAMP, $3) + INSERT INTO unprocessed_identities (commitment, created_at) + VALUES ($1, CURRENT_TIMESTAMP) + ON CONFLICT DO NOTHING "#, ) .bind(identity) - .bind(<&str>::from(UnprocessedStatus::New)) - .bind(eligibility_timestamp) .execute(&mut *conn) .await?; Ok(identity) } - #[instrument(skip(self), level = "debug")] - async fn insert_new_recovery( - self, - existing_commitment: &Hash, - new_commitment: &Hash, - ) -> Result<(), Error> { - let mut conn = self.acquire().await?; - - sqlx::query( - r#" - INSERT INTO recoveries (existing_commitment, new_commitment) - VALUES ($1, $2) - "#, - ) - .bind(existing_commitment) - .bind(new_commitment) - .execute(&mut *conn) - .await?; - - Ok(()) - } - #[instrument(skip(self), level = "debug")] async fn get_latest_deletion(self) -> Result { let mut conn = self.acquire().await?; @@ -580,47 +553,9 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized { Ok(()) } - #[cfg(test)] - #[instrument(skip(self), level = "debug")] - async fn get_all_recoveries(self) -> Result, Error> { - let mut conn = self.acquire().await?; - - Ok( - sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries") - .fetch_all(&mut *conn) - .await?, - ) - } - - #[instrument(skip(self, prev_commits), level = "debug")] - async fn delete_recoveries(self, prev_commits: I) -> Result, Error> - where - I: IntoIterator + Send, - T: Into, - { - let mut conn = self.acquire().await?; - - // TODO: upstream PgHasArrayType impl to ruint - let prev_commits = prev_commits - .into_iter() - .map(|c| c.into().to_be_bytes()) - .collect::>(); - - let res = sqlx::query_as::<_, RecoveryEntry>( - r#" - DELETE - FROM recoveries - WHERE existing_commitment = ANY($1) - RETURNING * - "#, - ) - .bind(&prev_commits) - .fetch_all(&mut *conn) - .await?; - - Ok(res) - } - + /// Inserts a new deletion into the deletions table + /// + /// This method is idempotent and on conflict nothing will happen #[instrument(skip(self), level = "debug")] async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> { let mut conn = self.acquire().await?; @@ -629,6 +564,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized { r#" INSERT INTO deletions (leaf_index, commitment) VALUES ($1, $2) + ON CONFLICT DO NOTHING "#, ) .bind(leaf_index as i64) @@ -682,74 +618,49 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized { } #[instrument(skip(self), level = "debug")] - async fn get_eligible_unprocessed_commitments( - self, - status: UnprocessedStatus, - ) -> Result, Error> { + async fn 
get_unprocessed_commitments(self) -> Result, Error> { let mut conn = self.acquire().await?; - let result = sqlx::query( + let result: Vec<(Hash,)> = sqlx::query_as( r#" - SELECT * FROM unprocessed_identities - WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility + SELECT commitment FROM unprocessed_identities LIMIT $2 "#, ) - .bind(<&str>::from(status)) .bind(MAX_UNPROCESSED_FETCH_COUNT) .fetch_all(&mut *conn) .await?; - Ok(result - .into_iter() - .map(|row| types::UnprocessedCommitment { - commitment: row.get::(0), - status, - created_at: row.get::<_, _>(2), - processed_at: row.get::<_, _>(3), - error_message: row.get::<_, _>(4), - eligibility_timestamp: row.get::<_, _>(5), - }) - .collect::>()) + Ok(result.into_iter().map(|(commitment,)| commitment).collect()) } - /// Returns the error message from the unprocessed identities table - /// if it exists - /// - /// - The outer option represents the existence of the commitment in the - /// unprocessed_identities table - /// - The inner option represents the existence of an error message #[instrument(skip(self), level = "debug")] - async fn get_unprocessed_error( - self, - commitment: &Hash, - ) -> Result>, Error> { + async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> { let mut conn = self.acquire().await?; - let result: Option<(Option,)> = sqlx::query_as( + sqlx::query( r#" - SELECT error_message - FROM unprocessed_identities - WHERE commitment = $1 + DELETE FROM unprocessed_identities WHERE commitment = $1 "#, ) .bind(commitment) - .fetch_optional(&mut *conn) + .execute(&mut *conn) .await?; - Ok(result.map(|(error_message,)| error_message)) + Ok(()) } #[instrument(skip(self), level = "debug")] - async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> { + async fn trim_unprocessed(self) -> Result<(), Error> { let mut conn = self.acquire().await?; sqlx::query( r#" - DELETE FROM unprocessed_identities WHERE commitment = $1 + DELETE FROM unprocessed_identities u + USING identities i + WHERE u.commitment = i.commitment "#, ) - .bind(commitment) .execute(&mut *conn) .await?; @@ -773,20 +684,6 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized { .get::(0)) } - // TODO: add docs - #[instrument(skip(self), level = "debug")] - async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result { - let mut conn = self.acquire().await?; - - let row_unprocessed = - sqlx::query(r#"SELECT exists(SELECT 1 FROM deletions where commitment = $1)"#) - .bind(commitment) - .fetch_one(&mut *conn) - .await?; - - Ok(row_unprocessed.get::(0)) - } - #[instrument(skip(self), level = "debug")] async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> { let mut conn = self.acquire().await?; diff --git a/src/database/mod.rs b/src/database/mod.rs index ed503b9e..aae12d24 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -10,7 +10,7 @@ use std::ops::Deref; use anyhow::{anyhow, Context, Error as ErrReport}; use sqlx::migrate::{Migrate, MigrateDatabase, Migrator}; use sqlx::pool::PoolOptions; -use sqlx::{Executor, Pool, Postgres, Row}; +use sqlx::{Executor, Pool, Postgres, Row, Transaction}; use thiserror::Error; use tracing::{error, info, instrument, warn}; @@ -27,6 +27,17 @@ pub struct Database { pub pool: Pool, } +/// Transaction isolation level +/// +/// PG docs: https://www.postgresql.org/docs/current/transaction-iso.html +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum IsolationLevel { + ReadUncommited, + ReadCommitted, + RepeatableRead, + 
Serializable, +} + impl Deref for Database { type Target = Pool; @@ -49,13 +60,6 @@ impl Database { // Create a connection pool let pool = PoolOptions::::new() .max_connections(config.max_connections) - .after_connect(|conn, _| { - Box::pin(async move { - conn.execute("SET DEFAULT_TRANSACTION_ISOLATION TO 'SERIALIZABLE'") - .await?; - Ok(()) - }) - }) .connect(config.database.expose()) .await .context("error connecting to database")?; @@ -132,6 +136,34 @@ impl Database { Ok(Self { pool }) } + + pub async fn begin_tx( + &self, + isolation_level: IsolationLevel, + ) -> Result, Error> { + let mut tx = self.begin().await?; + + match isolation_level { + IsolationLevel::ReadUncommited => { + tx.execute("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED") + .await?; + } + IsolationLevel::ReadCommitted => { + tx.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED") + .await?; + } + IsolationLevel::RepeatableRead => { + tx.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ") + .await?; + } + IsolationLevel::Serializable => { + tx.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") + .await?; + } + } + + Ok(tx) + } } #[derive(Debug, Error)] @@ -249,24 +281,11 @@ mod test { .expect("cant convert to u256") .into(); - let eligibility_timestamp = Utc::now(); - - let hash = db - .insert_new_identity(commit_hash, eligibility_timestamp) - .await?; + let hash = db.insert_unprocessed_identity(commit_hash).await?; assert_eq!(commit_hash, hash); - let commit = db - .get_unprocessed_error(&commit_hash) - .await - .expect("expected commitment status"); - assert_eq!(commit, Some(None)); - - let identity_count = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) - .await? - .len(); + let identity_count = db.get_unprocessed_commitments().await?.len(); assert_eq!(identity_count, 1); @@ -275,6 +294,41 @@ mod test { Ok(()) } + #[tokio::test] + async fn trim_unprocessed_identities() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + + let identities = mock_identities(10); + let roots = mock_roots(11); + + let eligibility_timestamp = Utc::now(); + + for identity in &identities { + db.insert_unprocessed_identity(*identity).await?; + } + + assert_eq!( + db.count_unprocessed_identities().await? 
as usize, + identities.len() + ); + + for (idx, identity) in identities.iter().copied().enumerate() { + println!("idx = {idx}"); + println!("roots[idx] = {}", roots[idx]); + println!("roots[idx + 1] = {}", roots[idx + 1]); + + db.insert_pending_identity(idx, &identity, &roots[idx + 1], &roots[idx]) + .await?; + } + + db.trim_unprocessed().await?; + + assert_eq!(db.count_unprocessed_identities().await?, 0); + + Ok(()) + } + #[tokio::test] async fn insert_and_delete_identity() -> anyhow::Result<()> { let docker = Cli::default(); @@ -322,7 +376,7 @@ mod test { } #[tokio::test] - async fn test_insert_prover_configuration() -> anyhow::Result<()> { + async fn insert_prover_configuration() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; @@ -365,7 +419,7 @@ mod test { } #[tokio::test] - async fn test_insert_provers() -> anyhow::Result<()> { + async fn insert_provers() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let mock_provers = mock_provers(); @@ -379,7 +433,7 @@ mod test { } #[tokio::test] - async fn test_remove_prover() -> anyhow::Result<()> { + async fn remove_prover() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let mock_provers = mock_provers(); @@ -397,27 +451,7 @@ mod test { } #[tokio::test] - async fn test_insert_new_recovery() -> anyhow::Result<()> { - let docker = Cli::default(); - let (db, _db_container) = setup_db(&docker).await?; - - let existing_commitment: Uint<256, 4> = Uint::from(1); - let new_commitment: Uint<256, 4> = Uint::from(2); - - db.insert_new_recovery(&existing_commitment, &new_commitment) - .await?; - - let recoveries = db.get_all_recoveries().await?; - - assert_eq!(recoveries.len(), 1); - assert_eq!(recoveries[0].existing_commitment, existing_commitment); - assert_eq!(recoveries[0].new_commitment, new_commitment); - - Ok(()) - } - - #[tokio::test] - async fn test_insert_new_deletion() -> anyhow::Result<()> { + async fn insert_new_deletion() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let existing_commitment: Uint<256, 4> = Uint::from(1); @@ -433,132 +467,57 @@ mod test { } #[tokio::test] - async fn test_get_eligible_unprocessed_commitments() -> anyhow::Result<()> { + async fn get_eligible_unprocessed_commitments() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let commitment_0: Uint<256, 4> = Uint::from(1); let eligibility_timestamp_0 = Utc::now(); - db.insert_new_identity(commitment_0, eligibility_timestamp_0) - .await?; + db.insert_unprocessed_identity(commitment_0).await?; let commitment_1: Uint<256, 4> = Uint::from(2); let eligibility_timestamp_1 = Utc::now() .checked_add_days(Days::new(7)) .expect("Could not create eligibility timestamp"); - db.insert_new_identity(commitment_1, eligibility_timestamp_1) - .await?; + db.insert_unprocessed_identity(commitment_1).await?; - let unprocessed_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) - .await?; + let unprocessed_commitments = db.get_unprocessed_commitments().await?; assert_eq!(unprocessed_commitments.len(), 1); - assert_eq!(unprocessed_commitments[0].commitment, commitment_0); - assert!( - unprocessed_commitments[0].eligibility_timestamp.timestamp() - - eligibility_timestamp_0.timestamp() - <= 1 - ); + assert_eq!(unprocessed_commitments[0], commitment_0); Ok(()) } #[tokio::test] - async fn 
test_get_unprocessed_commitments() -> anyhow::Result<()> { + async fn get_unprocessed_commitments() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; // Insert new identity with a valid eligibility timestamp let commitment_0: Uint<256, 4> = Uint::from(1); let eligibility_timestamp_0 = Utc::now(); - db.insert_new_identity(commitment_0, eligibility_timestamp_0) - .await?; + db.insert_unprocessed_identity(commitment_0).await?; // Insert new identity with eligibility timestamp in the future let commitment_1: Uint<256, 4> = Uint::from(2); let eligibility_timestamp_1 = Utc::now() .checked_add_days(Days::new(7)) .expect("Could not create eligibility timestamp"); - db.insert_new_identity(commitment_1, eligibility_timestamp_1) - .await?; + db.insert_unprocessed_identity(commitment_1).await?; - let unprocessed_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) - .await?; + let unprocessed_commitments = db.get_unprocessed_commitments().await?; // Assert unprocessed commitments against expected values assert_eq!(unprocessed_commitments.len(), 1); - assert_eq!(unprocessed_commitments[0].commitment, commitment_0); - assert_eq!( - unprocessed_commitments[0].eligibility_timestamp.timestamp(), - eligibility_timestamp_0.timestamp() - ); - - Ok(()) - } - - #[tokio::test] - async fn test_identity_is_queued_for_deletion() -> anyhow::Result<()> { - let docker = Cli::default(); - let (db, _db_container) = setup_db(&docker).await?; - let existing_commitment: Uint<256, 4> = Uint::from(1); - - db.insert_new_deletion(0, &existing_commitment).await?; - - assert!( - db.identity_is_queued_for_deletion(&existing_commitment) - .await? - ); - - Ok(()) - } - - #[tokio::test] - async fn test_update_eligibility_timestamp() -> anyhow::Result<()> { - let docker = Cli::default(); - let (db, _db_container) = setup_db(&docker).await?; - let dec = "1234500000000000000"; - let commit_hash: Hash = U256::from_dec_str(dec) - .expect("cant convert to u256") - .into(); - - // Set eligibility to Utc::now() day and check db entries - let eligibility_timestamp = Utc::now(); - db.insert_new_identity(commit_hash, eligibility_timestamp) - .await?; - - let commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) - .await?; - assert_eq!(commitments.len(), 1); - - let eligible_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) - .await?; - assert_eq!(eligible_commitments.len(), 1); - - // Set eligibility to Utc::now() + 7 days and check db entries - let eligibility_timestamp = Utc::now() - .checked_add_days(Days::new(7)) - .expect("Could not create eligibility timestamp"); - - // Insert new identity with an eligibility timestamp in the future - let commit_hash: Hash = Hash::from(1); - db.insert_new_identity(commit_hash, eligibility_timestamp) - .await?; - - let eligible_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) - .await?; - assert_eq!(eligible_commitments.len(), 1); + assert_eq!(unprocessed_commitments[0], commitment_0); Ok(()) } #[tokio::test] - async fn test_insert_deletion() -> anyhow::Result<()> { + async fn insert_deletion() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let identities = mock_identities(3); @@ -574,47 +533,6 @@ mod test { Ok(()) } - #[tokio::test] - async fn test_insert_recovery() -> anyhow::Result<()> { - let docker = Cli::default(); - let (db, _db_container) = setup_db(&docker).await?; - - 
let old_identities = mock_identities(3); - let new_identities = mock_identities(3); - - for (old, new) in old_identities.into_iter().zip(new_identities) { - db.insert_new_recovery(&old, &new).await?; - } - - let recoveries = db.get_all_recoveries().await?; - assert_eq!(recoveries.len(), 3); - - Ok(()) - } - - #[tokio::test] - async fn test_delete_recoveries() -> anyhow::Result<()> { - let docker = Cli::default(); - let (db, _db_container) = setup_db(&docker).await?; - - let old_identities = mock_identities(3); - let new_identities = mock_identities(3); - - for (old, new) in old_identities.clone().into_iter().zip(new_identities) { - db.insert_new_recovery(&old, &new).await?; - } - - let deleted_recoveries = db - .delete_recoveries(old_identities[0..2].iter().cloned()) - .await?; - assert_eq!(deleted_recoveries.len(), 2); - - let remaining = db.get_all_recoveries().await?; - assert_eq!(remaining.len(), 1); - - Ok(()) - } - #[tokio::test] async fn get_last_leaf_index() -> anyhow::Result<()> { let docker = Cli::default(); @@ -1020,7 +938,7 @@ mod test { } #[tokio::test] - async fn test_root_invalidation() -> anyhow::Result<()> { + async fn root_invalidation() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; @@ -1112,7 +1030,7 @@ mod test { // When there's only unprocessed identity let eligibility_timestamp = Utc::now(); - db.insert_new_identity(identities[0], eligibility_timestamp) + db.insert_unprocessed_identity(identities[0], eligibility_timestamp) .await .context("Inserting new identity")?; assert!(db.identity_exists(identities[0]).await?); @@ -1128,7 +1046,7 @@ mod test { } #[tokio::test] - async fn test_remove_deletions() -> anyhow::Result<()> { + async fn remove_deletions() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; @@ -1160,7 +1078,7 @@ mod test { } #[tokio::test] - async fn test_latest_insertion() -> anyhow::Result<()> { + async fn latest_insertion() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; @@ -1188,7 +1106,7 @@ mod test { } #[tokio::test] - async fn test_latest_deletion() -> anyhow::Result<()> { + async fn latest_deletion() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; @@ -1237,7 +1155,7 @@ mod test { } #[tokio::test] - async fn test_insert_batch() -> anyhow::Result<()> { + async fn insert_batch() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let identities: Vec<_> = mock_identities(10) @@ -1265,7 +1183,7 @@ mod test { } #[tokio::test] - async fn test_get_next_batch() -> anyhow::Result<()> { + async fn get_next_batch() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let identities: Vec<_> = mock_identities(10) @@ -1309,7 +1227,7 @@ mod test { } #[tokio::test] - async fn test_get_next_batch_without_transaction() -> anyhow::Result<()> { + async fn get_next_batch_without_transaction() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let identities: Vec<_> = mock_identities(10) @@ -1357,7 +1275,7 @@ mod test { } #[tokio::test] - async fn test_get_batch_head() -> anyhow::Result<()> { + async fn get_batch_head() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let roots = mock_roots(1); @@ -1388,7 +1306,7 @@ mod test { } 
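    // Illustrative sketch (an assumption, not code from this patch): the
    // begin_tx(IsolationLevel) helper added earlier replaces the old
    // SERIALIZABLE-by-default pool setting and the retry_tx! macro. Callers
    // pick an isolation level explicitly, run DbMethods calls on the
    // transaction, then commit.
    #[tokio::test]
    async fn begin_tx_usage_sketch() -> anyhow::Result<()> {
        let docker = Cli::default();
        let (db, _db_container) = setup_db(&docker).await?;

        let commitment: Uint<256, 4> = Uint::from(1);

        // Read committed is sufficient here: the insert is idempotent thanks
        // to ON CONFLICT DO NOTHING, so concurrent writers simply no-op.
        let mut tx = db.begin_tx(IsolationLevel::ReadCommitted).await?;
        tx.insert_unprocessed_identity(commitment).await?;
        tx.commit().await?;

        assert_eq!(db.get_unprocessed_commitments().await?.len(), 1);

        Ok(())
    }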
#[tokio::test] - async fn test_insert_transaction() -> anyhow::Result<()> { + async fn insert_transaction() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; let roots = mock_roots(1); diff --git a/src/database/types.rs b/src/database/types.rs index c53d6b93..cfca25a7 100644 --- a/src/database/types.rs +++ b/src/database/types.rs @@ -5,18 +5,9 @@ use sqlx::error::BoxDynError; use sqlx::prelude::FromRow; use sqlx::{Database, Decode, Encode, Postgres, Type}; -use crate::identity_tree::{Hash, UnprocessedStatus}; +use crate::identity_tree::Hash; use crate::prover::identity::Identity; -pub struct UnprocessedCommitment { - pub commitment: Hash, - pub status: UnprocessedStatus, - pub created_at: DateTime, - pub processed_at: Option>, - pub error_message: Option, - pub eligibility_timestamp: DateTime, -} - #[derive(FromRow)] pub struct RecoveryEntry { // existing commitment is used in tests only, but recoveries in general diff --git a/src/identity/processor.rs b/src/identity/processor.rs index 21618db8..2d8821ef 100644 --- a/src/identity/processor.rs +++ b/src/identity/processor.rs @@ -18,13 +18,12 @@ use crate::contracts::scanner::BlockScanner; use crate::contracts::IdentityManager; use crate::database::methods::DbMethods; use crate::database::types::{BatchEntry, BatchType}; -use crate::database::{Database, Error}; +use crate::database::{Database, IsolationLevel}; use crate::ethereum::{Ethereum, ReadProvider}; use crate::identity_tree::{Canonical, Hash, TreeVersion, TreeWithNextVersion}; use crate::prover::identity::Identity; use crate::prover::repository::ProverRepository; use crate::prover::Prover; -use crate::retry_tx; use crate::utils::index_packing::pack_indices; pub type TransactionId = String; @@ -135,31 +134,32 @@ impl IdentityProcessor for OnChainIdentityProcessor { let root_hash = self.identity_manager.latest_root().await?; let root_hash = root_hash.into(); + // it's enough to run with read committed here + // since in the worst case another instance of the sequencer + // will try to do the same thing but with a later root + // in such a case the state will be corrected later in the program + let mut tx = self + .database + .begin_tx(IsolationLevel::ReadCommitted) + .await?; + // We don't store the initial root in the database, so we have to skip this step // if the contract root hash is equal to initial root hash if root_hash != *initial_root_hash { // Note that we don't have a way of queuing a root here for // finalization. so it's going to stay as "processed" // until the next root is mined. self.database. 
- retry_tx!(self.database.pool, tx, { - tx.mark_root_as_processed(&root_hash).await?; - tx.delete_batches_after_root(&root_hash).await?; - - Result::<(), Error>::Ok(()) - }) - .await?; + tx.mark_root_as_processed(&root_hash).await?; + tx.delete_batches_after_root(&root_hash).await?; // TODO: We probably shouldn't do this in HA } else { // Db is either empty or we're restarting with a new contract/chain // so we should mark everything as pending - retry_tx!(self.database.pool, tx, { - tx.mark_all_as_pending().await?; - tx.delete_all_batches().await?; - - Result::<(), Error>::Ok(()) - }) - .await?; + tx.mark_all_as_pending().await?; + tx.delete_all_batches().await?; // TODO: We probably shouldn't do this in HA } + tx.commit().await?; + Ok(()) } } @@ -425,12 +425,12 @@ impl OnChainIdentityProcessor { continue; } - retry_tx!( - self.database.pool, - tx, - tx.mark_root_as_processed(&post_root.into()).await - ) - .await?; + let mut tx = self + .database + .begin_tx(IsolationLevel::ReadCommitted) + .await?; + tx.mark_root_as_processed(&post_root.into()).await?; + tx.commit().await?; info!(?pre_root, ?post_root, ?kind, "Batch mined"); @@ -464,12 +464,12 @@ impl OnChainIdentityProcessor { continue; } - retry_tx!( - self.database.pool, - tx, - tx.mark_root_as_mined(&root.into()).await - ) - .await?; + let mut tx = self + .database + .begin_tx(IsolationLevel::ReadCommitted) + .await?; + tx.mark_root_as_processed(&root.into()).await?; + tx.commit().await?; info!(?root, "Root finalized"); } @@ -515,48 +515,52 @@ impl OnChainIdentityProcessor { log: &Log, max_epoch_duration: Duration, ) -> anyhow::Result<()> { - retry_tx!(self.database.pool, tx, { - let tx_hash = log.transaction_hash.context("Missing tx hash")?; - let commitments = self - .identity_manager - .fetch_deletion_indices_from_tx(tx_hash) - .await - .context("Could not fetch deletion indices from tx")?; + let tx_hash = log.transaction_hash.context("Missing tx hash")?; + let commitments = self + .identity_manager + .fetch_deletion_indices_from_tx(tx_hash) + .await + .context("Could not fetch deletion indices from tx")?; - let commitments = self - .database - .get_non_zero_commitments_by_leaf_indexes(commitments.iter().copied()) - .await?; - let commitments: Vec = commitments.into_iter().map(Into::into).collect(); + let commitments = self + .database + .get_non_zero_commitments_by_leaf_indexes(commitments.iter().copied()) + .await?; + let commitments: Vec = commitments.into_iter().map(Into::into).collect(); - // Fetch the root history expiry time on chain - let root_history_expiry = self.identity_manager.root_history_expiry().await?; + // Fetch the root history expiry time on chain + let root_history_expiry = self.identity_manager.root_history_expiry().await?; - // Use the root history expiry to calculate the eligibility timestamp for the - // new insertion - let root_history_expiry_duration = - chrono::Duration::seconds(root_history_expiry.as_u64() as i64); - let max_epoch_duration = chrono::Duration::from_std(max_epoch_duration)?; + // Use the root history expiry to calculate the eligibility timestamp for the + // new insertion + let root_history_expiry_duration = + chrono::Duration::seconds(root_history_expiry.as_u64() as i64); + let max_epoch_duration = chrono::Duration::from_std(max_epoch_duration)?; - let delay = root_history_expiry_duration + max_epoch_duration; + let delay = root_history_expiry_duration + max_epoch_duration; - let eligibility_timestamp = Utc::now() + delay; + let eligibility_timestamp = Utc::now() + delay; - // Check if 
any deleted commitments correspond with entries in the - // recoveries table and insert the new commitment into the unprocessed - // identities table with the proper eligibility timestamp - let deleted_recoveries = tx.delete_recoveries(commitments).await?; + let mut tx = self + .database + .begin_tx(IsolationLevel::ReadCommitted) + .await?; - // For each deletion, if there is a corresponding recovery, insert a new - // identity with the specified eligibility timestamp - for recovery in deleted_recoveries { - tx.insert_new_identity(recovery.new_commitment, eligibility_timestamp) - .await?; - } + // Check if any deleted commitments correspond with entries in the + // recoveries table and insert the new commitment into the unprocessed + // identities table with the proper eligibility timestamp + let deleted_recoveries = tx.delete_recoveries(commitments).await?; - Result::<_, anyhow::Error>::Ok(()) - }) - .await + // For each deletion, if there is a corresponding recovery, insert a new + // identity with the specified eligibility timestamp + for recovery in deleted_recoveries { + tx.insert_unprocessed_identity(recovery.new_commitment, eligibility_timestamp) + .await?; + } + + tx.commit().await?; + + Ok(()) } } @@ -588,15 +592,15 @@ impl IdentityProcessor for OffChainIdentityProcessor { self.update_eligible_recoveries(batch).await?; } - retry_tx!(self.database.pool, tx, { - // With current flow it is required to mark root as processed first as this is - // how required mined_at field is set - tx.mark_root_as_processed(&batch.next_root).await?; - tx.mark_root_as_mined(&batch.next_root).await?; + let mut tx = self + .database + .begin_tx(IsolationLevel::ReadCommitted) + .await?; - Result::<_, anyhow::Error>::Ok(()) - }) - .await?; + tx.mark_root_as_processed(&batch.next_root).await?; + tx.mark_root_as_mined(&batch.next_root).await?; + + tx.commit().await?; processed_tree.apply_updates_up_to(batch.next_root); } @@ -636,25 +640,28 @@ impl OffChainIdentityProcessor { } async fn update_eligible_recoveries(&self, batch: &BatchEntry) -> anyhow::Result<()> { - retry_tx!(self.database.pool, tx, { - let commitments: Vec = - batch.data.identities.iter().map(|v| v.commitment).collect(); - let eligibility_timestamp = Utc::now(); - - // Check if any deleted commitments correspond with entries in the - // recoveries table and insert the new commitment into the unprocessed - // identities table with the proper eligibility timestamp - let deleted_recoveries = tx.delete_recoveries(commitments).await?; - - // For each deletion, if there is a corresponding recovery, insert a new - // identity with the specified eligibility timestamp - for recovery in deleted_recoveries { - tx.insert_new_identity(recovery.new_commitment, eligibility_timestamp) - .await?; - } + let commitments: Vec = batch.data.identities.iter().map(|v| v.commitment).collect(); + let eligibility_timestamp = Utc::now(); - Result::<_, anyhow::Error>::Ok(()) - }) - .await + let mut tx = self + .database + .begin_tx(IsolationLevel::ReadCommitted) + .await?; + + // Check if any deleted commitments correspond with entries in the + // recoveries table and insert the new commitment into the unprocessed + // identities table with the proper eligibility timestamp + let deleted_recoveries = tx.delete_recoveries(commitments).await?; + + // For each deletion, if there is a corresponding recovery, insert a new + // identity with the specified eligibility timestamp + for recovery in deleted_recoveries { + tx.insert_unprocessed_identity(recovery.new_commitment, 
eligibility_timestamp) + .await?; + } + + tx.commit().await?; + + Ok(()) } } diff --git a/src/identity_tree/initializer.rs b/src/identity_tree/initializer.rs index 3200af82..08b22915 100644 --- a/src/identity_tree/initializer.rs +++ b/src/identity_tree/initializer.rs @@ -9,7 +9,7 @@ use crate::database::methods::DbMethods; use crate::database::Database; use crate::identity::processor::IdentityProcessor; use crate::identity_tree::{ - CanonicalTreeBuilder, Hash, ProcessedStatus, TreeState, TreeUpdate, TreeVersionReadOps, + CanonicalTreeBuilder, Hash, ProcessedStatus, TreeState, TreeUpdate, TreeVersionOps, TreeWithNextVersion, }; use crate::utils::tree_updates::dedup_tree_updates; diff --git a/src/identity_tree/mod.rs b/src/identity_tree/mod.rs index cd8bed97..03c6ed00 100644 --- a/src/identity_tree/mod.rs +++ b/src/identity_tree/mod.rs @@ -105,10 +105,14 @@ struct TreeVersionData { } /// Basic operations that should be available for all tree versions. -trait BasicTreeOps { +pub trait BasicTreeOps { /// Updates the tree with the given element at the given leaf index. fn update(&mut self, leaf_index: usize, element: Hash); + fn next_leaf(&self) -> usize; + fn proof(&self, leaf_index: usize) -> (Hash, Proof); + fn root(&self) -> Hash; + fn apply_diffs(&mut self, diffs: Vec); /// Notifies the tree that it was changed and can perform garbage @@ -217,6 +221,19 @@ impl BasicTreeOps for TreeVersionData { self.metadata.count_since_last_flatten += 1; } + fn next_leaf(&self) -> usize { + self.next_leaf + } + + fn proof(&self, leaf_index: usize) -> (Hash, Proof) { + let proof = self.tree.proof(leaf_index); + (self.tree.root(), proof) + } + + fn root(&self) -> Hash { + self.tree.root() + } + fn apply_diffs(&mut self, diffs: Vec) { for applied_update in &diffs { let update = &applied_update.update; @@ -277,6 +294,19 @@ impl BasicTreeOps for TreeVersionData { }); } + fn next_leaf(&self) -> usize { + self.next_leaf + } + + fn proof(&self, leaf_index: usize) -> (Hash, Proof) { + let proof = self.tree.proof(leaf_index); + (self.tree.root(), proof) + } + + fn root(&self) -> Hash { + self.tree.root() + } + fn apply_diffs(&mut self, mut diffs: Vec) { let last = diffs.last().cloned(); @@ -368,7 +398,9 @@ impl> TreeVersion { /// The public-facing API for reading from a tree version. It is implemented for /// all versions. This being a trait allows us to hide some of the /// implementation details. -pub trait TreeVersionReadOps { +pub trait TreeVersionOps { + fn update(&self, leaf_index: usize, element: Hash); + /// Returns the current tree root. fn get_root(&self) -> Hash; /// Returns the next free leaf. 
@@ -381,10 +413,14 @@ pub trait TreeVersionReadOps { fn get_leaf(&self, leaf: usize) -> Hash; } -impl TreeVersionReadOps for TreeVersion +impl TreeVersionOps for TreeVersion where TreeVersionData: BasicTreeOps, { + fn update(&self, leaf_index: usize, element: Hash) { + self.get_data().update(leaf_index, element); + } + fn get_root(&self) -> Hash { self.get_data().get_root() } @@ -441,6 +477,8 @@ impl TreeVersion { output } + // pub fn append(&self, identity: Hash) + /// Deletes many identities from the tree, returns a list with the root /// and proof of inclusion #[must_use] diff --git a/src/server/mod.rs b/src/server/mod.rs index cf11ffc8..eae84816 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -24,9 +24,8 @@ pub mod data; use self::data::{ AddBatchSizeRequest, DeletionRequest, InclusionProofRequest, InclusionProofResponse, - InsertCommitmentRequest, ListBatchSizesResponse, RecoveryRequest, RemoveBatchSizeRequest, - ToResponseCode, VerifySemaphoreProofQuery, VerifySemaphoreProofRequest, - VerifySemaphoreProofResponse, + InsertCommitmentRequest, ListBatchSizesResponse, RemoveBatchSizeRequest, ToResponseCode, + VerifySemaphoreProofQuery, VerifySemaphoreProofRequest, VerifySemaphoreProofResponse, }; async fn inclusion_proof( @@ -84,20 +83,7 @@ async fn delete_identity( State(app): State>, Json(req): Json, ) -> Result<(), Error> { - app.delete_identity_tx(&req.identity_commitment).await?; - Ok(()) -} - -async fn recover_identity( - State(app): State>, - Json(req): Json, -) -> Result<(), Error> { - app.recover_identity( - &req.previous_identity_commitment, - &req.new_identity_commitment, - ) - .await?; - + app.delete_identity(&req.identity_commitment).await?; Ok(()) } @@ -174,7 +160,6 @@ pub async fn bind_from_listener( .route("/inclusionProof", post(inclusion_proof)) .route("/insertIdentity", post(insert_identity)) .route("/deleteIdentity", post(delete_identity)) - .route("/recoverIdentity", post(recover_identity)) // Operate on batch sizes .route("/addBatchSize", post(add_batch_size)) .route("/removeBatchSize", post(remove_batch_size)) diff --git a/src/task_monitor/tasks/create_batches.rs b/src/task_monitor/tasks/create_batches.rs index 4674f2b6..ef009c2c 100644 --- a/src/task_monitor/tasks/create_batches.rs +++ b/src/task_monitor/tasks/create_batches.rs @@ -15,7 +15,7 @@ use crate::database; use crate::database::methods::DbMethods as _; use crate::database::Database; use crate::identity_tree::{ - AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion, + AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionOps, TreeWithNextVersion, }; use crate::prover::identity::Identity; use crate::prover::repository::ProverRepository; diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index 7db83a75..a1992078 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -11,7 +11,7 @@ use tracing::info; use crate::app::App; use crate::database::methods::DbMethods; use crate::database::types::DeletionEntry; -use crate::identity_tree::{Hash, TreeVersionReadOps}; +use crate::identity_tree::{Hash, TreeVersionOps}; // Deletion here differs from insert_identites task. This is because two // different flows are created for both tasks. 
Due to how our prover works diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index fafa275e..9674b84b 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -1,16 +1,14 @@ use std::sync::Arc; use std::time::Duration; -use sqlx::{Postgres, Transaction}; use tokio::sync::{Mutex, Notify}; use tokio::time; use tracing::info; use crate::app::App; use crate::database::methods::DbMethods as _; -use crate::database::types::UnprocessedCommitment; -use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus}; -use crate::retry_tx; +use crate::database::IsolationLevel; +use crate::identity_tree::{TreeVersionOps, UnprocessedStatus}; // Insertion here differs from delete_identities task. This is because two // different flows are created for both tasks. We need to insert identities as @@ -33,8 +31,9 @@ pub async fn insert_identities( // get commits from database let unprocessed = app .database - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) + .get_unprocessed_commitments(UnprocessedStatus::New) .await?; + if unprocessed.is_empty() { continue; } @@ -42,62 +41,39 @@ pub async fn insert_identities( let _guard = pending_insertions_mutex.lock().await; let latest_tree = app.tree_state()?.latest_tree(); - retry_tx!(&app.database, tx, { - insert_identities_batch(&mut tx, latest_tree, &unprocessed).await - }) - .await?; + let mut tx = app.database.begin_tx(IsolationLevel::ReadCommitted).await?; - // Notify the identity processing task, that there are new identities - wake_up_notify.notify_one(); - } -} + let next_leaf = latest_tree.next_leaf(); + let mut pre_root = latest_tree.get_root(); -pub async fn insert_identities_batch( - tx: &mut Transaction<'_, Postgres>, - latest_tree: &TreeVersion, - identities: &[UnprocessedCommitment], -) -> Result<(), anyhow::Error> { - // Filter out any identities that are already in the `identities` table - let mut filtered_identities = vec![]; - for identity in identities { - if tx - .get_identity_leaf_index(&identity.commitment) - .await? - .is_some() - { - tracing::warn!(?identity.commitment, "Duplicate identity"); - tx.remove_unprocessed_identity(&identity.commitment).await?; - } else { - filtered_identities.push(identity.commitment); - } - } + let next_db_index = tx.get_next_leaf_index().await?; + assert_eq!( + next_leaf, next_db_index, + "Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: {next_db_index}" + ); - let next_leaf = latest_tree.next_leaf(); + for (idx, identity) in unprocessed.iter().enumerate() { + let leaf_idx = next_leaf + idx; + latest_tree.update(leaf_idx, *identity); + let root = latest_tree.get_root(); - let next_db_index = tx.get_next_leaf_index().await?; + tx.insert_pending_identity(leaf_idx, identity, &root, &pre_root) + .await + .expect("Failed to insert identity - tree will be out of sync"); - assert_eq!( - next_leaf, next_db_index, - "Database and tree are out of sync. 
Next leaf index in tree is: {next_leaf}, in database: \ - {next_db_index}" - ); - - let mut pre_root = &latest_tree.get_root(); - let data = latest_tree.append_many(&filtered_identities); + pre_root = root; + } - assert_eq!( - data.len(), - filtered_identities.len(), - "Length mismatch when appending identities to tree" - ); + tx.trim_unprocessed().await?; - for ((root, _proof, leaf_index), identity) in data.iter().zip(&filtered_identities) { - tx.insert_pending_identity(*leaf_index, identity, root, pre_root) - .await?; - pre_root = root; + // TODO: This works only while we're not operating in an HA context + // when HA is introduced we need to increase the tx serialization level + // otherwise we'll face too many crashes + tx.commit() + .await + .expect("Committing insert failed - tree will be out of sync"); - tx.remove_unprocessed_identity(identity).await?; + // Notify the identity processing task, that there are new identities + wake_up_notify.notify_one(); } - - Ok(()) } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index e44f6dc6..18835722 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -40,7 +40,7 @@ pub mod prelude { AppConfig, Config, DatabaseConfig, OzDefenderConfig, ProvidersConfig, RelayerConfig, ServerConfig, TreeConfig, TxSitterConfig, }; - pub use signup_sequencer::identity_tree::{Hash, TreeVersionReadOps}; + pub use signup_sequencer::identity_tree::{Hash, TreeVersionOps}; pub use signup_sequencer::prover::ProverType; pub use signup_sequencer::server; pub use signup_sequencer::shutdown::Shutdown; @@ -76,7 +76,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use reqwest::{Body, Client, Method, Request, RequestBuilder, StatusCode}; use semaphore::poseidon_tree::Proof; -use signup_sequencer::identity_tree::{InclusionProof, TreeState, TreeVersionReadOps}; +use signup_sequencer::identity_tree::{InclusionProof, TreeState, TreeVersionOps}; use signup_sequencer::server::data::{ AddBatchSizeRequest, DeletionRequest, InclusionProofRequest, InclusionProofResponse, InsertCommitmentRequest, RecoveryRequest, RemoveBatchSizeRequest, VerifySemaphoreProofRequest, @@ -479,56 +479,6 @@ pub async fn test_delete_identity( (ref_tree.proof(leaf_index).unwrap(), ref_tree.root()) } -#[instrument(skip_all)] -pub async fn test_recover_identity( - uri: &str, - client: &Client, - ref_tree: &mut PoseidonTree, - test_leaves: &[Field], - previous_leaf_index: usize, - new_leaf: Field, - new_leaf_index: usize, - expect_failure: bool, -) -> (merkle_tree::Proof, Field) { - let previous_leaf = test_leaves[previous_leaf_index]; - - let body = construct_recover_identity_body(&previous_leaf, &new_leaf); - - let response = client - .post(uri.to_owned() + "/recoverIdentity") - .header("Content-Type", "application/json") - .body(body) - .send() - .await - .expect("Failed to create insert identity"); - - let response_status = response.status(); - - let bytes = response - .bytes() - .await - .expect("Failed to get response bytes"); - - if expect_failure { - assert!(!response_status.is_success()); - } else { - assert!(response_status.is_success()); - assert!(bytes.is_empty()); - } - - // TODO: Note that recovery order is non-deterministic and therefore we cannot - // easily keep the ref_tree in sync with the sequencer's version of the - // tree. In the future, we could consider tracking updates to the tree in a - // different way like listening to event emission. 
- ref_tree.set(previous_leaf_index, Hash::ZERO); - // Continuing on the note above, while the replacement identity is be - // inserted as a new identity, it is not deterministic and if there are multiple - // recovery requests, it is possible that the sequencer tree is ordered in a - // different way than the ref_tree - ref_tree.set(new_leaf_index, new_leaf); - (ref_tree.proof(new_leaf_index).unwrap(), ref_tree.root()) -} - #[instrument(skip_all)] pub async fn test_add_batch_size( uri: impl Into, diff --git a/tests/recover_identities.rs b/tests/recover_identities.rs deleted file mode 100644 index c492af4e..00000000 --- a/tests/recover_identities.rs +++ /dev/null @@ -1,211 +0,0 @@ -#![allow(clippy::needless_range_loop)] - -mod common; - -use common::prelude::*; - -use crate::common::{test_in_tree, test_not_in_tree, test_recover_identity}; - -const IDLE_TIME: u64 = 7; - -#[tokio::test] -async fn recover_identities_onchain() -> anyhow::Result<()> { - recover_identities(false).await -} - -async fn recover_identities(offchain_mode_enabled: bool) -> anyhow::Result<()> { - // Initialize logging for the test. - init_tracing_subscriber(); - info!("Starting integration test"); - - let insertion_batch_size: usize = 8; - let deletion_batch_size: usize = 3; - - let mut ref_tree = PoseidonTree::new(DEFAULT_TREE_DEPTH + 1, ruint::Uint::ZERO); - let initial_root: U256 = ref_tree.root().into(); - - let docker = Cli::default(); - let (mock_chain, db_container, insertion_prover_map, deletion_prover_map, micro_oz) = - spawn_deps( - initial_root, - &[insertion_batch_size], - &[deletion_batch_size], - DEFAULT_TREE_DEPTH as u8, - &docker, - ) - .await?; - - // Set the root history expirty to 15 seconds - let updated_root_history_expiry = U256::from(30); - mock_chain - .identity_manager - .set_root_history_expiry(updated_root_history_expiry) - .send() - .await? - .await?; - - let mock_insertion_prover = &insertion_prover_map[&insertion_batch_size]; - let mock_deletion_prover = &deletion_prover_map[&deletion_batch_size]; - - let db_socket_addr = db_container.address(); - let db_url = format!("postgres://postgres:postgres@{db_socket_addr}/database"); - - let temp_dir = tempfile::tempdir()?; - info!( - "temp dir created at: {:?}", - temp_dir.path().join("testfile") - ); - - let config = TestConfigBuilder::new() - .db_url(&db_url) - .oz_api_url(µ_oz.endpoint()) - .oz_address(micro_oz.address()) - .min_batch_deletion_size(deletion_batch_size) - .identity_manager_address(mock_chain.identity_manager.address()) - .primary_network_provider(mock_chain.anvil.endpoint()) - .cache_file(temp_dir.path().join("testfile").to_str().unwrap()) - .add_prover(mock_insertion_prover) - .add_prover(mock_deletion_prover) - .offchain_mode(offchain_mode_enabled) - .build()?; - - let (_, app_handle, local_addr, shutdown) = - spawn_app(config).await.expect("Failed to spawn app."); - - let test_identities = generate_test_identities(insertion_batch_size * 3); - let identities_ref: Vec = test_identities - .iter() - .map(|i| Hash::from_str_radix(i, 16).unwrap()) - .collect(); - - let uri = "http://".to_owned() + &local_addr.to_string(); - let client = Client::new(); - - let mut next_leaf_index = 0; - // Insert enough identities to trigger an batch to be sent to the blockchain. 
- for _ in 0..insertion_batch_size { - test_insert_identity( - &uri, - &client, - &mut ref_tree, - &identities_ref, - next_leaf_index, - ) - .await; - - next_leaf_index += 1; - } - - tokio::time::sleep(Duration::from_secs(IDLE_TIME)).await; - // Check that we can also get these inclusion proofs back. - for i in 0..insertion_batch_size { - test_inclusion_proof( - &mock_chain, - &uri, - &client, - i, - &ref_tree, - &identities_ref[i], - false, - offchain_mode_enabled, - ) - .await; - } - - // Test that we cannot recover with an identity that has previously been - // inserted - test_recover_identity( - &uri, - &client, - &mut ref_tree, - &identities_ref, - // Last inserted identity - insertion_batch_size - 1, - // Second to last inserted identity as recovery - identities_ref[insertion_batch_size - 2], - next_leaf_index, - true, - ) - .await; - - // Insert enough recoveries to trigger a batch - for i in 0..deletion_batch_size { - // Delete the identity at i and replace it with an identity at the back of the - // test identities array - // TODO: we should update to a much cleaner approach - - test_recover_identity( - &uri, - &client, - &mut ref_tree, - &identities_ref, - i, - identities_ref[next_leaf_index], - next_leaf_index, - false, - ) - .await; - - next_leaf_index += 1; - } - - tokio::time::sleep(Duration::from_secs(IDLE_TIME * 3)).await; - - // Ensure that identities have been deleted - for i in 0..deletion_batch_size { - test_inclusion_proof( - &mock_chain, - &uri, - &client, - i, - &ref_tree, - &identities_ref[i], - true, - offchain_mode_enabled, - ) - .await; - - // Check that the replacement identity has not been inserted yet - let recovery_leaf_index = insertion_batch_size + i; - test_not_in_tree(&uri, &client, &identities_ref[recovery_leaf_index]).await; - } - - // Sleep for root expiry - tokio::time::sleep(Duration::from_secs(updated_root_history_expiry.as_u64())).await; - - // Insert enough identities to trigger a batch to be sent to the blockchain. 
- for _ in 0..insertion_batch_size { - test_insert_identity( - &uri, - &client, - &mut ref_tree, - &identities_ref, - next_leaf_index, - ) - .await; - next_leaf_index += 1; - } - - tokio::time::sleep(Duration::from_secs(IDLE_TIME * 8)).await; - - // Check that the replacement identities have been inserted - for i in 0..deletion_batch_size { - let recovery_leaf_index = insertion_batch_size + i; - - // Check that the replacement identity has a mined status after an insertion - // batch has completed - test_in_tree(&uri, &client, &identities_ref[recovery_leaf_index]).await; - } - - // Shutdown the app properly for the final time - shutdown.shutdown(); - app_handle.await.unwrap(); - for (_, prover) in insertion_prover_map.into_iter() { - prover.stop(); - } - for (_, prover) in deletion_prover_map.into_iter() { - prover.stop(); - } - - Ok(()) -} diff --git a/tests/unreduced_identity.rs b/tests/unreduced_identity.rs index 7303e6b6..5f58e4eb 100644 --- a/tests/unreduced_identity.rs +++ b/tests/unreduced_identity.rs @@ -74,24 +74,6 @@ async fn test_unreduced_identity(offchain_mode_enabled: bool) -> anyhow::Result< body_str ); - // Test unreduced identity for recovery - let body = common::construct_recover_identity_body(&Hash::ZERO, &ruint::Uint::<256, 4>::MAX); - let response = client - .post(uri.to_owned() + "/recoverIdentity") - .header("Content-Type", "application/json") - .body(body) - .send() - .await - .expect("Failed to create insert identity"); - - let bytes = response.bytes().await.expect("Failed to read body bytes"); - let body_str = String::from_utf8_lossy(&bytes); - - assert_eq!( - "provided identity commitment is not in reduced form", - body_str - ); - shutdown.shutdown(); app_handle.await?; for (_, prover) in insertion_prover_map.into_iter() {