Skip to content

Commit

Permalink
fix for jakub
Browse files Browse the repository at this point in the history
  • Loading branch information
0xForerunner committed Jun 30, 2024
1 parent e8b3099 commit e1955d8
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 79 deletions.
72 changes: 1 addition & 71 deletions src/database/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use sqlx::{Executor, Postgres, Transaction};
use tokio::sync::Mutex;
use tracing::instrument;

use crate::database::query::DatabaseQuery;
use crate::database::types::UnprocessedCommitment;
use crate::database::{Database, Error};
use crate::identity_tree::{Hash, Latest, ProcessedStatus, TreeVersion, TreeVersionReadOps};
use crate::identity_tree::{Hash, ProcessedStatus};
use crate::utils::retry_tx;

async fn mark_root_as_processed(
Expand Down Expand Up @@ -79,58 +77,6 @@ pub async fn mark_root_as_mined(
Ok(())
}

pub async fn insert_identities_batch(
tx: &mut Transaction<'_, Postgres>,
latest_tree: &TreeVersion<Latest>,
identities: &[UnprocessedCommitment],
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
// Filter out any identities that are already in the `identities` table
let mut filtered_identities = vec![];
for identity in identities {
if tx
.get_identity_leaf_index(&identity.commitment)
.await?
.is_some()
{
tracing::warn!(?identity.commitment, "Duplicate identity");
tx.remove_unprocessed_identity(&identity.commitment).await?;
} else {
filtered_identities.push(identity.commitment);
}
}

let _guard = pending_insertions_mutex.lock().await;

let next_db_index = tx.get_next_leaf_index().await?;
let next_leaf = latest_tree.next_leaf();

assert_eq!(
next_leaf, next_db_index,
"Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \
{next_db_index}"
);

let data = latest_tree.append_many(&filtered_identities);

assert_eq!(
data.len(),
filtered_identities.len(),
"Length mismatch when appending identities to tree"
);

let items = data.into_iter().zip(filtered_identities);

for ((root, _proof, leaf_index), identity) in items {
tx.insert_pending_identity(leaf_index, &identity, &root)
.await?;

tx.remove_unprocessed_identity(&identity).await?;
}

Ok(())
}

/// impl block for database transactions
impl Database {
/// Marks the identities and roots from before a given root hash as mined
Expand Down Expand Up @@ -161,20 +107,4 @@ impl Database {
pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> {
retry_tx!(self.pool, tx, mark_root_as_mined(&mut tx, root).await).await
}

#[instrument(level = "info", skip_all)]
pub async fn insert_identities_batch_tx(
&self,
latest_tree: &TreeVersion<Latest>,
identities: Vec<UnprocessedCommitment>,
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
retry_tx!(
self.pool,
tx,
insert_identities_batch(&mut tx, latest_tree, &identities, pending_insertions_mutex)
.await
)
.await
}
}
77 changes: 69 additions & 8 deletions src/task_monitor/tasks/insert_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use tracing::info;

use crate::app::App;
use crate::database::query::DatabaseQuery as _;
use crate::identity_tree::UnprocessedStatus;
use crate::database::types::UnprocessedCommitment;
use crate::database::Database;
use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
use crate::utils::retry_tx;

pub async fn insert_identities(
app: Arc<App>,
Expand All @@ -31,15 +34,73 @@ pub async fn insert_identities(
continue;
}

app.database
.insert_identities_batch_tx(
app.tree_state()?.latest_tree(),
unprocessed,
&pending_insertions_mutex,
)
.await?;
insert_identities_batch(
&app.database,
app.tree_state()?.latest_tree(),
&unprocessed,
&pending_insertions_mutex,
)
.await?;

// Notify the identity processing task, that there are new identities
wake_up_notify.notify_one();
}
}

pub async fn insert_identities_batch(
database: &Database,
latest_tree: &TreeVersion<Latest>,
identities: &[UnprocessedCommitment],
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
let filtered_identities = retry_tx!(database, tx, {
// Filter out any identities that are already in the `identities` table
let mut filtered_identities = vec![];
for identity in identities {
if tx
.get_identity_leaf_index(&identity.commitment)
.await?
.is_some()
{
tracing::warn!(?identity.commitment, "Duplicate identity");
tx.remove_unprocessed_identity(&identity.commitment).await?;
} else {
filtered_identities.push(identity.commitment);
}
}
Result::<_, anyhow::Error>::Ok(filtered_identities)
})
.await?;

let _guard = pending_insertions_mutex.lock().await;

let next_leaf = latest_tree.next_leaf();

let next_db_index = retry_tx!(database, tx, tx.get_next_leaf_index().await).await?;

assert_eq!(
next_leaf, next_db_index,
"Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \
{next_db_index}"
);

let data = latest_tree.append_many(&filtered_identities);

assert_eq!(
data.len(),
filtered_identities.len(),
"Length mismatch when appending identities to tree"
);

retry_tx!(database, tx, {
for ((root, _proof, leaf_index), identity) in data.iter().zip(&filtered_identities) {
tx.insert_pending_identity(*leaf_index, identity, root)
.await?;

tx.remove_unprocessed_identity(identity).await?;
}

Ok(())
})
.await
}

0 comments on commit e1955d8

Please sign in to comment.