Skip to content

Commit

Permalink
Transactionalize Insert Identities Batch (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xForerunner authored Jul 1, 2024
1 parent 05dcee5 commit 1594eb2
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 83 deletions.
119 changes: 71 additions & 48 deletions src/database/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,87 +1,110 @@
use sqlx::Executor;
use sqlx::{Executor, Postgres, Transaction};
use tracing::instrument;

use crate::database::query::DatabaseQuery;
use crate::database::{Database, Error};
use crate::identity_tree::{Hash, ProcessedStatus};
use crate::utils::retry_tx;

/// impl block for database transactions
impl Database {
/// Marks the identities and roots from before a given root hash as mined
/// Also marks following roots as pending
#[instrument(skip(self), level = "debug")]
pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> {
retry_tx!(self.pool, tx, {
let root_id = tx.get_id_by_root(root).await?;
async fn mark_root_as_processed(
tx: &mut Transaction<'_, Postgres>,
root: &Hash,
) -> Result<(), Error> {
let root_id = tx.get_id_by_root(root).await?;

let Some(root_id) = root_id else {
return Err(Error::MissingRoot { root: *root });
};
let Some(root_id) = root_id else {
return Err(Error::MissingRoot { root: *root });
};

let root_id = root_id as i64;
// TODO: Can I get rid of the line `AND status <> $2`?
let update_previous_roots = sqlx::query(
r#"
let root_id = root_id as i64;
// TODO: Can I get rid of the line `AND status <> $2`?
let update_previous_roots = sqlx::query(
r#"
UPDATE identities
SET status = $2, mined_at = CURRENT_TIMESTAMP
WHERE id <= $1
AND status <> $2
AND status <> $3;
"#,
)
.bind(root_id)
.bind(<&str>::from(ProcessedStatus::Processed))
.bind(<&str>::from(ProcessedStatus::Mined));
)
.bind(root_id)
.bind(<&str>::from(ProcessedStatus::Processed))
.bind(<&str>::from(ProcessedStatus::Mined));

let update_next_roots = sqlx::query(
r#"
let update_next_roots = sqlx::query(
r#"
UPDATE identities
SET status = $2, mined_at = NULL
WHERE id > $1
"#,
)
.bind(root_id)
.bind(<&str>::from(ProcessedStatus::Pending));
)
.bind(root_id)
.bind(<&str>::from(ProcessedStatus::Pending));

tx.execute(update_previous_roots).await?;
tx.execute(update_next_roots).await?;
tx.execute(update_previous_roots).await?;
tx.execute(update_next_roots).await?;

Ok(())
})
.await
}
Ok(())
}

/// Marks the identities and roots from before a given root hash as
/// finalized
#[instrument(skip(self), level = "debug")]
pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> {
let mined_status = ProcessedStatus::Mined;
pub async fn mark_root_as_mined(
tx: &mut Transaction<'_, Postgres>,
root: &Hash,
) -> Result<(), Error> {
let mined_status = ProcessedStatus::Mined;

retry_tx!(self.pool, tx, {
let root_id = tx.get_id_by_root(root).await?;
let root_id = tx.get_id_by_root(root).await?;

let Some(root_id) = root_id else {
return Err(Error::MissingRoot { root: *root });
};
let Some(root_id) = root_id else {
return Err(Error::MissingRoot { root: *root });
};

let root_id = root_id as i64;
let root_id = root_id as i64;

let update_previous_roots = sqlx::query(
r#"
let update_previous_roots = sqlx::query(
r#"
UPDATE identities
SET status = $2
WHERE id <= $1
AND status <> $2
"#,
)
.bind(root_id)
.bind(<&str>::from(mined_status));
)
.bind(root_id)
.bind(<&str>::from(mined_status));

tx.execute(update_previous_roots).await?;
tx.execute(update_previous_roots).await?;

Ok(())
}

/// Pool-level entry points for the root-marking helpers in this module.
///
/// Each method hands `self.pool` to `retry_tx!` and runs the helper against
/// the resulting `tx`, so callers do not manage the transaction themselves.
/// NOTE(review): retry/commit/rollback semantics live in `retry_tx!`, which
/// is defined elsewhere - confirm there before relying on them.
impl Database {
/// Marks identities up to and including the given root as processed and
/// resets all later identities to pending.
///
/// Delegates to `mark_root_as_processed`: rows with an id at or below the
/// root's id that are not already processed or mined become processed,
/// and rows with a higher id become pending with `mined_at` cleared.
///
/// # Errors
///
/// Returns `Error::MissingRoot` if `root` is not found; database errors
/// are propagated.
#[instrument(skip(self), level = "debug")]
pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> {
retry_tx!(self.pool, tx, mark_root_as_processed(&mut tx, root).await).await
}

/// Marks identities up to and including the given root as processed,
/// resets all later identities to pending, and then calls
/// `delete_batches_after_root` for the same root.
///
/// Both steps run on the same `tx` inside a single `retry_tx!` block, so
/// they are not issued as two separate pool-level calls.
/// NOTE(review): presumably `delete_batches_after_root` removes batches
/// created after `root` - verify against its definition.
///
/// # Errors
///
/// Returns `Error::MissingRoot` if `root` is not found; database errors
/// from either step are propagated.
#[instrument(skip(self), level = "debug")]
pub async fn mark_root_as_processed_and_delete_batches_tx(
&self,
root: &Hash,
) -> Result<(), Error> {
retry_tx!(self.pool, tx, {
mark_root_as_processed(&mut tx, root).await?;
tx.delete_batches_after_root(root).await?;
Ok(())
})
.await
}

/// Marks identities up to and including the given root as mined
/// (finalized).
///
/// Delegates to `mark_root_as_mined`, which updates every row with an id
/// at or below the root's id that is not already mined.
///
/// # Errors
///
/// Returns `Error::MissingRoot` if `root` is not found; database errors
/// are propagated.
#[instrument(skip(self), level = "debug")]
pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> {
retry_tx!(self.pool, tx, mark_root_as_mined(&mut tx, root).await).await
}
}
15 changes: 10 additions & 5 deletions src/identity/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::contracts::scanner::BlockScanner;
use crate::contracts::IdentityManager;
use crate::database::query::DatabaseQuery;
use crate::database::types::{BatchEntry, BatchType};
use crate::database::Database;
use crate::database::{Database, Error};
use crate::ethereum::{Ethereum, ReadProvider};
use crate::identity_tree::{Canonical, Hash, Intermediate, TreeVersion, TreeWithNextVersion};
use crate::prover::identity::Identity;
Expand Down Expand Up @@ -143,13 +143,18 @@ impl IdentityProcessor for OnChainIdentityProcessor {
// Note that we don't have a way of queuing a root here for
// finalization, so it's going to stay as "processed"
// until the next root is mined.
self.database.mark_root_as_processed_tx(&root_hash).await?;
self.database.delete_batches_after_root(&root_hash).await?;
self.database
.mark_root_as_processed_and_delete_batches_tx(&root_hash)
.await?;
} else {
// Db is either empty or we're restarting with a new contract/chain
// so we should mark everything as pending
self.database.mark_all_as_pending().await?;
self.database.delete_all_batches().await?;
retry_tx!(self.database.pool, tx, {
tx.mark_all_as_pending().await?;
tx.delete_all_batches().await?;
Result::<(), Error>::Ok(())
})
.await?;
}

Ok(())
Expand Down
63 changes: 33 additions & 30 deletions src/task_monitor/tasks/insert_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::time::Duration;

use tokio::sync::{Mutex, Notify};
use tokio::time;
use tracing::{info, instrument};
use tracing::info;

use crate::app::App;
use crate::database::query::DatabaseQuery as _;
use crate::database::types::UnprocessedCommitment;
use crate::database::Database;
use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
use crate::utils::retry_tx;

pub async fn insert_identities(
app: Arc<App>,
Expand All @@ -36,7 +37,7 @@ pub async fn insert_identities(
insert_identities_batch(
&app.database,
app.tree_state()?.latest_tree(),
unprocessed,
&unprocessed,
&pending_insertions_mutex,
)
.await?;
Expand All @@ -46,35 +47,37 @@ pub async fn insert_identities(
}
}

#[instrument(level = "info", skip_all)]
async fn insert_identities_batch(
pub async fn insert_identities_batch(
database: &Database,
latest_tree: &TreeVersion<Latest>,
identities: Vec<UnprocessedCommitment>,
identities: &[UnprocessedCommitment],
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
// Filter out any identities that are already in the `identities` table
let mut filtered_identities = vec![];
for identity in identities {
if database
.get_identity_leaf_index(&identity.commitment)
.await?
.is_some()
{
tracing::warn!(?identity.commitment, "Duplicate identity");
database
.remove_unprocessed_identity(&identity.commitment)
.await?;
} else {
filtered_identities.push(identity.commitment);
let filtered_identities = retry_tx!(database, tx, {
// Filter out any identities that are already in the `identities` table
let mut filtered_identities = vec![];
for identity in identities {
if tx
.get_identity_leaf_index(&identity.commitment)
.await?
.is_some()
{
tracing::warn!(?identity.commitment, "Duplicate identity");
tx.remove_unprocessed_identity(&identity.commitment).await?;
} else {
filtered_identities.push(identity.commitment);
}
}
}
Result::<_, anyhow::Error>::Ok(filtered_identities)
})
.await?;

let _guard = pending_insertions_mutex.lock().await;

let next_db_index = database.get_next_leaf_index().await?;
let next_leaf = latest_tree.next_leaf();

let next_db_index = retry_tx!(database, tx, tx.get_next_leaf_index().await).await?;

assert_eq!(
next_leaf, next_db_index,
"Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \
Expand All @@ -89,15 +92,15 @@ async fn insert_identities_batch(
"Length mismatch when appending identities to tree"
);

let items = data.into_iter().zip(filtered_identities);

for ((root, _proof, leaf_index), identity) in items {
database
.insert_pending_identity(leaf_index, &identity, &root)
.await?;
retry_tx!(database, tx, {
for ((root, _proof, leaf_index), identity) in data.iter().zip(&filtered_identities) {
tx.insert_pending_identity(*leaf_index, identity, root)
.await?;

database.remove_unprocessed_identity(&identity).await?;
}
tx.remove_unprocessed_identity(identity).await?;
}

Ok(())
Ok(())
})
.await
}

0 comments on commit 1594eb2

Please sign in to comment.