diff --git a/src/database/transaction.rs b/src/database/transaction.rs index 222bdc43..10109429 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -1,4 +1,4 @@ -use sqlx::Executor; +use sqlx::{Executor, Postgres, Transaction}; use tracing::instrument; use crate::database::query::DatabaseQuery; @@ -6,82 +6,105 @@ use crate::database::{Database, Error}; use crate::identity_tree::{Hash, ProcessedStatus}; use crate::utils::retry_tx; -/// impl block for database transactions -impl Database { - /// Marks the identities and roots from before a given root hash as mined - /// Also marks following roots as pending - #[instrument(skip(self), level = "debug")] - pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> { - retry_tx!(self.pool, tx, { - let root_id = tx.get_id_by_root(root).await?; +async fn mark_root_as_processed( + tx: &mut Transaction<'_, Postgres>, + root: &Hash, +) -> Result<(), Error> { + let root_id = tx.get_id_by_root(root).await?; - let Some(root_id) = root_id else { - return Err(Error::MissingRoot { root: *root }); - }; + let Some(root_id) = root_id else { + return Err(Error::MissingRoot { root: *root }); + }; - let root_id = root_id as i64; - // TODO: Can I get rid of line `AND status <> $2 - let update_previous_roots = sqlx::query( - r#" + let root_id = root_id as i64; + // TODO: Can I get rid of line `AND status <> $2 + let update_previous_roots = sqlx::query( + r#" UPDATE identities SET status = $2, mined_at = CURRENT_TIMESTAMP WHERE id <= $1 AND status <> $2 AND status <> $3; "#, - ) - .bind(root_id) - .bind(<&str>::from(ProcessedStatus::Processed)) - .bind(<&str>::from(ProcessedStatus::Mined)); + ) + .bind(root_id) + .bind(<&str>::from(ProcessedStatus::Processed)) + .bind(<&str>::from(ProcessedStatus::Mined)); - let update_next_roots = sqlx::query( - r#" + let update_next_roots = sqlx::query( + r#" UPDATE identities SET status = $2, mined_at = NULL WHERE id > $1 "#, - ) - .bind(root_id) - .bind(<&str>::from(ProcessedStatus::Pending)); + ) + .bind(root_id) + .bind(<&str>::from(ProcessedStatus::Pending)); - tx.execute(update_previous_roots).await?; - tx.execute(update_next_roots).await?; + tx.execute(update_previous_roots).await?; + tx.execute(update_next_roots).await?; - Ok(()) - }) - .await - } + Ok(()) +} - /// Marks the identities and roots from before a given root hash as - /// finalized - #[instrument(skip(self), level = "debug")] - pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> { - let mined_status = ProcessedStatus::Mined; +pub async fn mark_root_as_mined( + tx: &mut Transaction<'_, Postgres>, + root: &Hash, +) -> Result<(), Error> { + let mined_status = ProcessedStatus::Mined; - retry_tx!(self.pool, tx, { - let root_id = tx.get_id_by_root(root).await?; + let root_id = tx.get_id_by_root(root).await?; - let Some(root_id) = root_id else { - return Err(Error::MissingRoot { root: *root }); - }; + let Some(root_id) = root_id else { + return Err(Error::MissingRoot { root: *root }); + }; - let root_id = root_id as i64; + let root_id = root_id as i64; - let update_previous_roots = sqlx::query( - r#" + let update_previous_roots = sqlx::query( + r#" UPDATE identities SET status = $2 WHERE id <= $1 AND status <> $2 "#, - ) - .bind(root_id) - .bind(<&str>::from(mined_status)); + ) + .bind(root_id) + .bind(<&str>::from(mined_status)); - tx.execute(update_previous_roots).await?; + tx.execute(update_previous_roots).await?; + + Ok(()) +} +/// impl block for database transactions +impl Database { + /// Marks the identities and roots from before a given root hash as mined + /// Also marks following roots as pending + #[instrument(skip(self), level = "debug")] + pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> { + retry_tx!(self.pool, tx, mark_root_as_processed(&mut tx, root).await).await + } + + /// Marks the identities and roots from before a given root hash as mined + /// Also marks following roots as pending + #[instrument(skip(self), level = "debug")] + pub async fn mark_root_as_processed_and_delete_batches_tx( + &self, + root: &Hash, + ) -> Result<(), Error> { + retry_tx!(self.pool, tx, { + mark_root_as_processed(&mut tx, root).await?; + tx.delete_batches_after_root(root).await?; Ok(()) }) .await } + + /// Marks the identities and roots from before a given root hash as + /// finalized + #[instrument(skip(self), level = "debug")] + pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> { + retry_tx!(self.pool, tx, mark_root_as_mined(&mut tx, root).await).await + } } diff --git a/src/identity/processor.rs b/src/identity/processor.rs index 9b038a82..b0302f48 100644 --- a/src/identity/processor.rs +++ b/src/identity/processor.rs @@ -18,7 +18,7 @@ use crate::contracts::scanner::BlockScanner; use crate::contracts::IdentityManager; use crate::database::query::DatabaseQuery; use crate::database::types::{BatchEntry, BatchType}; -use crate::database::Database; +use crate::database::{Database, Error}; use crate::ethereum::{Ethereum, ReadProvider}; use crate::identity_tree::{Canonical, Hash, Intermediate, TreeVersion, TreeWithNextVersion}; use crate::prover::identity::Identity; @@ -143,13 +143,18 @@ impl IdentityProcessor for OnChainIdentityProcessor { // Note that we don't have a way of queuing a root here for // finalization. so it's going to stay as "processed" // until the next root is mined. self.database. - self.database.mark_root_as_processed_tx(&root_hash).await?; - self.database.delete_batches_after_root(&root_hash).await?; + self.database + .mark_root_as_processed_and_delete_batches_tx(&root_hash) + .await?; } else { // Db is either empty or we're restarting with a new contract/chain // so we should mark everything as pending - self.database.mark_all_as_pending().await?; - self.database.delete_all_batches().await?; + retry_tx!(self.database.pool, tx, { + tx.mark_all_as_pending().await?; + tx.delete_all_batches().await?; + Result::<(), Error>::Ok(()) + }) + .await?; } Ok(()) diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index e30660ce..6a982fc2 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -3,13 +3,14 @@ use std::time::Duration; use tokio::sync::{Mutex, Notify}; use tokio::time; -use tracing::{info, instrument}; +use tracing::info; use crate::app::App; use crate::database::query::DatabaseQuery as _; use crate::database::types::UnprocessedCommitment; use crate::database::Database; use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus}; +use crate::utils::retry_tx; pub async fn insert_identities( app: Arc, @@ -36,7 +37,7 @@ pub async fn insert_identities( insert_identities_batch( &app.database, app.tree_state()?.latest_tree(), - unprocessed, + &unprocessed, &pending_insertions_mutex, ) .await?; @@ -46,35 +47,37 @@ pub async fn insert_identities( } } -#[instrument(level = "info", skip_all)] -async fn insert_identities_batch( +pub async fn insert_identities_batch( database: &Database, latest_tree: &TreeVersion, - identities: Vec, + identities: &[UnprocessedCommitment], pending_insertions_mutex: &Mutex<()>, ) -> anyhow::Result<()> { - // Filter out any identities that are already in the `identities` table - let mut filtered_identities = vec![]; - for identity in identities { - if database - .get_identity_leaf_index(&identity.commitment) - .await? - .is_some() - { - tracing::warn!(?identity.commitment, "Duplicate identity"); - database - .remove_unprocessed_identity(&identity.commitment) - .await?; - } else { - filtered_identities.push(identity.commitment); + let filtered_identities = retry_tx!(database, tx, { + // Filter out any identities that are already in the `identities` table + let mut filtered_identities = vec![]; + for identity in identities { + if tx + .get_identity_leaf_index(&identity.commitment) + .await? + .is_some() + { + tracing::warn!(?identity.commitment, "Duplicate identity"); + tx.remove_unprocessed_identity(&identity.commitment).await?; + } else { + filtered_identities.push(identity.commitment); + } } - } + Result::<_, anyhow::Error>::Ok(filtered_identities) + }) + .await?; let _guard = pending_insertions_mutex.lock().await; - let next_db_index = database.get_next_leaf_index().await?; let next_leaf = latest_tree.next_leaf(); + let next_db_index = retry_tx!(database, tx, tx.get_next_leaf_index().await).await?; + assert_eq!( next_leaf, next_db_index, "Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \ @@ -89,15 +92,15 @@ async fn insert_identities_batch( "Length mismatch when appending identities to tree" ); - let items = data.into_iter().zip(filtered_identities); - - for ((root, _proof, leaf_index), identity) in items { - database - .insert_pending_identity(leaf_index, &identity, &root) - .await?; + retry_tx!(database, tx, { + for ((root, _proof, leaf_index), identity) in data.iter().zip(&filtered_identities) { + tx.insert_pending_identity(*leaf_index, identity, root) + .await?; - database.remove_unprocessed_identity(&identity).await?; - } + tx.remove_unprocessed_identity(identity).await?; + } - Ok(()) + Ok(()) + }) + .await }