Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax transaction isolation levels + fix insert identities constraints #795

Merged
merged 11 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 19 additions & 88 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,22 @@ use std::sync::{Arc, OnceLock};
use chrono::{Duration, Utc};
use ruint::Uint;
use semaphore::protocol::verify_proof;
use sqlx::{Postgres, Transaction};
use tracing::{info, instrument, warn};

use crate::config::Config;
use crate::contracts::IdentityManager;
use crate::database::methods::DbMethods as _;
use crate::database::Database;
use crate::database::{Database, IsolationLevel};
use crate::ethereum::Ethereum;
use crate::identity::processor::{
IdentityProcessor, OffChainIdentityProcessor, OnChainIdentityProcessor,
};
use crate::identity::validator::IdentityValidator;
use crate::identity_tree::initializer::TreeInitializer;
use crate::identity_tree::{Hash, InclusionProof, RootItem, TreeState, TreeVersionReadOps};
use crate::identity_tree::{Hash, InclusionProof, RootItem, TreeState, TreeVersionOps};
use crate::prover::map::initialize_prover_maps;
use crate::prover::repository::ProverRepository;
use crate::prover::{ProverConfig, ProverType};
use crate::retry_tx;
use crate::server::data::{
InclusionProofResponse, ListBatchSizesResponse, VerifySemaphoreProofQuery,
VerifySemaphoreProofRequest, VerifySemaphoreProofResponse,
Expand Down Expand Up @@ -157,22 +155,19 @@ impl App {

// TODO: ensure that the id is not in the tree or in unprocessed identities

if self.database.identity_exists(commitment).await? {
let mut tx = self
.database
.begin_tx(IsolationLevel::ReadCommitted)
.await?;

if tx.identity_exists(commitment).await? {
return Err(ServerError::DuplicateCommitment);
}

self.database
.insert_new_identity(commitment, Utc::now())
.await?;
tx.insert_new_identity(commitment, Utc::now()).await?;

Ok(())
}
tx.commit().await?;

pub async fn delete_identity_tx(&self, commitment: &Hash) -> Result<(), ServerError> {
retry_tx!(self.database.pool, tx, {
self.delete_identity(&mut tx, commitment).await
})
.await?;
Ok(())
}

Expand All @@ -182,12 +177,13 @@ impl App {
///
/// Will return `Err` if identity is already queued, not in the tree, or the
/// queue malfunctions.
#[instrument(level = "debug", skip(self, tx))]
pub async fn delete_identity(
&self,
tx: &mut Transaction<'_, Postgres>,
commitment: &Hash,
) -> Result<(), ServerError> {
#[instrument(level = "debug", skip(self))]
pub async fn delete_identity(&self, commitment: &Hash) -> Result<(), ServerError> {
let mut tx = self
.database
.begin_tx(IsolationLevel::RepeatableRead)
.await?;

// Ensure that deletion provers exist
if !self.prover_repository.has_deletion_provers().await {
warn!(
Expand All @@ -213,76 +209,11 @@ impl App {
return Err(ServerError::IdentityAlreadyDeleted);
}

// Check if the id is already queued for deletion
if tx.identity_is_queued_for_deletion(commitment).await? {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the insert_new_deletion method has been changed to be idempotent. Therefore there's no longer a need to check if we have already accepted a deletion

return Err(ServerError::IdentityQueuedForDeletion);
}

// Check if there are any deletions, if not, set the latest deletion timestamp
// to now to ensure that the new deletion is processed by the next deletion
// interval
if tx.get_deletions().await?.is_empty() {
tx.update_latest_deletion(Utc::now()).await?;
}

// If the id has not been deleted, insert into the deletions table
tx.insert_new_deletion(leaf_index, commitment).await?;

Ok(())
}

/// Queues a recovery of an identity.
///
/// i.e. deletion and reinsertion after a set period of time.
///
/// # Errors
///
/// Will return `Err` if identity is already queued for deletion, not in the
/// tree, or the queue malfunctions.
#[instrument(level = "debug", skip(self))]
pub async fn recover_identity(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First step in disabling recovery - deleting the API endpoint

&self,
existing_commitment: &Hash,
new_commitment: &Hash,
) -> Result<(), ServerError> {
retry_tx!(self.database.pool, tx, {
if self.identity_validator.is_initial_leaf(new_commitment) {
warn!(
?new_commitment,
"Attempt to insert initial leaf in recovery."
);
return Err(ServerError::InvalidCommitment);
}

if !self.prover_repository.has_insertion_provers().await {
warn!(
?new_commitment,
"Identity Manager has no provers. Add provers with /addBatchSize request."
);
return Err(ServerError::NoProversOnIdInsert);
}

if !self.identity_validator.is_reduced(*new_commitment) {
warn!(
?new_commitment,
"The new identity commitment is not reduced."
);
return Err(ServerError::UnreducedCommitment);
}

if tx.identity_exists(*new_commitment).await? {
return Err(ServerError::DuplicateCommitment);
}

// Delete the existing id and insert the commitments into the recovery table
self.delete_identity(&mut tx, existing_commitment).await?;

tx.insert_new_recovery(existing_commitment, new_commitment)
.await?;
tx.commit().await?;

Ok(())
})
.await
Ok(())
}

fn merge_env_provers(
Expand Down
53 changes: 25 additions & 28 deletions src/database/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,9 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
Ok(res)
}

/// Inserts a new deletion into the deletions table
///
/// This method is idempotent and on conflict nothing will happen
#[instrument(skip(self), level = "debug")]
async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> {
let mut conn = self.acquire().await?;
Expand All @@ -629,6 +632,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
r#"
INSERT INTO deletions (leaf_index, commitment)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
"#,
)
.bind(leaf_index as i64)
Expand Down Expand Up @@ -685,12 +689,12 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
async fn get_eligible_unprocessed_commitments(
self,
status: UnprocessedStatus,
) -> Result<Vec<types::UnprocessedCommitment>, Error> {
) -> Result<Vec<Hash>, Error> {
let mut conn = self.acquire().await?;

let result = sqlx::query(
let result: Vec<(Hash,)> = sqlx::query_as(
r#"
SELECT * FROM unprocessed_identities
SELECT commitment FROM unprocessed_identities
WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility
LIMIT $2
"#,
Expand All @@ -700,17 +704,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
.fetch_all(&mut *conn)
.await?;

Ok(result
.into_iter()
.map(|row| types::UnprocessedCommitment {
commitment: row.get::<Hash, _>(0),
status,
created_at: row.get::<_, _>(2),
processed_at: row.get::<_, _>(3),
error_message: row.get::<_, _>(4),
eligibility_timestamp: row.get::<_, _>(5),
})
.collect::<Vec<_>>())
Ok(result.into_iter().map(|(commitment,)| commitment).collect())
}

/// Returns the error message from the unprocessed identities table
Expand Down Expand Up @@ -756,6 +750,23 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
Ok(())
}

#[instrument(skip(self), level = "debug")]
async fn trim_unprocessed(self) -> Result<(), Error> {
let mut conn = self.acquire().await?;

sqlx::query(
r#"
DELETE FROM unprocessed_identities u
USING identities i
WHERE u.commitment = i.commitment
"#,
)
.execute(&mut *conn)
.await?;

Ok(())
}

#[instrument(skip(self), level = "debug")]
async fn identity_exists(self, commitment: Hash) -> Result<bool, Error> {
let mut conn = self.acquire().await?;
Expand All @@ -773,20 +784,6 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
.get::<bool, _>(0))
}

// TODO: add docs
#[instrument(skip(self), level = "debug")]
async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result<bool, Error> {
let mut conn = self.acquire().await?;

let row_unprocessed =
sqlx::query(r#"SELECT exists(SELECT 1 FROM deletions where commitment = $1)"#)
.bind(commitment)
.fetch_one(&mut *conn)
.await?;

Ok(row_unprocessed.get::<bool, _>(0))
}

#[instrument(skip(self), level = "debug")]
async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> {
let mut conn = self.acquire().await?;
Expand Down
Loading
Loading