diff --git a/Cargo.toml b/Cargo.toml index 97f2aee3..22c6946a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,7 +93,7 @@ semaphore = { git = "https://github.com/worldcoin/semaphore-rs", rev = "5170c422 similar-asserts = "1.5.0" test-case = "3.0" testcontainers = "0.15.0" -testcontainers-modules = { version = "0.3.7", features = ["postgres"] } +testcontainers-modules = { version = "0.3.7", features = ["postgres"] } tracing-subscriber = "0.3.11" tracing-test = "0.2" diff --git a/src/app.rs b/src/app.rs index be1fcd15..6dcec2c6 100644 --- a/src/app.rs +++ b/src/app.rs @@ -11,7 +11,8 @@ use tracing::{info, instrument, warn}; use crate::config::Config; use crate::contracts::{IdentityManager, SharedIdentityManager}; -use crate::database::{Database, DatabaseExt as _}; +use crate::database::query::DatabaseQuery as _; +use crate::database::Database; use crate::ethereum::Ethereum; use crate::identity_tree::{ CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState, @@ -25,6 +26,7 @@ use crate::server::data::{ VerifySemaphoreProofRequest, VerifySemaphoreProofResponse, }; use crate::server::error::Error as ServerError; +use crate::utils::retry_tx; use crate::utils::tree_updates::dedup_tree_updates; pub struct App { @@ -111,7 +113,7 @@ impl App { if root_hash != initial_root_hash { // Note that we don't have a way of queuing a root here for finalization. // so it's going to stay as "processed" until the next root is mined. - self.database.mark_root_as_processed(&root_hash).await?; + self.database.mark_root_as_processed_tx(&root_hash).await?; } else { // Db is either empty or we're restarting with a new contract/chain // so we should mark everything as pending @@ -401,10 +403,11 @@ impl App { Ok(()) } - pub async fn delete_identity(&self, commitment: &Hash) -> Result<(), ServerError> { - let mut tx = self.database.begin().await?; - self.delete_identity_tx(&mut tx, commitment).await?; - tx.commit().await?; + pub async fn delete_identity_tx(&self, commitment: &Hash) -> Result<(), ServerError> { + retry_tx!(self.database.pool, tx, { + self.delete_identity(&mut tx, commitment).await + }) + .await?; Ok(()) } @@ -415,7 +418,7 @@ impl App { /// Will return `Err` if identity is already queued, not in the tree, or the /// queue malfunctions. #[instrument(level = "debug", skip(self, tx))] - pub async fn delete_identity_tx( + pub async fn delete_identity( &self, tx: &mut Transaction<'_, Postgres>, commitment: &Hash, @@ -477,46 +480,44 @@ impl App { existing_commitment: &Hash, new_commitment: &Hash, ) -> Result<(), ServerError> { - if *new_commitment == self.identity_manager.initial_leaf_value() { - warn!( - ?new_commitment, - "Attempt to insert initial leaf in recovery." - ); - return Err(ServerError::InvalidCommitment); - } - - if !self.identity_manager.has_insertion_provers().await { - warn!( - ?new_commitment, - "Identity Manager has no provers. Add provers with /addBatchSize request." - ); - return Err(ServerError::NoProversOnIdInsert); - } - - if !self.identity_is_reduced(*new_commitment) { - warn!( - ?new_commitment, - "The new identity commitment is not reduced." - ); - return Err(ServerError::UnreducedCommitment); - } + retry_tx!(self.database.pool, tx, { + if *new_commitment == self.identity_manager.initial_leaf_value() { + warn!( + ?new_commitment, + "Attempt to insert initial leaf in recovery." 
+ ); + return Err(ServerError::InvalidCommitment); + } - let mut tx = self.database.begin().await?; + if !self.identity_manager.has_insertion_provers().await { + warn!( + ?new_commitment, + "Identity Manager has no provers. Add provers with /addBatchSize request." + ); + return Err(ServerError::NoProversOnIdInsert); + } - if tx.identity_exists(*new_commitment).await? { - return Err(ServerError::DuplicateCommitment); - } + if !self.identity_is_reduced(*new_commitment) { + warn!( + ?new_commitment, + "The new identity commitment is not reduced." + ); + return Err(ServerError::UnreducedCommitment); + } - // Delete the existing id and insert the commitments into the recovery table - self.delete_identity_tx(&mut tx, existing_commitment) - .await?; + if tx.identity_exists(*new_commitment).await? { + return Err(ServerError::DuplicateCommitment); + } - tx.insert_new_recovery(existing_commitment, new_commitment) - .await?; + // Delete the existing id and insert the commitments into the recovery table + self.delete_identity(&mut tx, existing_commitment).await?; - tx.commit().await?; + tx.insert_new_recovery(existing_commitment, new_commitment) + .await?; - Ok(()) + Ok(()) + }) + .await } pub async fn identity_history( diff --git a/src/database/mod.rs b/src/database/mod.rs index 81cf9a9c..946d4531 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -5,32 +5,26 @@ )] use std::cmp::Ordering; -use std::collections::HashSet; use std::ops::Deref; use anyhow::{anyhow, Context, Error as ErrReport}; -use chrono::{DateTime, Utc}; -use ruint::aliases::U256; use sqlx::migrate::{Migrate, MigrateDatabase, Migrator}; use sqlx::pool::PoolOptions; use sqlx::{Executor, Pool, Postgres, Row}; use thiserror::Error; use tracing::{error, info, instrument, warn}; -use self::types::{CommitmentHistoryEntry, DeletionEntry, LatestDeletionEntry, RecoveryEntry}; use crate::config::DatabaseConfig; -use crate::identity_tree::{ - Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus, -}; -use crate::prover::{ProverConfig, ProverType}; +use crate::database::query::DatabaseQuery; +use crate::identity_tree::Hash; +pub mod query; +pub mod transaction; pub mod types; // Statically link in migration files static MIGRATOR: Migrator = sqlx::migrate!("schemas/database"); -const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000; - pub struct Database { pub pool: Pool, } @@ -43,7 +37,7 @@ impl Deref for Database { } } -impl<'a, T> DatabaseExt<'a> for T where T: Executor<'a, Database = Postgres> {} +impl<'a, T> DatabaseQuery<'a> for T where T: Executor<'a, Database = Postgres> {} impl Database { #[instrument(skip_all)] @@ -142,704 +136,6 @@ impl Database { Ok(Self { pool }) } - - /// Marks the identities and roots from before a given root hash as mined - /// Also marks following roots as pending - #[instrument(skip(self), level = "debug")] - pub async fn mark_root_as_processed(&self, root: &Hash) -> Result<(), Error> { - let mut tx = self.pool.begin().await?; - - let root_id = tx.get_id_by_root(root).await?; - - let Some(root_id) = root_id else { - return Err(Error::MissingRoot { root: *root }); - }; - - let root_id = root_id as i64; - // TODO: Can I get rid of line `AND status <> $2 - let update_previous_roots = sqlx::query( - r#" - UPDATE identities - SET status = $2, mined_at = CURRENT_TIMESTAMP - WHERE id <= $1 - AND status <> $2 - AND status <> $3; - "#, - ) - .bind(root_id) - .bind(<&str>::from(ProcessedStatus::Processed)) - .bind(<&str>::from(ProcessedStatus::Mined)); - - let update_next_roots = sqlx::query( - r#" - 
UPDATE identities - SET status = $2, mined_at = NULL - WHERE id > $1 - "#, - ) - .bind(root_id) - .bind(<&str>::from(ProcessedStatus::Pending)); - - tx.execute(update_previous_roots).await?; - tx.execute(update_next_roots).await?; - - tx.commit().await?; - - Ok(()) - } - - /// Marks the identities and roots from before a given root hash as - /// finalized - #[instrument(skip(self), level = "debug")] - pub async fn mark_root_as_mined(&self, root: &Hash) -> Result<(), Error> { - let mined_status = ProcessedStatus::Mined; - - let mut tx = self.pool.begin().await?; - tx.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;") - .await?; - - let root_id = tx.get_id_by_root(root).await?; - - let Some(root_id) = root_id else { - return Err(Error::MissingRoot { root: *root }); - }; - - let root_id = root_id as i64; - - let update_previous_roots = sqlx::query( - r#" - UPDATE identities - SET status = $2 - WHERE id <= $1 - AND status <> $2 - "#, - ) - .bind(root_id) - .bind(<&str>::from(mined_status)); - - tx.execute(update_previous_roots).await?; - - tx.commit().await?; - - Ok(()) - } - - pub async fn get_identity_history_entries( - &self, - commitment: &Hash, - ) -> Result, Error> { - let unprocessed = sqlx::query( - r#" - SELECT commitment, status, eligibility - FROM unprocessed_identities - WHERE commitment = $1 - "#, - ) - .bind(commitment); - - let rows = self.pool.fetch_all(unprocessed).await?; - let unprocessed_updates = rows - .into_iter() - .map(|row| { - let eligibility_timestamp: DateTime = row.get(2); - let held_back = Utc::now() < eligibility_timestamp; - - CommitmentHistoryEntry { - leaf_index: None, - commitment: row.get::(0), - held_back, - status: row - .get::<&str, _>(1) - .parse() - .expect("Failed to parse unprocessed status"), - } - }) - .collect::>(); - - let leaf_index = self.get_identity_leaf_index(commitment).await?; - let Some(leaf_index) = leaf_index else { - return Ok(unprocessed_updates); - }; - - let identity_deletions = sqlx::query( - r#" - SELECT commitment - FROM deletions - WHERE leaf_index = $1 - "#, - ) - .bind(leaf_index.leaf_index as i64); - - let rows = self.pool.fetch_all(identity_deletions).await?; - let deletions = rows - .into_iter() - .map(|_row| CommitmentHistoryEntry { - leaf_index: Some(leaf_index.leaf_index), - commitment: Hash::ZERO, - held_back: false, - status: UnprocessedStatus::New.into(), - }) - .collect::>(); - - let processed_updates = sqlx::query( - r#" - SELECT commitment, status - FROM identities - WHERE leaf_index = $1 - ORDER BY id ASC - "#, - ) - .bind(leaf_index.leaf_index as i64); - - let rows = self.pool.fetch_all(processed_updates).await?; - let processed_updates: Vec = rows - .into_iter() - .map(|row| CommitmentHistoryEntry { - leaf_index: Some(leaf_index.leaf_index), - commitment: row.get::(0), - held_back: false, - status: row - .get::<&str, _>(1) - .parse() - .expect("Status is unreadable, database is corrupt"), - }) - .collect(); - - Ok([processed_updates, unprocessed_updates, deletions] - .concat() - .into_iter() - .collect()) - } -} - -/// This trait provides the individual and composable queries to the database. -/// Each method is a single atomic query, and can be composed withing a -/// transaction. 
-pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { - async fn insert_pending_identity( - self, - leaf_index: usize, - identity: &Hash, - root: &Hash, - ) -> Result<(), Error> { - let insert_pending_identity_query = sqlx::query( - r#" - INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of) - VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP) - "#, - ) - .bind(leaf_index as i64) - .bind(identity) - .bind(root) - .bind(<&str>::from(ProcessedStatus::Pending)); - - self.execute(insert_pending_identity_query).await?; - - Ok(()) - } - - async fn get_id_by_root(self, root: &Hash) -> Result, Error> { - let root_index_query = sqlx::query( - r#" - SELECT id - FROM identities - WHERE root = $1 - ORDER BY id ASC - LIMIT 1 - "#, - ) - .bind(root); - - let row = self.fetch_optional(root_index_query).await?; - - let Some(row) = row else { return Ok(None) }; - let root_id = row.get::(0); - - Ok(Some(root_id as usize)) - } - - /// Marks all the identities in the db as - #[instrument(skip(self), level = "debug")] - async fn mark_all_as_pending(self) -> Result<(), Error> { - let pending_status = ProcessedStatus::Pending; - - let update_all_identities = sqlx::query( - r#" - UPDATE identities - SET status = $1, mined_at = NULL - WHERE status <> $1 - "#, - ) - .bind(<&str>::from(pending_status)); - - self.execute(update_all_identities).await?; - - Ok(()) - } - - async fn get_next_leaf_index(self) -> Result { - let query = sqlx::query( - r#" - SELECT leaf_index FROM identities - ORDER BY leaf_index DESC - LIMIT 1 - "#, - ); - - let row = self.fetch_optional(query).await?; - - let Some(row) = row else { return Ok(0) }; - let leaf_index = row.get::(0); - - Ok((leaf_index + 1) as usize) - } - - async fn get_identity_leaf_index(self, identity: &Hash) -> Result, Error> { - let query = sqlx::query( - r#" - SELECT leaf_index, status - FROM identities - WHERE commitment = $1 - ORDER BY id DESC - LIMIT 1; - "#, - ) - .bind(identity); - - let Some(row) = self.fetch_optional(query).await? else { - return Ok(None); - }; - - let leaf_index = row.get::(0) as usize; - - let status = row - .get::<&str, _>(1) - .parse() - .expect("Status is unreadable, database is corrupt"); - - Ok(Some(TreeItem { status, leaf_index })) - } - - async fn get_commitments_by_status( - self, - status: ProcessedStatus, - ) -> Result, Error> { - Ok(sqlx::query_as::<_, TreeUpdate>( - r#" - SELECT leaf_index, commitment as element - FROM identities - WHERE status = $1 - ORDER BY id ASC; - "#, - ) - .bind(<&str>::from(status)) - .fetch_all(self) - .await?) - } - - async fn get_latest_root_by_status( - self, - status: ProcessedStatus, - ) -> Result, Error> { - Ok(sqlx::query( - r#" - SELECT root FROM identities WHERE status = $1 ORDER BY id DESC LIMIT 1 - "#, - ) - .bind(<&str>::from(status)) - .fetch_optional(self) - .await? - .map(|r| r.get::(0))) - } - - async fn get_root_state(self, root: &Hash) -> Result, Error> { - // This tries really hard to do everything in one query to prevent race - // conditions. - Ok(sqlx::query_as::<_, RootItem>( - r#" - SELECT - root, - status, - pending_as_of as pending_valid_as_of, - mined_at as mined_valid_as_of - FROM identities - WHERE root = $1 - ORDER BY id - LIMIT 1 - "#, - ) - .bind(root) - .fetch_optional(self) - .await?) 
- } - - async fn get_latest_insertion_timestamp(self) -> Result>, Error> { - let query = sqlx::query( - r#" - SELECT insertion_timestamp - FROM latest_insertion_timestamp - WHERE Lock = 'X';"#, - ); - - let row = self.fetch_optional(query).await?; - - Ok(row.map(|r| r.get::, _>(0))) - } - - async fn count_unprocessed_identities(self) -> Result { - let query = sqlx::query( - r#" - SELECT COUNT(*) as unprocessed - FROM unprocessed_identities - "#, - ); - let result = self.fetch_one(query).await?; - Ok(result.get::(0) as i32) - } - - async fn count_pending_identities(self) -> Result { - let query = sqlx::query( - r#" - SELECT COUNT(*) as pending - FROM identities - WHERE status = $1 - "#, - ) - .bind(<&str>::from(ProcessedStatus::Pending)); - let result = self.fetch_one(query).await?; - Ok(result.get::(0) as i32) - } - - async fn get_provers(self) -> Result, Error> { - Ok(sqlx::query_as( - r#" - SELECT batch_size, url, timeout_s, prover_type - FROM provers - "#, - ) - .fetch_all(self) - .await? - .into_iter() - .collect()) - } - async fn insert_prover_configuration( - self, - batch_size: usize, - url: impl ToString, - timeout_seconds: u64, - prover_type: ProverType, - ) -> Result<(), Error> { - let url = url.to_string(); - - let query = sqlx::query( - r#" - INSERT INTO provers (batch_size, url, timeout_s, prover_type) - VALUES ($1, $2, $3, $4) - "#, - ) - .bind(batch_size as i64) - .bind(url) - .bind(timeout_seconds as i64) - .bind(prover_type); - - self.execute(query).await?; - - Ok(()) - } - - async fn insert_provers(self, provers: HashSet) -> Result<(), Error> { - if provers.is_empty() { - return Ok(()); - } - - let mut query_builder = sqlx::QueryBuilder::new( - r#" - INSERT INTO provers (batch_size, url, timeout_s, prover_type) - "#, - ); - - query_builder.push_values(provers, |mut b, prover| { - b.push_bind(prover.batch_size as i64) - .push_bind(prover.url) - .push_bind(prover.timeout_s as i64) - .push_bind(prover.prover_type); - }); - - let query = query_builder.build(); - - self.execute(query).await?; - Ok(()) - } - - async fn remove_prover(self, batch_size: usize, prover_type: ProverType) -> Result<(), Error> { - let query = sqlx::query( - r#" - DELETE FROM provers WHERE batch_size = $1 AND prover_type = $2 - "#, - ) - .bind(batch_size as i64) - .bind(prover_type); - - self.execute(query).await?; - - Ok(()) - } - - async fn insert_new_identity( - self, - identity: Hash, - eligibility_timestamp: sqlx::types::chrono::DateTime, - ) -> Result { - let query = sqlx::query( - r#" - INSERT INTO unprocessed_identities (commitment, status, created_at, eligibility) - VALUES ($1, $2, CURRENT_TIMESTAMP, $3) - "#, - ) - .bind(identity) - .bind(<&str>::from(UnprocessedStatus::New)) - .bind(eligibility_timestamp); - - self.execute(query).await?; - Ok(identity) - } - - async fn insert_new_recovery( - self, - existing_commitment: &Hash, - new_commitment: &Hash, - ) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO recoveries (existing_commitment, new_commitment) - VALUES ($1, $2) - "#, - ) - .bind(existing_commitment) - .bind(new_commitment); - self.execute(query).await?; - Ok(()) - } - - async fn get_latest_deletion(self) -> Result { - let query = - sqlx::query("SELECT deletion_timestamp FROM latest_deletion_root WHERE Lock = 'X';"); - - let row = self.fetch_optional(query).await?; - - if let Some(row) = row { - Ok(LatestDeletionEntry { - timestamp: row.get(0), - }) - } else { - Ok(LatestDeletionEntry { - timestamp: Utc::now(), - }) - } - } - - async fn 
update_latest_insertion_timestamp( - self, - insertion_timestamp: DateTime, - ) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO latest_insertion_timestamp (Lock, insertion_timestamp) - VALUES ('X', $1) - ON CONFLICT (Lock) - DO UPDATE SET insertion_timestamp = EXCLUDED.insertion_timestamp; - "#, - ) - .bind(insertion_timestamp); - - self.execute(query).await?; - Ok(()) - } - - async fn update_latest_deletion(self, deletion_timestamp: DateTime) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO latest_deletion_root (Lock, deletion_timestamp) - VALUES ('X', $1) - ON CONFLICT (Lock) - DO UPDATE SET deletion_timestamp = EXCLUDED.deletion_timestamp; - "#, - ) - .bind(deletion_timestamp); - - self.execute(query).await?; - Ok(()) - } - - async fn get_all_recoveries(self) -> Result, Error> { - Ok( - sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries") - .fetch_all(self) - .await?, - ) - } - - async fn delete_recoveries, T: Into>( - self, - prev_commits: I, - ) -> Result, Error> { - // TODO: upstream PgHasArrayType impl to ruint - let prev_commits = prev_commits - .into_iter() - .map(|c| c.into().to_be_bytes()) - .collect::>(); - - let res = sqlx::query_as::<_, RecoveryEntry>( - r#" - DELETE - FROM recoveries - WHERE existing_commitment = ANY($1) - RETURNING * - "#, - ) - .bind(&prev_commits) - .fetch_all(self) - .await?; - - Ok(res) - } - - async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO deletions (leaf_index, commitment) - VALUES ($1, $2) - "#, - ) - .bind(leaf_index as i64) - .bind(identity); - - self.execute(query).await?; - Ok(()) - } - - // TODO: consider using a larger value than i64 for leaf index, ruint should - // have postgres compatibility for u256 - async fn get_deletions(self) -> Result, Error> { - let query = sqlx::query( - r#" - SELECT * - FROM deletions - "#, - ); - - let result = self.fetch_all(query).await?; - - Ok(result - .into_iter() - .map(|row| DeletionEntry { - leaf_index: row.get::(0) as usize, - commitment: row.get::(1), - }) - .collect::>()) - } - - /// Remove a list of entries from the deletions table - async fn remove_deletions(self, commitments: &[Hash]) -> Result<(), Error> { - let commitments = commitments - .iter() - .map(|c| c.to_be_bytes()) - .collect::>(); - - sqlx::query("DELETE FROM deletions WHERE commitment = Any($1)") - .bind(commitments) - .execute(self) - .await?; - - Ok(()) - } - - async fn get_eligible_unprocessed_commitments( - self, - status: UnprocessedStatus, - ) -> Result, Error> { - let query = sqlx::query( - r#" - SELECT * FROM unprocessed_identities - WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility - LIMIT $2 - "#, - ) - .bind(<&str>::from(status)) - .bind(MAX_UNPROCESSED_FETCH_COUNT); - - let result = self.fetch_all(query).await?; - - Ok(result - .into_iter() - .map(|row| types::UnprocessedCommitment { - commitment: row.get::(0), - status, - created_at: row.get::<_, _>(2), - processed_at: row.get::<_, _>(3), - error_message: row.get::<_, _>(4), - eligibility_timestamp: row.get::<_, _>(5), - }) - .collect::>()) - } - - async fn get_unprocessed_commit_status( - self, - commitment: &Hash, - ) -> Result, Error> { - let query = sqlx::query( - r#" - SELECT status, error_message FROM unprocessed_identities WHERE commitment = $1 - "#, - ) - .bind(commitment); - - let result = self.fetch_optional(query).await?; - - if let Some(row) = result { - return Ok(Some(( - row.get::<&str, 
_>(0).parse().expect("couldn't read status"), - row.get::, _>(1).unwrap_or_default(), - ))); - }; - Ok(None) - } - - async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> { - let query = sqlx::query( - r#" - DELETE FROM unprocessed_identities WHERE commitment = $1 - "#, - ) - .bind(commitment); - - self.execute(query).await?; - - Ok(()) - } - - async fn identity_exists(self, commitment: Hash) -> Result { - Ok(sqlx::query( - r#" - select - EXISTS (select commitment from unprocessed_identities where commitment = $1) OR - EXISTS (select commitment from identities where commitment = $1); - "#, - ) - .bind(commitment) - .fetch_one(self) - .await? - .get::(0)) - } - - // TODO: add docs - async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result { - let query_queued_deletion = - sqlx::query(r#"SELECT exists(SELECT 1 FROM deletions where commitment = $1)"#) - .bind(commitment); - let row_unprocessed = self.fetch_one(query_queued_deletion).await?; - Ok(row_unprocessed.get::(0)) - } } #[derive(Debug, Error)] @@ -867,7 +163,7 @@ mod test { use super::Database; use crate::config::DatabaseConfig; - use crate::database::DatabaseExt as _; + use crate::database::query::DatabaseQuery; use crate::identity_tree::{Hash, ProcessedStatus, Status, UnprocessedStatus}; use crate::prover::{ProverConfig, ProverType}; use crate::utils::secret::SecretUrl; @@ -1369,7 +665,7 @@ mod test { .context("Inserting identity")?; } - db.mark_root_as_processed(&roots[2]).await?; + db.mark_root_as_processed_tx(&roots[2]).await?; db.mark_all_as_pending().await?; @@ -1399,7 +695,7 @@ mod test { .context("Inserting identity")?; } - db.mark_root_as_processed(&roots[2]).await?; + db.mark_root_as_processed_tx(&roots[2]).await?; for root in roots.iter().take(3) { let root = db @@ -1442,7 +738,7 @@ mod test { .context("Inserting identity")?; } - db.mark_root_as_mined(&roots[2]).await?; + db.mark_root_as_mined_tx(&roots[2]).await?; for root in roots.iter().take(3) { let root = db @@ -1488,27 +784,27 @@ mod test { } println!("Marking roots up to 2nd as processed"); - db.mark_root_as_processed(&roots[2]).await?; + db.mark_root_as_processed_tx(&roots[2]).await?; assert_roots_are(&db, &roots[..3], ProcessedStatus::Processed).await?; assert_roots_are(&db, &roots[3..], ProcessedStatus::Pending).await?; println!("Marking roots up to 1st as mined"); - db.mark_root_as_mined(&roots[1]).await?; + db.mark_root_as_mined_tx(&roots[1]).await?; assert_roots_are(&db, &roots[..2], ProcessedStatus::Mined).await?; assert_roots_are(&db, &[roots[2]], ProcessedStatus::Processed).await?; assert_roots_are(&db, &roots[3..], ProcessedStatus::Pending).await?; println!("Marking roots up to 4th as processed"); - db.mark_root_as_processed(&roots[4]).await?; + db.mark_root_as_processed_tx(&roots[4]).await?; assert_roots_are(&db, &roots[..2], ProcessedStatus::Mined).await?; assert_roots_are(&db, &roots[2..5], ProcessedStatus::Processed).await?; assert_roots_are(&db, &roots[5..], ProcessedStatus::Pending).await?; println!("Marking all roots as mined"); - db.mark_root_as_mined(&roots[num_identities - 1]).await?; + db.mark_root_as_mined_tx(&roots[num_identities - 1]).await?; assert_roots_are(&db, &roots, ProcessedStatus::Mined).await?; @@ -1530,10 +826,10 @@ mod test { } // root[2] is somehow erroneously marked as mined - db.mark_root_as_processed(&roots[2]).await?; + db.mark_root_as_processed_tx(&roots[2]).await?; // Later we correctly mark the previous root as mined - db.mark_root_as_processed(&roots[1]).await?; + 
db.mark_root_as_processed_tx(&roots[1]).await?; for root in roots.iter().take(2) { let root = db @@ -1591,7 +887,7 @@ mod test { "Root has not yet been mined" ); - db.mark_root_as_processed(&roots[0]).await?; + db.mark_root_as_processed_tx(&roots[0]).await?; let root = db .get_root_state(&roots[0]) @@ -1632,7 +928,7 @@ mod test { .context("Inserting identity")?; } - db.mark_root_as_processed(&roots[2]).await?; + db.mark_root_as_processed_tx(&roots[2]).await?; let mined_tree_updates = db .get_commitments_by_status(ProcessedStatus::Processed) @@ -1761,7 +1057,7 @@ mod test { db.insert_pending_identity(3, &identities[3], &roots[3]) .await?; - db.mark_root_as_processed(&roots[0]) + db.mark_root_as_processed_tx(&roots[0]) .await .context("Marking root as mined")?; @@ -1925,7 +1221,7 @@ mod test { db.insert_pending_identity(0, &identities[0], &roots[0]) .await?; - db.mark_root_as_mined(&roots[0]).await?; + db.mark_root_as_mined_tx(&roots[0]).await?; db.insert_new_deletion(0, &identities[0]).await?; @@ -1961,7 +1257,7 @@ mod test { db.insert_pending_identity(0, &Hash::ZERO, &roots[1]) .await?; - db.mark_root_as_mined(&roots[1]).await?; + db.mark_root_as_mined_tx(&roots[1]).await?; let history = db.get_identity_history_entries(&identities[0]).await?; @@ -2002,7 +1298,7 @@ mod test { assert_eq!(history[0].leaf_index, Some(0)); assert!(!history[0].held_back, "Identity should not be held back"); - db.mark_root_as_mined(&roots[0]).await?; + db.mark_root_as_mined_tx(&roots[0]).await?; let history = db.get_identity_history_entries(&identities[0]).await?; diff --git a/src/database/query.rs b/src/database/query.rs new file mode 100644 index 00000000..80fec59c --- /dev/null +++ b/src/database/query.rs @@ -0,0 +1,545 @@ +use std::collections::HashSet; + +use chrono::{DateTime, Utc}; +use ruint::aliases::U256; +use sqlx::{Executor, Postgres, Row}; +use tracing::instrument; +use types::{DeletionEntry, LatestDeletionEntry, RecoveryEntry}; + +use crate::database::{types, Error}; +use crate::identity_tree::{ + Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus, +}; +use crate::prover::{ProverConfig, ProverType}; + +const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000; + +/// This trait provides the individual and composable queries to the database. +/// Each method is a single atomic query, and can be composed within a +/// transaction. 
+pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { + async fn insert_pending_identity( + self, + leaf_index: usize, + identity: &Hash, + root: &Hash, + ) -> Result<(), Error> { + let insert_pending_identity_query = sqlx::query( + r#" + INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of) + VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP) + "#, + ) + .bind(leaf_index as i64) + .bind(identity) + .bind(root) + .bind(<&str>::from(ProcessedStatus::Pending)); + + self.execute(insert_pending_identity_query).await?; + + Ok(()) + } + + async fn get_id_by_root(self, root: &Hash) -> Result, Error> { + let root_index_query = sqlx::query( + r#" + SELECT id + FROM identities + WHERE root = $1 + ORDER BY id ASC + LIMIT 1 + "#, + ) + .bind(root); + + let row = self.fetch_optional(root_index_query).await?; + + let Some(row) = row else { return Ok(None) }; + let root_id = row.get::(0); + + Ok(Some(root_id as usize)) + } + + /// Marks all the identities in the db as + #[instrument(skip(self), level = "debug")] + async fn mark_all_as_pending(self) -> Result<(), Error> { + let pending_status = ProcessedStatus::Pending; + + let update_all_identities = sqlx::query( + r#" + UPDATE identities + SET status = $1, mined_at = NULL + WHERE status <> $1 + "#, + ) + .bind(<&str>::from(pending_status)); + + self.execute(update_all_identities).await?; + + Ok(()) + } + + async fn get_next_leaf_index(self) -> Result { + let query = sqlx::query( + r#" + SELECT leaf_index FROM identities + ORDER BY leaf_index DESC + LIMIT 1 + "#, + ); + + let row = self.fetch_optional(query).await?; + + let Some(row) = row else { return Ok(0) }; + let leaf_index = row.get::(0); + + Ok((leaf_index + 1) as usize) + } + + async fn get_identity_leaf_index(self, identity: &Hash) -> Result, Error> { + let query = sqlx::query( + r#" + SELECT leaf_index, status + FROM identities + WHERE commitment = $1 + ORDER BY id DESC + LIMIT 1; + "#, + ) + .bind(identity); + + let Some(row) = self.fetch_optional(query).await? else { + return Ok(None); + }; + + let leaf_index = row.get::(0) as usize; + + let status = row + .get::<&str, _>(1) + .parse() + .expect("Status is unreadable, database is corrupt"); + + Ok(Some(TreeItem { status, leaf_index })) + } + + async fn get_commitments_by_status( + self, + status: ProcessedStatus, + ) -> Result, Error> { + Ok(sqlx::query_as::<_, TreeUpdate>( + r#" + SELECT leaf_index, commitment as element + FROM identities + WHERE status = $1 + ORDER BY id ASC; + "#, + ) + .bind(<&str>::from(status)) + .fetch_all(self) + .await?) + } + + async fn get_latest_root_by_status( + self, + status: ProcessedStatus, + ) -> Result, Error> { + Ok(sqlx::query( + r#" + SELECT root FROM identities WHERE status = $1 ORDER BY id DESC LIMIT 1 + "#, + ) + .bind(<&str>::from(status)) + .fetch_optional(self) + .await? + .map(|r| r.get::(0))) + } + + async fn get_root_state(self, root: &Hash) -> Result, Error> { + // This tries really hard to do everything in one query to prevent race + // conditions. + Ok(sqlx::query_as::<_, RootItem>( + r#" + SELECT + root, + status, + pending_as_of as pending_valid_as_of, + mined_at as mined_valid_as_of + FROM identities + WHERE root = $1 + ORDER BY id + LIMIT 1 + "#, + ) + .bind(root) + .fetch_optional(self) + .await?) 
+ } + + async fn get_latest_insertion_timestamp(self) -> Result>, Error> { + let query = sqlx::query( + r#" + SELECT insertion_timestamp + FROM latest_insertion_timestamp + WHERE Lock = 'X';"#, + ); + + let row = self.fetch_optional(query).await?; + + Ok(row.map(|r| r.get::, _>(0))) + } + + async fn count_unprocessed_identities(self) -> Result { + let query = sqlx::query( + r#" + SELECT COUNT(*) as unprocessed + FROM unprocessed_identities + "#, + ); + let result = self.fetch_one(query).await?; + Ok(result.get::(0) as i32) + } + + async fn count_pending_identities(self) -> Result { + let query = sqlx::query( + r#" + SELECT COUNT(*) as pending + FROM identities + WHERE status = $1 + "#, + ) + .bind(<&str>::from(ProcessedStatus::Pending)); + let result = self.fetch_one(query).await?; + Ok(result.get::(0) as i32) + } + + async fn get_provers(self) -> Result, Error> { + Ok(sqlx::query_as( + r#" + SELECT batch_size, url, timeout_s, prover_type + FROM provers + "#, + ) + .fetch_all(self) + .await? + .into_iter() + .collect()) + } + async fn insert_prover_configuration( + self, + batch_size: usize, + url: impl ToString, + timeout_seconds: u64, + prover_type: ProverType, + ) -> Result<(), Error> { + let url = url.to_string(); + + let query = sqlx::query( + r#" + INSERT INTO provers (batch_size, url, timeout_s, prover_type) + VALUES ($1, $2, $3, $4) + "#, + ) + .bind(batch_size as i64) + .bind(url) + .bind(timeout_seconds as i64) + .bind(prover_type); + + self.execute(query).await?; + + Ok(()) + } + + async fn insert_provers(self, provers: HashSet) -> Result<(), Error> { + if provers.is_empty() { + return Ok(()); + } + + let mut query_builder = sqlx::QueryBuilder::new( + r#" + INSERT INTO provers (batch_size, url, timeout_s, prover_type) + "#, + ); + + query_builder.push_values(provers, |mut b, prover| { + b.push_bind(prover.batch_size as i64) + .push_bind(prover.url) + .push_bind(prover.timeout_s as i64) + .push_bind(prover.prover_type); + }); + + let query = query_builder.build(); + + self.execute(query).await?; + Ok(()) + } + + async fn remove_prover(self, batch_size: usize, prover_type: ProverType) -> Result<(), Error> { + let query = sqlx::query( + r#" + DELETE FROM provers WHERE batch_size = $1 AND prover_type = $2 + "#, + ) + .bind(batch_size as i64) + .bind(prover_type); + + self.execute(query).await?; + + Ok(()) + } + + async fn insert_new_identity( + self, + identity: Hash, + eligibility_timestamp: sqlx::types::chrono::DateTime, + ) -> Result { + let query = sqlx::query( + r#" + INSERT INTO unprocessed_identities (commitment, status, created_at, eligibility) + VALUES ($1, $2, CURRENT_TIMESTAMP, $3) + "#, + ) + .bind(identity) + .bind(<&str>::from(UnprocessedStatus::New)) + .bind(eligibility_timestamp); + + self.execute(query).await?; + Ok(identity) + } + + async fn insert_new_recovery( + self, + existing_commitment: &Hash, + new_commitment: &Hash, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO recoveries (existing_commitment, new_commitment) + VALUES ($1, $2) + "#, + ) + .bind(existing_commitment) + .bind(new_commitment); + self.execute(query).await?; + Ok(()) + } + + async fn get_latest_deletion(self) -> Result { + let query = + sqlx::query("SELECT deletion_timestamp FROM latest_deletion_root WHERE Lock = 'X';"); + + let row = self.fetch_optional(query).await?; + + if let Some(row) = row { + Ok(LatestDeletionEntry { + timestamp: row.get(0), + }) + } else { + Ok(LatestDeletionEntry { + timestamp: Utc::now(), + }) + } + } + + async fn 
update_latest_insertion_timestamp( + self, + insertion_timestamp: DateTime, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO latest_insertion_timestamp (Lock, insertion_timestamp) + VALUES ('X', $1) + ON CONFLICT (Lock) + DO UPDATE SET insertion_timestamp = EXCLUDED.insertion_timestamp; + "#, + ) + .bind(insertion_timestamp); + + self.execute(query).await?; + Ok(()) + } + + async fn update_latest_deletion(self, deletion_timestamp: DateTime) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO latest_deletion_root (Lock, deletion_timestamp) + VALUES ('X', $1) + ON CONFLICT (Lock) + DO UPDATE SET deletion_timestamp = EXCLUDED.deletion_timestamp; + "#, + ) + .bind(deletion_timestamp); + + self.execute(query).await?; + Ok(()) + } + + async fn get_all_recoveries(self) -> Result, Error> { + Ok( + sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries") + .fetch_all(self) + .await?, + ) + } + + async fn delete_recoveries, T: Into>( + self, + prev_commits: I, + ) -> Result, Error> { + // TODO: upstream PgHasArrayType impl to ruint + let prev_commits = prev_commits + .into_iter() + .map(|c| c.into().to_be_bytes()) + .collect::>(); + + let res = sqlx::query_as::<_, RecoveryEntry>( + r#" + DELETE + FROM recoveries + WHERE existing_commitment = ANY($1) + RETURNING * + "#, + ) + .bind(&prev_commits) + .fetch_all(self) + .await?; + + Ok(res) + } + + async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO deletions (leaf_index, commitment) + VALUES ($1, $2) + "#, + ) + .bind(leaf_index as i64) + .bind(identity); + + self.execute(query).await?; + Ok(()) + } + + // TODO: consider using a larger value than i64 for leaf index, ruint should + // have postgres compatibility for u256 + async fn get_deletions(self) -> Result, Error> { + let query = sqlx::query( + r#" + SELECT * + FROM deletions + "#, + ); + + let result = self.fetch_all(query).await?; + + Ok(result + .into_iter() + .map(|row| DeletionEntry { + leaf_index: row.get::(0) as usize, + commitment: row.get::(1), + }) + .collect::>()) + } + + /// Remove a list of entries from the deletions table + async fn remove_deletions(self, commitments: &[Hash]) -> Result<(), Error> { + let commitments = commitments + .iter() + .map(|c| c.to_be_bytes()) + .collect::>(); + + sqlx::query("DELETE FROM deletions WHERE commitment = Any($1)") + .bind(commitments) + .execute(self) + .await?; + + Ok(()) + } + + async fn get_eligible_unprocessed_commitments( + self, + status: UnprocessedStatus, + ) -> Result, Error> { + let query = sqlx::query( + r#" + SELECT * FROM unprocessed_identities + WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility + LIMIT $2 + "#, + ) + .bind(<&str>::from(status)) + .bind(MAX_UNPROCESSED_FETCH_COUNT); + + let result = self.fetch_all(query).await?; + + Ok(result + .into_iter() + .map(|row| types::UnprocessedCommitment { + commitment: row.get::(0), + status, + created_at: row.get::<_, _>(2), + processed_at: row.get::<_, _>(3), + error_message: row.get::<_, _>(4), + eligibility_timestamp: row.get::<_, _>(5), + }) + .collect::>()) + } + + async fn get_unprocessed_commit_status( + self, + commitment: &Hash, + ) -> Result, Error> { + let query = sqlx::query( + r#" + SELECT status, error_message FROM unprocessed_identities WHERE commitment = $1 + "#, + ) + .bind(commitment); + + let result = self.fetch_optional(query).await?; + + if let Some(row) = result { + return Ok(Some(( + row.get::<&str, 
_>(0).parse().expect("couldn't read status"), + row.get::, _>(1).unwrap_or_default(), + ))); + }; + Ok(None) + } + + async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> { + let query = sqlx::query( + r#" + DELETE FROM unprocessed_identities WHERE commitment = $1 + "#, + ) + .bind(commitment); + + self.execute(query).await?; + + Ok(()) + } + + async fn identity_exists(self, commitment: Hash) -> Result { + Ok(sqlx::query( + r#" + select + EXISTS (select commitment from unprocessed_identities where commitment = $1) OR + EXISTS (select commitment from identities where commitment = $1); + "#, + ) + .bind(commitment) + .fetch_one(self) + .await? + .get::(0)) + } + + // TODO: add docs + async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result { + let query_queued_deletion = + sqlx::query(r#"SELECT exists(SELECT 1 FROM deletions where commitment = $1)"#) + .bind(commitment); + let row_unprocessed = self.fetch_one(query_queued_deletion).await?; + Ok(row_unprocessed.get::(0)) + } +} diff --git a/src/database/transaction.rs b/src/database/transaction.rs new file mode 100644 index 00000000..ebd39756 --- /dev/null +++ b/src/database/transaction.rs @@ -0,0 +1,176 @@ +use chrono::{DateTime, Utc}; +use sqlx::{Executor, Row}; +use tracing::instrument; + +use crate::database::query::DatabaseQuery; +use crate::database::types::CommitmentHistoryEntry; +use crate::database::{Database, Error}; +use crate::identity_tree::{Hash, ProcessedStatus, UnprocessedStatus}; +use crate::utils::retry_tx; + +/// impl block for database transactions +impl Database { + /// Marks the identities and roots from before a given root hash as mined + /// Also marks following roots as pending + #[instrument(skip(self), level = "debug")] + pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> { + retry_tx!(self.pool, tx, { + let root_id = tx.get_id_by_root(root).await?; + + let Some(root_id) = root_id else { + return Err(Error::MissingRoot { root: *root }); + }; + + let root_id = root_id as i64; + // TODO: Can I get rid of line `AND status <> $2 + let update_previous_roots = sqlx::query( + r#" + UPDATE identities + SET status = $2, mined_at = CURRENT_TIMESTAMP + WHERE id <= $1 + AND status <> $2 + AND status <> $3; + "#, + ) + .bind(root_id) + .bind(<&str>::from(ProcessedStatus::Processed)) + .bind(<&str>::from(ProcessedStatus::Mined)); + + let update_next_roots = sqlx::query( + r#" + UPDATE identities + SET status = $2, mined_at = NULL + WHERE id > $1 + "#, + ) + .bind(root_id) + .bind(<&str>::from(ProcessedStatus::Pending)); + + tx.execute(update_previous_roots).await?; + tx.execute(update_next_roots).await?; + + Ok(()) + }) + .await + } + + /// Marks the identities and roots from before a given root hash as + /// finalized + #[instrument(skip(self), level = "debug")] + pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> { + let mined_status = ProcessedStatus::Mined; + + retry_tx!(self.pool, tx, { + let root_id = tx.get_id_by_root(root).await?; + + let Some(root_id) = root_id else { + return Err(Error::MissingRoot { root: *root }); + }; + + let root_id = root_id as i64; + + let update_previous_roots = sqlx::query( + r#" + UPDATE identities + SET status = $2 + WHERE id <= $1 + AND status <> $2 + "#, + ) + .bind(root_id) + .bind(<&str>::from(mined_status)); + + tx.execute(update_previous_roots).await?; + + Ok(()) + }) + .await + } + + pub async fn get_identity_history_entries( + &self, + commitment: &Hash, + ) -> Result, Error> { + let 
unprocessed = sqlx::query( + r#" + SELECT commitment, status, eligibility + FROM unprocessed_identities + WHERE commitment = $1 + "#, + ) + .bind(commitment); + + let rows = self.pool.fetch_all(unprocessed).await?; + let unprocessed_updates = rows + .into_iter() + .map(|row| { + let eligibility_timestamp: DateTime = row.get(2); + let held_back = Utc::now() < eligibility_timestamp; + + CommitmentHistoryEntry { + leaf_index: None, + commitment: row.get::(0), + held_back, + status: row + .get::<&str, _>(1) + .parse() + .expect("Failed to parse unprocessed status"), + } + }) + .collect::>(); + + let leaf_index = self.get_identity_leaf_index(commitment).await?; + let Some(leaf_index) = leaf_index else { + return Ok(unprocessed_updates); + }; + + let identity_deletions = sqlx::query( + r#" + SELECT commitment + FROM deletions + WHERE leaf_index = $1 + "#, + ) + .bind(leaf_index.leaf_index as i64); + + let rows = self.pool.fetch_all(identity_deletions).await?; + let deletions = rows + .into_iter() + .map(|_row| CommitmentHistoryEntry { + leaf_index: Some(leaf_index.leaf_index), + commitment: Hash::ZERO, + held_back: false, + status: UnprocessedStatus::New.into(), + }) + .collect::>(); + + let processed_updates = sqlx::query( + r#" + SELECT commitment, status + FROM identities + WHERE leaf_index = $1 + ORDER BY id ASC + "#, + ) + .bind(leaf_index.leaf_index as i64); + + let rows = self.pool.fetch_all(processed_updates).await?; + let processed_updates: Vec = rows + .into_iter() + .map(|row| CommitmentHistoryEntry { + leaf_index: Some(leaf_index.leaf_index), + commitment: row.get::(0), + held_back: false, + status: row + .get::<&str, _>(1) + .parse() + .expect("Status is unreadable, database is corrupt"), + }) + .collect(); + + Ok([processed_updates, unprocessed_updates, deletions] + .concat() + .into_iter() + .collect()) + } +} diff --git a/src/ethereum/mod.rs b/src/ethereum/mod.rs index e960e5b7..8c4d5871 100644 --- a/src/ethereum/mod.rs +++ b/src/ethereum/mod.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use ethers::providers::Middleware; use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::Address; pub use read::ReadProvider; diff --git a/src/server.rs b/src/server.rs index 473517d1..2ab9f38e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -87,7 +87,7 @@ async fn delete_identity( State(app): State>, Json(req): Json, ) -> Result<(), Error> { - app.delete_identity(&req.identity_commitment).await?; + app.delete_identity_tx(&req.identity_commitment).await?; Ok(()) } diff --git a/src/task_monitor.rs b/src/task_monitor.rs index dfee3ea7..aac1b1ae 100644 --- a/src/task_monitor.rs +++ b/src/task_monitor.rs @@ -8,7 +8,8 @@ use tokio::task::JoinHandle; use tracing::{info, instrument, warn}; use crate::app::App; -use crate::database::{Database, DatabaseExt as _}; +use crate::database::query::DatabaseQuery as _; +use crate::database::Database; pub mod tasks; diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index 45e9ac49..d1581fe1 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -7,8 +7,8 @@ use tokio::sync::{Mutex, Notify}; use tracing::info; use crate::app::App; +use crate::database::query::DatabaseQuery; use crate::database::types::DeletionEntry; -use crate::database::DatabaseExt; use crate::identity_tree::Hash; pub async fn delete_identities( diff --git a/src/task_monitor/tasks/finalize_identities.rs 
b/src/task_monitor/tasks/finalize_identities.rs index 5a43f50d..4750bc9e 100644 --- a/src/task_monitor/tasks/finalize_identities.rs +++ b/src/task_monitor/tasks/finalize_identities.rs @@ -14,8 +14,10 @@ use crate::app::App; use crate::contracts::abi::{BridgedWorldId, RootAddedFilter, TreeChangeKind, TreeChangedFilter}; use crate::contracts::scanner::BlockScanner; use crate::contracts::IdentityManager; -use crate::database::{Database, DatabaseExt as _}; +use crate::database::query::DatabaseQuery as _; +use crate::database::Database; use crate::identity_tree::{Canonical, Intermediate, TreeVersion, TreeWithNextVersion}; +use crate::utils::retry_tx; pub async fn finalize_roots(app: Arc) -> anyhow::Result<()> { let mainnet_abi = app.identity_manager.abi(); @@ -137,7 +139,9 @@ async fn finalize_mainnet_roots( continue; } - database.mark_root_as_processed(&post_root.into()).await?; + database + .mark_root_as_processed_tx(&post_root.into()) + .await?; info!(?pre_root, ?post_root, ?kind, "Batch mined"); @@ -178,7 +182,7 @@ async fn finalize_secondary_roots( continue; } - database.mark_root_as_mined(&root.into()).await?; + database.mark_root_as_mined_tx(&root.into()).await?; finalized_tree.apply_updates_up_to(root.into()); info!(?root, "Root finalized"); @@ -249,46 +253,45 @@ async fn update_eligible_recoveries( log: &Log, max_epoch_duration: Duration, ) -> anyhow::Result<()> { - let tx_hash = log.transaction_hash.context("Missing tx hash")?; - let commitments = identity_manager - .fetch_deletion_indices_from_tx(tx_hash) - .await - .context("Could not fetch deletion indices from tx")?; - - let commitments = processed_tree.commitments_by_indices(commitments.iter().copied()); - let commitments: Vec = commitments - .into_iter() - .map(std::convert::Into::into) - .collect(); - - // Fetch the root history expiry time on chain - let root_history_expiry = identity_manager.root_history_expiry().await?; - - // Use the root history expiry to calcuate the eligibility timestamp for the new - // insertion - let root_history_expiry_duration = - chrono::Duration::seconds(root_history_expiry.as_u64() as i64); - let max_epoch_duration = chrono::Duration::from_std(max_epoch_duration)?; - - let delay = root_history_expiry_duration + max_epoch_duration; - - let eligibility_timestamp = Utc::now() + delay; - - let mut tx = database.begin().await?; - - // Check if any deleted commitments correspond with entries in the - // recoveries table and insert the new commitment into the unprocessed - // identities table with the proper eligibility timestamp - let deleted_recoveries = tx.delete_recoveries(commitments).await?; - - // For each deletion, if there is a corresponding recovery, insert a new - // identity with the specified eligibility timestamp - for recovery in deleted_recoveries { - tx.insert_new_identity(recovery.new_commitment, eligibility_timestamp) - .await?; - } - - tx.commit().await?; + retry_tx!(database.pool, tx, { + let tx_hash = log.transaction_hash.context("Missing tx hash")?; + let commitments = identity_manager + .fetch_deletion_indices_from_tx(tx_hash) + .await + .context("Could not fetch deletion indices from tx")?; + + let commitments = processed_tree.commitments_by_indices(commitments.iter().copied()); + let commitments: Vec = commitments + .into_iter() + .map(std::convert::Into::into) + .collect(); + + // Fetch the root history expiry time on chain + let root_history_expiry = identity_manager.root_history_expiry().await?; + + // Use the root history expiry to calcuate the eligibility timestamp for the 
new
+        // insertion
+        let root_history_expiry_duration =
+            chrono::Duration::seconds(root_history_expiry.as_u64() as i64);
+        let max_epoch_duration = chrono::Duration::from_std(max_epoch_duration)?;
+
+        let delay = root_history_expiry_duration + max_epoch_duration;
+
+        let eligibility_timestamp = Utc::now() + delay;
+
+        // Check if any deleted commitments correspond with entries in the
+        // recoveries table and insert the new commitment into the unprocessed
+        // identities table with the proper eligibility timestamp
+        let deleted_recoveries = tx.delete_recoveries(commitments).await?;
+
+        // For each deletion, if there is a corresponding recovery, insert a new
+        // identity with the specified eligibility timestamp
+        for recovery in deleted_recoveries {
+            tx.insert_new_identity(recovery.new_commitment, eligibility_timestamp)
+                .await?;
+        }
 
-    Ok(())
+        Ok(())
+    })
+    .await
 }
diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs
index 94ff0fb3..f08ae0b0 100644
--- a/src/task_monitor/tasks/insert_identities.rs
+++ b/src/task_monitor/tasks/insert_identities.rs
@@ -6,8 +6,9 @@ use tokio::time::sleep;
 use tracing::instrument;
 
 use crate::app::App;
+use crate::database::query::DatabaseQuery as _;
 use crate::database::types::UnprocessedCommitment;
-use crate::database::{Database, DatabaseExt};
+use crate::database::Database;
 use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
 
 pub async fn insert_identities(
diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs
index 6c5c8bcb..01c1aec2 100644
--- a/src/task_monitor/tasks/process_identities.rs
+++ b/src/task_monitor/tasks/process_identities.rs
@@ -12,7 +12,7 @@ use tracing::instrument;
 
 use crate::app::App;
 use crate::contracts::IdentityManager;
-use crate::database::DatabaseExt as _;
+use crate::database::query::DatabaseQuery as _;
 use crate::ethereum::write::TransactionId;
 use crate::identity_tree::{
     AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion,
diff --git a/src/utils.rs b/src/utils.rs
index 57cf381f..a292481e 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -8,7 +8,6 @@ use tokio::task::JoinHandle;
 use tracing::{error, info};
 
 use crate::shutdown::is_shutting_down;
-
 pub mod batch_type;
 pub mod index_packing;
 pub mod min_map;
@@ -16,6 +15,53 @@ pub mod secret;
 pub mod serde_utils;
 pub mod tree_updates;
 
+pub const TX_RETRY_LIMIT: u32 = 10;
+
+/// Retries a transaction a certain number of times
+/// Only errors originating from `Transaction::commit` are retried
+/// Errors originating from the transaction function `$expression` are not
+/// retried and are instead immediately rolled back.
+///
+/// # Example
+/// ```ignore
+/// let res = retry_tx!(db, tx, {
+///     tx.execute("SELECT * FROM table").await?;
+///     Ok(tx.execute("SELECT * FROM other").await?)
+/// }).await;
+/// ```
macro_rules!
retry_tx { + ($pool:expr, $tx:ident, $expression:expr) => { + async { + let mut res; + let mut counter = 0; + loop { + let mut $tx = $pool.begin().await?; + res = async { $expression }.await; + if res.is_err() { + $tx.rollback().await?; + return res; + } + match $tx.commit().await { + Err(e) => { + counter += 1; + let limit = crate::utils::TX_RETRY_LIMIT; + if counter > limit { + return Err(e.into()); + } else { + tracing::warn!( + "db transaction commit failed ({counter}/{limit}): {:?}", + e + ); + } + } + Ok(_) => break, + } + } + res + } + }; +} +pub(crate) use retry_tx; + pub fn spawn_monitored_with_backoff( future_spawner: S, shutdown_sender: broadcast::Sender<()>,
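
Note on the new `retry_tx!` macro above, which every former `begin()`/`commit()` pair in this diff now goes through: only a failed `COMMIT` is retried (up to `TX_RETRY_LIMIT` times), while an error raised inside the block rolls the transaction back and returns immediately. A minimal usage sketch (the helper itself is hypothetical, not part of this change; it assumes the `Database` and `Error` types from `src/database/mod.rs`, plus `Error: From<sqlx::Error>`, which the macro's `?` on `begin`/`rollback` already requires of any error type used with it):

```rust
use crate::database::query::DatabaseQuery as _;
use crate::database::{Database, Error};
use crate::identity_tree::Hash;
use crate::utils::retry_tx;

// Hypothetical helper: the check and the insert commit atomically. If
// COMMIT itself fails, the whole block re-runs on a fresh transaction
// (up to TX_RETRY_LIMIT retries); an Err from inside the block is
// rolled back and returned without retrying.
pub async fn queue_deletion(
    db: &Database,
    leaf_index: usize,
    commitment: &Hash,
) -> Result<(), Error> {
    retry_tx!(db.pool, tx, {
        if tx.identity_is_queued_for_deletion(commitment).await? {
            // Early return exits the retried block; the (empty)
            // transaction is still committed.
            return Ok(());
        }
        tx.insert_new_deletion(leaf_index, commitment).await
    })
    .await
}
```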
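Because the retried error type is constrained only by `From<sqlx::Error>` (via the `?` on `begin`/`rollback` and the `Err(e.into())` after a failed commit), the diff can instantiate the macro with `database::Error` in `src/database/transaction.rs`, `ServerError` in `src/app.rs`, and `anyhow::Error` in `update_eligible_recoveries`. A sketch of the `anyhow` case (a hypothetical maintenance job, not part of this change; it assumes `database::Error` implements `std::error::Error`, which its `thiserror` derive provides):

```rust
use anyhow::Context;

use crate::database::query::DatabaseQuery as _;
use crate::database::Database;
use crate::utils::retry_tx;

// Hypothetical maintenance job: anyhow::Error satisfies
// From<sqlx::Error>, so the macro drops into anyhow-based code without
// an adapter, mirroring update_eligible_recoveries above.
pub async fn requeue_all_identities(db: &Database) -> anyhow::Result<()> {
    retry_tx!(db.pool, tx, {
        tx.mark_all_as_pending()
            .await
            .context("marking identities as pending")?;
        Ok(())
    })
    .await
}
```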
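The companion change is the split of the old `DatabaseExt` into `DatabaseQuery` in `src/database/query.rs` (single atomic queries) and the `_tx` wrappers in `src/database/transaction.rs` (retried multi-statement transactions). The blanket impl `impl<'a, T> DatabaseQuery<'a> for T where T: Executor<'a, Database = Postgres>` is what lets the same query methods run either directly on the pool (as the tests do via `Database`'s `Deref` to `Pool<Postgres>`) or on a transaction inside `retry_tx!`. A sketch of both call styles, assuming a `db: &Database` handle (both helpers are hypothetical):

```rust
use crate::database::query::DatabaseQuery as _;
use crate::database::{Database, Error};
use crate::utils::retry_tx;

// Pool-direct style: `&Pool<Postgres>` is an `Executor`, so each call
// runs as its own implicit single-statement transaction.
pub async fn pending_count(db: &Database) -> Result<i32, Error> {
    db.pool.count_pending_identities().await
}

// Transactional style: the same trait methods compose on `tx`, and the
// pair is committed (and retried on commit failure) as one unit.
pub async fn queue_depths(db: &Database) -> Result<(i32, i32), Error> {
    retry_tx!(db.pool, tx, {
        let unprocessed = tx.count_unprocessed_identities().await?;
        let pending = tx.count_pending_identities().await?;
        Ok((unprocessed, pending))
    })
    .await
}
```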