From e1955d80108e407bac788cdbf9f9fb65c5c92263 Mon Sep 17 00:00:00 2001 From: Eric Woolsey Date: Sun, 30 Jun 2024 11:49:10 -0700 Subject: [PATCH] fix for jakub --- src/database/transaction.rs | 72 +------------------ src/task_monitor/tasks/insert_identities.rs | 77 ++++++++++++++++++--- 2 files changed, 70 insertions(+), 79 deletions(-) diff --git a/src/database/transaction.rs b/src/database/transaction.rs index cfcb78f0..10109429 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -1,11 +1,9 @@ use sqlx::{Executor, Postgres, Transaction}; -use tokio::sync::Mutex; use tracing::instrument; use crate::database::query::DatabaseQuery; -use crate::database::types::UnprocessedCommitment; use crate::database::{Database, Error}; -use crate::identity_tree::{Hash, Latest, ProcessedStatus, TreeVersion, TreeVersionReadOps}; +use crate::identity_tree::{Hash, ProcessedStatus}; use crate::utils::retry_tx; async fn mark_root_as_processed( @@ -79,58 +77,6 @@ pub async fn mark_root_as_mined( Ok(()) } -pub async fn insert_identities_batch( - tx: &mut Transaction<'_, Postgres>, - latest_tree: &TreeVersion, - identities: &[UnprocessedCommitment], - pending_insertions_mutex: &Mutex<()>, -) -> anyhow::Result<()> { - // Filter out any identities that are already in the `identities` table - let mut filtered_identities = vec![]; - for identity in identities { - if tx - .get_identity_leaf_index(&identity.commitment) - .await? - .is_some() - { - tracing::warn!(?identity.commitment, "Duplicate identity"); - tx.remove_unprocessed_identity(&identity.commitment).await?; - } else { - filtered_identities.push(identity.commitment); - } - } - - let _guard = pending_insertions_mutex.lock().await; - - let next_db_index = tx.get_next_leaf_index().await?; - let next_leaf = latest_tree.next_leaf(); - - assert_eq!( - next_leaf, next_db_index, - "Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \ - {next_db_index}" - ); - - let data = latest_tree.append_many(&filtered_identities); - - assert_eq!( - data.len(), - filtered_identities.len(), - "Length mismatch when appending identities to tree" - ); - - let items = data.into_iter().zip(filtered_identities); - - for ((root, _proof, leaf_index), identity) in items { - tx.insert_pending_identity(leaf_index, &identity, &root) - .await?; - - tx.remove_unprocessed_identity(&identity).await?; - } - - Ok(()) -} - /// impl block for database transactions impl Database { /// Marks the identities and roots from before a given root hash as mined @@ -161,20 +107,4 @@ impl Database { pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> { retry_tx!(self.pool, tx, mark_root_as_mined(&mut tx, root).await).await } - - #[instrument(level = "info", skip_all)] - pub async fn insert_identities_batch_tx( - &self, - latest_tree: &TreeVersion, - identities: Vec, - pending_insertions_mutex: &Mutex<()>, - ) -> anyhow::Result<()> { - retry_tx!( - self.pool, - tx, - insert_identities_batch(&mut tx, latest_tree, &identities, pending_insertions_mutex) - .await - ) - .await - } } diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index 5779d276..6a982fc2 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -7,7 +7,10 @@ use tracing::info; use crate::app::App; use crate::database::query::DatabaseQuery as _; -use crate::identity_tree::UnprocessedStatus; +use crate::database::types::UnprocessedCommitment; +use crate::database::Database; +use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus}; +use crate::utils::retry_tx; pub async fn insert_identities( app: Arc, @@ -31,15 +34,73 @@ pub async fn insert_identities( continue; } - app.database - .insert_identities_batch_tx( - app.tree_state()?.latest_tree(), - unprocessed, - &pending_insertions_mutex, - ) - .await?; + insert_identities_batch( + &app.database, + app.tree_state()?.latest_tree(), + &unprocessed, + &pending_insertions_mutex, + ) + .await?; // Notify the identity processing task, that there are new identities wake_up_notify.notify_one(); } } + +pub async fn insert_identities_batch( + database: &Database, + latest_tree: &TreeVersion, + identities: &[UnprocessedCommitment], + pending_insertions_mutex: &Mutex<()>, +) -> anyhow::Result<()> { + let filtered_identities = retry_tx!(database, tx, { + // Filter out any identities that are already in the `identities` table + let mut filtered_identities = vec![]; + for identity in identities { + if tx + .get_identity_leaf_index(&identity.commitment) + .await? + .is_some() + { + tracing::warn!(?identity.commitment, "Duplicate identity"); + tx.remove_unprocessed_identity(&identity.commitment).await?; + } else { + filtered_identities.push(identity.commitment); + } + } + Result::<_, anyhow::Error>::Ok(filtered_identities) + }) + .await?; + + let _guard = pending_insertions_mutex.lock().await; + + let next_leaf = latest_tree.next_leaf(); + + let next_db_index = retry_tx!(database, tx, tx.get_next_leaf_index().await).await?; + + assert_eq!( + next_leaf, next_db_index, + "Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \ + {next_db_index}" + ); + + let data = latest_tree.append_many(&filtered_identities); + + assert_eq!( + data.len(), + filtered_identities.len(), + "Length mismatch when appending identities to tree" + ); + + retry_tx!(database, tx, { + for ((root, _proof, leaf_index), identity) in data.iter().zip(&filtered_identities) { + tx.insert_pending_identity(*leaf_index, identity, root) + .await?; + + tx.remove_unprocessed_identity(identity).await?; + } + + Ok(()) + }) + .await +}