Skip to content

Commit

Permalink
Retry transactions that fail due to serializable isolation level (#736)
Browse files Browse the repository at this point in the history
* split up database/mod.rs

* retry

* retry macro rules

* added warning for failed tx commits
  • Loading branch information
0xForerunner authored May 24, 2024
1 parent 6cdf164 commit d853878
Show file tree
Hide file tree
Showing 13 changed files with 886 additions and 818 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ semaphore = { git = "https://github.com/worldcoin/semaphore-rs", rev = "5170c422
similar-asserts = "1.5.0"
test-case = "3.0"
testcontainers = "0.15.0"
testcontainers-modules = { version = "0.3.7", features = ["postgres"] }
testcontainers-modules = { version = "0.3.7", features = ["postgres"] }
tracing-subscriber = "0.3.11"
tracing-test = "0.2"

Expand Down
83 changes: 42 additions & 41 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use tracing::{info, instrument, warn};

use crate::config::Config;
use crate::contracts::{IdentityManager, SharedIdentityManager};
use crate::database::{Database, DatabaseExt as _};
use crate::database::query::DatabaseQuery as _;
use crate::database::Database;
use crate::ethereum::Ethereum;
use crate::identity_tree::{
CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState,
Expand All @@ -25,6 +26,7 @@ use crate::server::data::{
VerifySemaphoreProofRequest, VerifySemaphoreProofResponse,
};
use crate::server::error::Error as ServerError;
use crate::utils::retry_tx;
use crate::utils::tree_updates::dedup_tree_updates;

pub struct App {
Expand Down Expand Up @@ -111,7 +113,7 @@ impl App {
if root_hash != initial_root_hash {
// Note that we don't have a way of queuing a root here for finalization.
// so it's going to stay as "processed" until the next root is mined.
self.database.mark_root_as_processed(&root_hash).await?;
self.database.mark_root_as_processed_tx(&root_hash).await?;
} else {
// Db is either empty or we're restarting with a new contract/chain
// so we should mark everything as pending
Expand Down Expand Up @@ -401,10 +403,11 @@ impl App {
Ok(())
}

pub async fn delete_identity(&self, commitment: &Hash) -> Result<(), ServerError> {
let mut tx = self.database.begin().await?;
self.delete_identity_tx(&mut tx, commitment).await?;
tx.commit().await?;
pub async fn delete_identity_tx(&self, commitment: &Hash) -> Result<(), ServerError> {
retry_tx!(self.database.pool, tx, {
self.delete_identity(&mut tx, commitment).await
})
.await?;
Ok(())
}

Expand All @@ -415,7 +418,7 @@ impl App {
/// Will return `Err` if identity is already queued, not in the tree, or the
/// queue malfunctions.
#[instrument(level = "debug", skip(self, tx))]
pub async fn delete_identity_tx(
pub async fn delete_identity(
&self,
tx: &mut Transaction<'_, Postgres>,
commitment: &Hash,
Expand Down Expand Up @@ -477,46 +480,44 @@ impl App {
existing_commitment: &Hash,
new_commitment: &Hash,
) -> Result<(), ServerError> {
if *new_commitment == self.identity_manager.initial_leaf_value() {
warn!(
?new_commitment,
"Attempt to insert initial leaf in recovery."
);
return Err(ServerError::InvalidCommitment);
}

if !self.identity_manager.has_insertion_provers().await {
warn!(
?new_commitment,
"Identity Manager has no provers. Add provers with /addBatchSize request."
);
return Err(ServerError::NoProversOnIdInsert);
}

if !self.identity_is_reduced(*new_commitment) {
warn!(
?new_commitment,
"The new identity commitment is not reduced."
);
return Err(ServerError::UnreducedCommitment);
}
retry_tx!(self.database.pool, tx, {
if *new_commitment == self.identity_manager.initial_leaf_value() {
warn!(
?new_commitment,
"Attempt to insert initial leaf in recovery."
);
return Err(ServerError::InvalidCommitment);
}

let mut tx = self.database.begin().await?;
if !self.identity_manager.has_insertion_provers().await {
warn!(
?new_commitment,
"Identity Manager has no provers. Add provers with /addBatchSize request."
);
return Err(ServerError::NoProversOnIdInsert);
}

if tx.identity_exists(*new_commitment).await? {
return Err(ServerError::DuplicateCommitment);
}
if !self.identity_is_reduced(*new_commitment) {
warn!(
?new_commitment,
"The new identity commitment is not reduced."
);
return Err(ServerError::UnreducedCommitment);
}

// Delete the existing id and insert the commitments into the recovery table
self.delete_identity_tx(&mut tx, existing_commitment)
.await?;
if tx.identity_exists(*new_commitment).await? {
return Err(ServerError::DuplicateCommitment);
}

tx.insert_new_recovery(existing_commitment, new_commitment)
.await?;
// Delete the existing id and insert the commitments into the recovery table
self.delete_identity(&mut tx, existing_commitment).await?;

tx.commit().await?;
tx.insert_new_recovery(existing_commitment, new_commitment)
.await?;

Ok(())
Ok(())
})
.await
}

pub async fn identity_history(
Expand Down
Loading

0 comments on commit d853878

Please sign in to comment.