diff --git a/src/database/transaction.rs b/src/database/transaction.rs
index 222bdc43..cfcb78f0 100644
--- a/src/database/transaction.rs
+++ b/src/database/transaction.rs
@@ -1,87 +1,180 @@
-use sqlx::Executor;
+use sqlx::{Executor, Postgres, Transaction};
+use tokio::sync::Mutex;
 use tracing::instrument;

 use crate::database::query::DatabaseQuery;
+use crate::database::types::UnprocessedCommitment;
 use crate::database::{Database, Error};
-use crate::identity_tree::{Hash, ProcessedStatus};
+use crate::identity_tree::{Hash, Latest, ProcessedStatus, TreeVersion, TreeVersionReadOps};
 use crate::utils::retry_tx;

-/// impl block for database transactions
-impl Database {
-    /// Marks the identities and roots from before a given root hash as mined
-    /// Also marks following roots as pending
-    #[instrument(skip(self), level = "debug")]
-    pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> {
-        retry_tx!(self.pool, tx, {
-            let root_id = tx.get_id_by_root(root).await?;
+async fn mark_root_as_processed(
+    tx: &mut Transaction<'_, Postgres>,
+    root: &Hash,
+) -> Result<(), Error> {
+    let root_id = tx.get_id_by_root(root).await?;

-            let Some(root_id) = root_id else {
-                return Err(Error::MissingRoot { root: *root });
-            };
+    let Some(root_id) = root_id else {
+        return Err(Error::MissingRoot { root: *root });
+    };

-            let root_id = root_id as i64;
-            // TODO: Can I get rid of line `AND status <> $2
-            let update_previous_roots = sqlx::query(
-                r#"
+    let root_id = root_id as i64;
+    // TODO: Can I get rid of line `AND status <> $2`?
+    let update_previous_roots = sqlx::query(
+        r#"
            UPDATE identities
            SET status = $2, mined_at = CURRENT_TIMESTAMP
            WHERE id <= $1
            AND status <> $2
            AND status <> $3;
            "#,
-            )
-            .bind(root_id)
-            .bind(<&str>::from(ProcessedStatus::Processed))
-            .bind(<&str>::from(ProcessedStatus::Mined));
+    )
+    .bind(root_id)
+    .bind(<&str>::from(ProcessedStatus::Processed))
+    .bind(<&str>::from(ProcessedStatus::Mined));

-            let update_next_roots = sqlx::query(
-                r#"
+    let update_next_roots = sqlx::query(
+        r#"
            UPDATE identities
            SET status = $2, mined_at = NULL
            WHERE id > $1
            "#,
-            )
-            .bind(root_id)
-            .bind(<&str>::from(ProcessedStatus::Pending));
+    )
+    .bind(root_id)
+    .bind(<&str>::from(ProcessedStatus::Pending));

-            tx.execute(update_previous_roots).await?;
-            tx.execute(update_next_roots).await?;
+    tx.execute(update_previous_roots).await?;
+    tx.execute(update_next_roots).await?;

-            Ok(())
-        })
-        .await
-    }
+    Ok(())
+}

-    /// Marks the identities and roots from before a given root hash as
-    /// finalized
-    #[instrument(skip(self), level = "debug")]
-    pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> {
-        let mined_status = ProcessedStatus::Mined;
+pub async fn mark_root_as_mined(
+    tx: &mut Transaction<'_, Postgres>,
+    root: &Hash,
+) -> Result<(), Error> {
+    let mined_status = ProcessedStatus::Mined;

-        retry_tx!(self.pool, tx, {
-            let root_id = tx.get_id_by_root(root).await?;
+    let root_id = tx.get_id_by_root(root).await?;

-            let Some(root_id) = root_id else {
-                return Err(Error::MissingRoot { root: *root });
-            };
+    let Some(root_id) = root_id else {
+        return Err(Error::MissingRoot { root: *root });
+    };

-            let root_id = root_id as i64;
+    let root_id = root_id as i64;

-            let update_previous_roots = sqlx::query(
-                r#"
+    let update_previous_roots = sqlx::query(
+        r#"
            UPDATE identities
            SET status = $2
            WHERE id <= $1
            AND status <> $2
            "#,
-            )
-            .bind(root_id)
-            .bind(<&str>::from(mined_status));
+    )
+    .bind(root_id)
+    .bind(<&str>::from(mined_status));
+
+    tx.execute(update_previous_roots).await?;
+
+    Ok(())
+}
+
+pub async fn insert_identities_batch(
+    tx: &mut Transaction<'_, Postgres>,
+    latest_tree: &TreeVersion<Latest>,
+    identities: &[UnprocessedCommitment],
+    pending_insertions_mutex: &Mutex<()>,
+) -> anyhow::Result<()> {
+    // Filter out any identities that are already in the `identities` table
+    let mut filtered_identities = vec![];
+    for identity in identities {
+        if tx
+            .get_identity_leaf_index(&identity.commitment)
+            .await?
+            .is_some()
+        {
+            tracing::warn!(?identity.commitment, "Duplicate identity");
+            tx.remove_unprocessed_identity(&identity.commitment).await?;
+        } else {
+            filtered_identities.push(identity.commitment);
+        }
+    }
+
+    let _guard = pending_insertions_mutex.lock().await;
+
+    let next_db_index = tx.get_next_leaf_index().await?;
+    let next_leaf = latest_tree.next_leaf();
+
+    assert_eq!(
+        next_leaf, next_db_index,
+        "Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \
+         {next_db_index}"
+    );
+
+    let data = latest_tree.append_many(&filtered_identities);
+
+    assert_eq!(
+        data.len(),
+        filtered_identities.len(),
+        "Length mismatch when appending identities to tree"
+    );
-            tx.execute(update_previous_roots).await?;
+    let items = data.into_iter().zip(filtered_identities);
+    for ((root, _proof, leaf_index), identity) in items {
+        tx.insert_pending_identity(leaf_index, &identity, &root)
+            .await?;
+
+        tx.remove_unprocessed_identity(&identity).await?;
+    }
+
+    Ok(())
+}
+
+/// impl block for database transactions
+impl Database {
+    /// Marks the identities and roots from before a given root hash as mined
+    /// Also marks following roots as pending
+    #[instrument(skip(self), level = "debug")]
+    pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> {
+        retry_tx!(self.pool, tx, mark_root_as_processed(&mut tx, root).await).await
+    }
+
+    /// Same as `mark_root_as_processed_tx`, but additionally deletes all
+    /// batches after the given root in the same transaction
+    #[instrument(skip(self), level = "debug")]
+    pub async fn mark_root_as_processed_and_delete_batches_tx(
+        &self,
+        root: &Hash,
+    ) -> Result<(), Error> {
+        retry_tx!(self.pool, tx, {
+            mark_root_as_processed(&mut tx, root).await?;
+            tx.delete_batches_after_root(root).await?;

             Ok(())
         })
         .await
     }
+
+    /// Marks the identities and roots from before a given root hash as
+    /// finalized
+    #[instrument(skip(self), level = "debug")]
+    pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> {
+        retry_tx!(self.pool, tx, mark_root_as_mined(&mut tx, root).await).await
+    }
+
+    #[instrument(level = "info", skip_all)]
+    pub async fn insert_identities_batch_tx(
+        &self,
+        latest_tree: &TreeVersion<Latest>,
+        identities: Vec<UnprocessedCommitment>,
+        pending_insertions_mutex: &Mutex<()>,
+    ) -> anyhow::Result<()> {
+        retry_tx!(
+            self.pool,
+            tx,
+            insert_identities_batch(&mut tx, latest_tree, &identities, pending_insertions_mutex)
+                .await
+        )
+        .await
+    }
 }
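Reviewer note: the point of hoisting these bodies out of the `impl Database` block is composability. A helper that borrows `&mut Transaction<'_, Postgres>` can be chained with other steps inside one `retry_tx!` body, which is what lets `mark_root_as_processed_and_delete_batches_tx` make the mark-and-delete pair atomic where the old back-to-back `*_tx` calls ran as two separate transactions. A minimal sketch of the pattern, with hypothetical `step_one`/`step_two` helpers that are not part of this diff (it mirrors the `tx.execute(...)` call shape the diff itself uses):

```rust
use sqlx::{Executor, Pool, Postgres, Transaction};

// Hypothetical steps: each borrows the caller's transaction instead of
// opening its own, so any number of them commit or roll back together.
async fn step_one(tx: &mut Transaction<'_, Postgres>) -> Result<(), sqlx::Error> {
    // Any query issued here joins the caller's transaction.
    tx.execute(sqlx::query("UPDATE identities SET status = 'pending'"))
        .await?;
    Ok(())
}

async fn step_two(tx: &mut Transaction<'_, Postgres>) -> Result<(), sqlx::Error> {
    tx.execute(sqlx::query("DELETE FROM batches")).await?;
    Ok(())
}

// The caller owns the boundary: one commit covers both steps, and an early
// `?` return drops `tx`, which rolls the whole transaction back.
async fn run_both(pool: &Pool<Postgres>) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    step_one(&mut tx).await?;
    step_two(&mut tx).await?;
    tx.commit().await
}
```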
diff --git a/src/identity/processor.rs b/src/identity/processor.rs
index 9b038a82..b0302f48 100644
--- a/src/identity/processor.rs
+++ b/src/identity/processor.rs
@@ -18,7 +18,7 @@ use crate::contracts::scanner::BlockScanner;
 use crate::contracts::IdentityManager;
 use crate::database::query::DatabaseQuery;
 use crate::database::types::{BatchEntry, BatchType};
-use crate::database::Database;
+use crate::database::{Database, Error};
 use crate::ethereum::{Ethereum, ReadProvider};
 use crate::identity_tree::{Canonical, Hash, Intermediate, TreeVersion, TreeWithNextVersion};
 use crate::prover::identity::Identity;
@@ -143,13 +143,18 @@ impl IdentityProcessor for OnChainIdentityProcessor {
             // Note that we don't have a way of queuing a root here for
             // finalization. so it's going to stay as "processed"
             // until the next root is mined. self.database.
-            self.database.mark_root_as_processed_tx(&root_hash).await?;
-            self.database.delete_batches_after_root(&root_hash).await?;
+            self.database
+                .mark_root_as_processed_and_delete_batches_tx(&root_hash)
+                .await?;
         } else {
             // Db is either empty or we're restarting with a new contract/chain
             // so we should mark everything as pending
-            self.database.mark_all_as_pending().await?;
-            self.database.delete_all_batches().await?;
+            retry_tx!(self.database.pool, tx, {
+                tx.mark_all_as_pending().await?;
+                tx.delete_all_batches().await?;
+                Result::<(), Error>::Ok(())
+            })
+            .await?;
         }

         Ok(())
diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs
index e30660ce..5779d276 100644
--- a/src/task_monitor/tasks/insert_identities.rs
+++ b/src/task_monitor/tasks/insert_identities.rs
@@ -3,13 +3,11 @@ use std::time::Duration;

 use tokio::sync::{Mutex, Notify};
 use tokio::time;
-use tracing::{info, instrument};
+use tracing::info;

 use crate::app::App;
 use crate::database::query::DatabaseQuery as _;
-use crate::database::types::UnprocessedCommitment;
-use crate::database::Database;
-use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
+use crate::identity_tree::UnprocessedStatus;

 pub async fn insert_identities(
     app: Arc<App>,
@@ -33,71 +31,15 @@
             continue;
         }

-        insert_identities_batch(
-            &app.database,
-            app.tree_state()?.latest_tree(),
-            unprocessed,
-            &pending_insertions_mutex,
-        )
-        .await?;
+        app.database
+            .insert_identities_batch_tx(
+                app.tree_state()?.latest_tree(),
+                unprocessed,
+                &pending_insertions_mutex,
+            )
+            .await?;

         // Notify the identity processing task, that there are new identities
         wake_up_notify.notify_one();
     }
 }
-
-#[instrument(level = "info", skip_all)]
-async fn insert_identities_batch(
-    database: &Database,
-    latest_tree: &TreeVersion<Latest>,
-    identities: Vec<UnprocessedCommitment>,
-    pending_insertions_mutex: &Mutex<()>,
-) -> anyhow::Result<()> {
-    // Filter out any identities that are already in the `identities` table
-    let mut filtered_identities = vec![];
-    for identity in identities {
-        if database
-            .get_identity_leaf_index(&identity.commitment)
-            .await?
-            .is_some()
-        {
-            tracing::warn!(?identity.commitment, "Duplicate identity");
-            database
-                .remove_unprocessed_identity(&identity.commitment)
-                .await?;
-        } else {
-            filtered_identities.push(identity.commitment);
-        }
-    }
-
-    let _guard = pending_insertions_mutex.lock().await;
-
-    let next_db_index = database.get_next_leaf_index().await?;
-    let next_leaf = latest_tree.next_leaf();
-
-    assert_eq!(
-        next_leaf, next_db_index,
-        "Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \
-         {next_db_index}"
-    );
-
-    let data = latest_tree.append_many(&filtered_identities);
-
-    assert_eq!(
-        data.len(),
-        filtered_identities.len(),
-        "Length mismatch when appending identities to tree"
-    );
-
-    let items = data.into_iter().zip(filtered_identities);
-
-    for ((root, _proof, leaf_index), identity) in items {
-        database
-            .insert_pending_identity(leaf_index, &identity, &root)
-            .await?;
-
-        database.remove_unprocessed_identity(&identity).await?;
-    }
-
-    Ok(())
-}
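A second reviewer note, on `insert_identities_batch`: wrapping it in a transaction does not make `pending_insertions_mutex` redundant, because the in-memory `latest_tree` is not covered by database isolation. The lock must span both the `next_leaf`/`next_db_index` comparison and the `append_many` call; if it were released between them, two concurrent batches could interleave and trip the out-of-sync assert. A toy model of that invariant, with hypothetical types standing in for the tree and the `identities` table:

```rust
use tokio::sync::Mutex;

// Toy stand-ins for the database's next leaf index and the tree's next leaf.
struct Counters {
    next_db_index: usize,
    next_leaf: usize,
}

// Called as `append_batch(&counters, batch.len()).await` from many tasks.
async fn append_batch(counters: &Mutex<Counters>, batch_len: usize) {
    // The guard covers the check *and* both updates, exactly as `_guard`
    // covers the index comparison and `append_many` in the real code.
    let mut c = counters.lock().await;
    assert_eq!(
        c.next_leaf, c.next_db_index,
        "Database and tree are out of sync"
    );
    c.next_leaf += batch_len; // models the tree append
    c.next_db_index += batch_len; // models the database inserts
}
```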