From 7ed42e389ce7ddcbc275191ee94090bafdb14b2c Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Tue, 15 Oct 2024 15:11:07 +0200
Subject: [PATCH 1/7] All query methods

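This introduces `DbMethods`, a single extension trait bounded on
`sqlx::Acquire`, so every query can be invoked either directly on the
connection pool or on an open transaction and composed atomically. A rough
usage sketch (editorial illustration, not part of the patch; assumes a
`Database` handle `db` and `commitment`/`root`/`pre_root` values in scope):

    // Run two queries atomically; the same methods also work on `&db.pool`
    // for one-off calls.
    let mut tx = db.begin().await?;
    let leaf_index = tx.get_next_leaf_index().await?;
    tx.insert_pending_identity(leaf_index, &commitment, &root, &pre_root)
        .await?;
    tx.commit().await?;
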
---
 src/database/methods.rs | 886 ++++++++++++++++++++++++++++++++++++++++
 src/database/mod.rs     |   1 +
 src/database/query.rs   |   1 +
 3 files changed, 888 insertions(+)
 create mode 100644 src/database/methods.rs

diff --git a/src/database/methods.rs b/src/database/methods.rs
new file mode 100644
index 00000000..9469f124
--- /dev/null
+++ b/src/database/methods.rs
@@ -0,0 +1,886 @@
+use std::collections::HashSet;
+
+use axum::async_trait;
+use chrono::{DateTime, Utc};
+use ruint::aliases::U256;
+use sqlx::{Acquire, Executor, Postgres, Row};
+use tracing::instrument;
+use types::{DeletionEntry, RecoveryEntry};
+
+use super::types::{LatestDeletionEntry, LatestInsertionEntry};
+use crate::database::types::{BatchEntry, BatchEntryData, BatchType};
+use crate::database::{types, Error};
+use crate::identity_tree::{
+    Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus,
+};
+use crate::prover::identity::Identity;
+use crate::prover::{ProverConfig, ProverType};
+
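+// Upper bound on rows fetched in one call by
+// `get_eligible_unprocessed_commitments`, so a large backlog of unprocessed
+// identities cannot be pulled into memory all at once.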
+const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000;
+
+#[async_trait]
+pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
+    async fn insert_pending_identity(
+        self,
+        leaf_index: usize,
+        identity: &Hash,
+        root: &Hash,
+        pre_root: &Hash,
+    ) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of, pre_root)
+            VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP, $5)
+            "#,
+        )
+        .bind(leaf_index as i64)
+        .bind(identity)
+        .bind(root)
+        .bind(<&str>::from(ProcessedStatus::Pending))
+        .bind(pre_root)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn get_id_by_root(self, root: &Hash) -> Result<Option<usize>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
+            r#"
+            SELECT id
+            FROM identities
+            WHERE root = $1
+            ORDER BY id ASC
+            LIMIT 1
+            "#,
+        )
+        .bind(root)
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        let Some(row) = row else { return Ok(None) };
+        let root_id = row.get::<i64, _>(0);
+
+        Ok(Some(root_id as usize))
+    }
+
+    async fn mark_all_as_pending(self) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            UPDATE identities
+            SET status = $1, mined_at = NULL
+            WHERE status <> $1
+            "#,
+        )
+        .bind(<&str>::from(ProcessedStatus::Pending))
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn get_next_leaf_index(self) -> Result<usize, Error> {
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
+            r#"
+            SELECT leaf_index FROM identities
+            ORDER BY leaf_index DESC
+            LIMIT 1
+            "#,
+        )
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        let Some(row) = row else { return Ok(0) };
+        let leaf_index = row.get::<i64, _>(0);
+
+        Ok((leaf_index + 1) as usize)
+    }
+
+    async fn get_identity_leaf_index(self, identity: &Hash) -> Result<Option<TreeItem>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
+            r#"
+            SELECT leaf_index, status
+            FROM identities
+            WHERE commitment = $1
+            ORDER BY id DESC
+            LIMIT 1;
+            "#,
+        )
+        .bind(identity)
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        let Some(row) = row else {
+            return Ok(None);
+        };
+
+        let leaf_index = row.get::<i64, _>(0) as usize;
+
+        let status = row
+            .get::<&str, _>(1)
+            .parse()
+            .expect("Status is unreadable, database is corrupt");
+
+        Ok(Some(TreeItem { status, leaf_index }))
+    }
+
+    async fn get_commitments_by_status(
+        self,
+        status: ProcessedStatus,
+    ) -> Result<Vec<TreeUpdate>, Error> {
+        let mut conn = self.acquire().await?;
+
+        Ok(sqlx::query_as::<_, TreeUpdate>(
+            r#"
+            SELECT leaf_index, commitment as element
+            FROM identities
+            WHERE status = $1
+            ORDER BY id ASC;
+            "#,
+        )
+        .bind(<&str>::from(status))
+        .fetch_all(&mut *conn)
+        .await?)
+    }
+
+    async fn get_commitments_by_statuses(
+        self,
+        statuses: Vec<ProcessedStatus>,
+    ) -> Result<Vec<TreeUpdate>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let statuses: Vec<&str> = statuses.into_iter().map(<&str>::from).collect();
+        Ok(sqlx::query_as::<_, TreeUpdate>(
+            r#"
+            SELECT leaf_index, commitment as element
+            FROM identities
+            WHERE status = ANY($1)
+            ORDER BY id ASC;
+            "#,
+        )
+        .bind(&statuses[..]) // Official workaround https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-do-a-select--where-foo-in--query
+        .fetch_all(&mut *conn)
+        .await?)
+    }
+
+    async fn get_non_zero_commitments_by_leaf_indexes<I>(
+        self,
+        leaf_indexes: I,
+    ) -> Result<Vec<Hash>, Error>
+    where
+        I: IntoIterator<Item = usize> + Send,
+    {
+        let mut conn = self.acquire().await?;
+
+        let leaf_indexes: Vec<i64> = leaf_indexes.into_iter().map(|v| v as i64).collect();
+
+        Ok(sqlx::query(
+            r#"
+            SELECT commitment
+            FROM identities
+            WHERE leaf_index = ANY($1)
+            AND commitment != $2
+            "#,
+        )
+        .bind(&leaf_indexes[..]) // Official workaround https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-do-a-select--where-foo-in--query
+        .bind(Hash::ZERO)
+        .fetch_all(&mut *conn)
+        .await?
+        .into_iter()
+        .map(|row| row.get::<Hash, _>(0))
+        .collect())
+    }
+
+    async fn get_latest_root_by_status(
+        self,
+        status: ProcessedStatus,
+    ) -> Result<Option<Hash>, Error> {
+        let mut conn = self.acquire().await?;
+
+        Ok(sqlx::query(
+            r#"
+            SELECT root FROM identities WHERE status = $1 ORDER BY id DESC LIMIT 1
+            "#,
+        )
+        .bind(<&str>::from(status))
+        .fetch_optional(&mut *conn)
+        .await?
+        .map(|r| r.get::<Hash, _>(0)))
+    }
+
+    async fn get_root_state(self, root: &Hash) -> Result<Option<RootItem>, Error> {
+        let mut conn = self.acquire().await?;
+
+        // This tries really hard to do everything in one query to prevent race
+        // conditions.
+        Ok(sqlx::query_as::<_, RootItem>(
+            r#"
+            SELECT
+                root,
+                status,
+                pending_as_of as pending_valid_as_of,
+                mined_at as mined_valid_as_of
+            FROM identities
+            WHERE root = $1
+            ORDER BY id
+            LIMIT 1
+            "#,
+        )
+        .bind(root)
+        .fetch_optional(&mut *conn)
+        .await?)
+    }
+
+    async fn get_latest_insertion(self) -> Result<LatestInsertionEntry, Error> {
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
+            r#"
+            SELECT insertion_timestamp
+            FROM latest_insertion_timestamp
+            WHERE Lock = 'X';"#,
+        )
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        if let Some(row) = row {
+            Ok(LatestInsertionEntry {
+                timestamp: row.get(0),
+            })
+        } else {
+            Ok(LatestInsertionEntry {
+                timestamp: Utc::now(),
+            })
+        }
+    }
+
+    async fn count_unprocessed_identities(self) -> Result<i32, Error> {
+        let mut conn = self.acquire().await?;
+
+        let (count,): (i64,) = sqlx::query_as(
+            r#"
+            SELECT COUNT(*) as unprocessed
+            FROM unprocessed_identities
+            "#,
+        )
+        .fetch_one(&mut *conn)
+        .await?;
+
+        Ok(count as i32)
+    }
+
+    async fn count_pending_identities(self) -> Result<i32, Error> {
+        let mut conn = self.acquire().await?;
+
+        let (count,): (i64,) = sqlx::query_as(
+            r#"
+            SELECT COUNT(*) as pending
+            FROM identities
+            WHERE status = $1
+            "#,
+        )
+        .bind(<&str>::from(ProcessedStatus::Pending))
+        .fetch_one(&mut *conn)
+        .await?;
+
+        Ok(count as i32)
+    }
+
+    async fn get_provers(self) -> Result<HashSet<ProverConfig>, Error> {
+        let mut conn = self.acquire().await?;
+
+        Ok(sqlx::query_as(
+            r#"
+            SELECT batch_size, url, timeout_s, prover_type
+            FROM provers
+            "#,
+        )
+        .fetch_all(&mut *conn)
+        .await?
+        .into_iter()
+        .collect())
+    }
+
+    async fn insert_prover_configuration(
+        self,
+        batch_size: usize,
+        url: impl ToString + Send,
+        timeout_seconds: u64,
+        prover_type: ProverType,
+    ) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        let url = url.to_string();
+
+        sqlx::query(
+            r#"
+            INSERT INTO provers (batch_size, url, timeout_s, prover_type)
+            VALUES ($1, $2, $3, $4)
+            "#,
+        )
+        .bind(batch_size as i64)
+        .bind(url)
+        .bind(timeout_seconds as i64)
+        .bind(prover_type)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn insert_provers(self, provers: HashSet<ProverConfig>) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        if provers.is_empty() {
+            return Ok(());
+        }
+
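+        // Build a single multi-row INSERT; `push_values` appends one
+        // (batch_size, url, timeout_s, prover_type) tuple group per prover.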
+        let mut query_builder = sqlx::QueryBuilder::new(
+            r#"
+            INSERT INTO provers (batch_size, url, timeout_s, prover_type)
+            "#,
+        );
+
+        query_builder.push_values(provers, |mut b, prover| {
+            b.push_bind(prover.batch_size as i64)
+                .push_bind(prover.url)
+                .push_bind(prover.timeout_s as i64)
+                .push_bind(prover.prover_type);
+        });
+
+        let query = query_builder.build();
+
+        conn.execute(query).await?;
+
+        Ok(())
+    }
+
+    async fn remove_prover(self, batch_size: usize, prover_type: ProverType) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            DELETE FROM provers WHERE batch_size = $1 AND prover_type = $2
+            "#,
+        )
+        .bind(batch_size as i64)
+        .bind(prover_type)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn insert_new_identity(
+        self,
+        identity: Hash,
+        eligibility_timestamp: sqlx::types::chrono::DateTime<Utc>,
+    ) -> Result<Hash, Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO unprocessed_identities (commitment, status, created_at, eligibility)
+            VALUES ($1, $2, CURRENT_TIMESTAMP, $3)
+            "#,
+        )
+        .bind(identity)
+        .bind(<&str>::from(UnprocessedStatus::New))
+        .bind(eligibility_timestamp)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(identity)
+    }
+
+    async fn insert_new_recovery(
+        self,
+        existing_commitment: &Hash,
+        new_commitment: &Hash,
+    ) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO recoveries (existing_commitment, new_commitment)
+            VALUES ($1, $2)
+            "#,
+        )
+        .bind(existing_commitment)
+        .bind(new_commitment)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn get_latest_deletion(self) -> Result<LatestDeletionEntry, Error> {
+        let mut conn = self.acquire().await?;
+
+        let row =
+            sqlx::query("SELECT deletion_timestamp FROM latest_deletion_root WHERE Lock = 'X';")
+                .fetch_optional(&mut *conn)
+                .await?;
+
+        if let Some(row) = row {
+            Ok(LatestDeletionEntry {
+                timestamp: row.get(0),
+            })
+        } else {
+            Ok(LatestDeletionEntry {
+                timestamp: Utc::now(),
+            })
+        }
+    }
+
+    async fn update_latest_insertion(
+        self,
+        insertion_timestamp: DateTime<Utc>,
+    ) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO latest_insertion_timestamp (Lock, insertion_timestamp)
+            VALUES ('X', $1)
+            ON CONFLICT (Lock)
+            DO UPDATE SET insertion_timestamp = EXCLUDED.insertion_timestamp;
+            "#,
+        )
+        .bind(insertion_timestamp)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn update_latest_deletion(self, deletion_timestamp: DateTime<Utc>) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO latest_deletion_root (Lock, deletion_timestamp)
+            VALUES ('X', $1)
+            ON CONFLICT (Lock)
+            DO UPDATE SET deletion_timestamp = EXCLUDED.deletion_timestamp;
+            "#,
+        )
+        .bind(deletion_timestamp)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    #[cfg(test)]
+    async fn get_all_recoveries(self) -> Result<Vec<RecoveryEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
+        Ok(
+            sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries")
+                .fetch_all(&mut *conn)
+                .await?,
+        )
+    }
+
+    async fn delete_recoveries<I, T>(self, prev_commits: I) -> Result<Vec<RecoveryEntry>, Error>
+    where
+        I: IntoIterator<Item = T> + Send,
+        T: Into<U256>,
+    {
+        let mut conn = self.acquire().await?;
+
+        // TODO: upstream PgHasArrayType impl to ruint
+        let prev_commits = prev_commits
+            .into_iter()
+            .map(|c| c.into().to_be_bytes())
+            .collect::<Vec<_>>();
+
+        let res = sqlx::query_as::<_, RecoveryEntry>(
+            r#"
+            DELETE
+            FROM recoveries
+            WHERE existing_commitment = ANY($1)
+            RETURNING *
+            "#,
+        )
+        .bind(&prev_commits)
+        .fetch_all(&mut *conn)
+        .await?;
+
+        Ok(res)
+    }
+
+    async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO deletions (leaf_index, commitment)
+            VALUES ($1, $2)
+            "#,
+        )
+        .bind(leaf_index as i64)
+        .bind(identity)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    // TODO: consider using a larger value than i64 for leaf index, ruint should
+    // have postgres compatibility for u256
+    async fn get_deletions(self) -> Result<Vec<DeletionEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let result = sqlx::query(
+            r#"
+            SELECT *
+            FROM deletions
+            "#,
+        )
+        .fetch_all(&mut *conn)
+        .await?;
+
+        Ok(result
+            .into_iter()
+            .map(|row| DeletionEntry {
+                leaf_index: row.get::<i64, _>(0) as usize,
+                commitment: row.get::<Hash, _>(1),
+            })
+            .collect::<Vec<DeletionEntry>>())
+    }
+
+    /// Remove a list of entries from the deletions table
+    async fn remove_deletions(self, commitments: &[Hash]) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        let commitments = commitments
+            .iter()
+            .map(|c| c.to_be_bytes())
+            .collect::<Vec<_>>();
+
+        sqlx::query("DELETE FROM deletions WHERE commitment = Any($1)")
+            .bind(commitments)
+            .execute(&mut *conn)
+            .await?;
+
+        Ok(())
+    }
+
+    async fn get_eligible_unprocessed_commitments(
+        self,
+        status: UnprocessedStatus,
+    ) -> Result<Vec<types::UnprocessedCommitment>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let result = sqlx::query(
+            r#"
+            SELECT * FROM unprocessed_identities
+            WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility
+            LIMIT $2
+            "#,
+        )
+        .bind(<&str>::from(status))
+        .bind(MAX_UNPROCESSED_FETCH_COUNT)
+        .fetch_all(&mut *conn)
+        .await?;
+
+        Ok(result
+            .into_iter()
+            .map(|row| types::UnprocessedCommitment {
+                commitment: row.get::<Hash, _>(0),
+                status,
+                created_at: row.get::<_, _>(2),
+                processed_at: row.get::<_, _>(3),
+                error_message: row.get::<_, _>(4),
+                eligibility_timestamp: row.get::<_, _>(5),
+            })
+            .collect::<Vec<_>>())
+    }
+
+    async fn get_unprocessed_error(self, commitment: &Hash) -> Result<Option<String>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let result = sqlx::query(
+            r#"
+            SELECT error_message FROM unprocessed_identities WHERE commitment = $1
+            "#,
+        )
+        .bind(commitment)
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        if let Some(row) = result {
+            return Ok(Some(row.get::<Option<String>, _>(0).unwrap_or_default()));
+        };
+
+        Ok(None)
+    }
+
+    async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            DELETE FROM unprocessed_identities WHERE commitment = $1
+            "#,
+        )
+        .bind(commitment)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn identity_exists(self, commitment: Hash) -> Result<bool, Error> {
+        let mut conn = self.acquire().await?;
+
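+        // Two EXISTS probes in one round trip: the commitment may still be
+        // queued in `unprocessed_identities` or already inserted into
+        // `identities`; either counts as existing.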
+        Ok(sqlx::query(
+            r#"
+            select
+            EXISTS (select commitment from unprocessed_identities where commitment = $1) OR
+            EXISTS (select commitment from identities where commitment = $1);
+            "#,
+        )
+        .bind(commitment)
+        .fetch_one(&mut *conn)
+        .await?
+        .get::<bool, _>(0))
+    }
+
+    // TODO: add docs
+    async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result<bool, Error> {
+        let mut conn = self.acquire().await?;
+
+        let row_unprocessed =
+            sqlx::query(r#"SELECT exists(SELECT 1 FROM deletions where commitment = $1)"#)
+                .bind(commitment)
+                .fetch_one(&mut *conn)
+                .await?;
+
+        Ok(row_unprocessed.get::<bool, _>(0))
+    }
+
+    async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO batches(
+                id,
+                next_root,
+                prev_root,
+                created_at,
+                batch_type,
+                data
+            ) VALUES (DEFAULT, $1, NULL, CURRENT_TIMESTAMP, $2, $3)
+            "#,
+        )
+        .bind(next_root)
+        .bind(BatchType::Insertion)
+        .bind(sqlx::types::Json::from(BatchEntryData {
+            identities: vec![],
+            indexes: vec![],
+        }))
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn insert_new_batch(
+        self,
+        next_root: &Hash,
+        prev_root: &Hash,
+        batch_type: BatchType,
+        identities: &[Identity],
+        indexes: &[usize],
+    ) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO batches(
+                id,
+                next_root,
+                prev_root,
+                created_at,
+                batch_type,
+                data
+            ) VALUES (DEFAULT, $1, $2, CURRENT_TIMESTAMP, $3, $4)
+            "#,
+        )
+        .bind(next_root)
+        .bind(prev_root)
+        .bind(batch_type)
+        .bind(sqlx::types::Json::from(BatchEntryData {
+            identities: identities.to_vec(),
+            indexes: indexes.to_vec(),
+        }))
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    #[cfg(test)]
+    async fn get_next_batch(self, prev_root: &Hash) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let res = sqlx::query_as::<_, BatchEntry>(
+            r#"
+            SELECT
+                id,
+                next_root,
+                prev_root,
+                created_at,
+                batch_type,
+                data
+            FROM batches WHERE prev_root = $1
+            LIMIT 1
+            "#,
+        )
+        .bind(prev_root)
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        Ok(res)
+    }
+
+    async fn get_latest_batch(self) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let res = sqlx::query_as::<_, BatchEntry>(
+            r#"
+            SELECT
+                id,
+                next_root,
+                prev_root,
+                created_at,
+                batch_type,
+                data
+            FROM batches
+            ORDER BY id DESC
+            LIMIT 1
+            "#,
+        )
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        Ok(res)
+    }
+
+    async fn get_next_batch_without_transaction(self) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
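+        // Anti-join: pick the oldest batch with no matching row in
+        // `transactions` (i.e. not yet submitted), skipping the batch head,
+        // which has `prev_root IS NULL`.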
+        let res = sqlx::query_as::<_, BatchEntry>(
+            r#"
+            SELECT
+                batches.id,
+                batches.next_root,
+                batches.prev_root,
+                batches.created_at,
+                batches.batch_type,
+                batches.data
+            FROM batches
+            LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root
+            WHERE transactions.batch_next_root IS NULL AND batches.prev_root IS NOT NULL
+            ORDER BY batches.id ASC
+            LIMIT 1
+            "#,
+        )
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        Ok(res)
+    }
+
+    async fn get_batch_head(self) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
+        let res = sqlx::query_as::<_, BatchEntry>(
+            r#"
+            SELECT
+                id,
+                next_root,
+                prev_root,
+                created_at,
+                batch_type,
+                data
+            FROM batches WHERE prev_root IS NULL
+            LIMIT 1
+            "#,
+        )
+        .fetch_optional(&mut *conn)
+        .await?;
+
+        Ok(res)
+    }
+
+    #[instrument(skip(self), level = "debug")]
+    async fn delete_batches_after_root(self, root: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            DELETE FROM batches
+            WHERE prev_root = $1
+            "#,
+        )
+        .bind(root)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    #[instrument(skip(self), level = "debug")]
+    async fn delete_all_batches(self) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            DELETE FROM batches
+            "#,
+        )
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn insert_new_transaction(
+        self,
+        transaction_id: &String,
+        batch_next_root: &Hash,
+    ) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            INSERT INTO transactions(
+                transaction_id,
+                batch_next_root,
+                created_at
+            ) VALUES ($1, $2, CURRENT_TIMESTAMP)
+            "#,
+        )
+        .bind(transaction_id)
+        .bind(batch_next_root)
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+}
diff --git a/src/database/mod.rs b/src/database/mod.rs
index cce028c0..88c1c33f 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -21,6 +21,7 @@ use crate::identity_tree::Hash;
 pub mod query;
 pub mod transaction;
 pub mod types;
+pub mod methods;
 
 // Statically link in migration files
 static MIGRATOR: Migrator = sqlx::migrate!("schemas/database");
diff --git a/src/database/query.rs b/src/database/query.rs
index 4ec9f502..3b24caa7 100644
--- a/src/database/query.rs
+++ b/src/database/query.rs
@@ -277,6 +277,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         .into_iter()
         .collect())
     }
+
     async fn insert_prover_configuration(
         self,
         batch_size: usize,
From 24c311b2d264dc715ae0fbd73a811db60b93368e Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Tue, 15 Oct 2024 15:42:59 +0200
Subject: [PATCH 2/7] transactions

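Adds the root status transitions to the new trait. `mark_root_as_processed`
and `mark_root_as_mined` each run several statements, so callers are expected
to wrap them in a transaction; a minimal sketch (editorial illustration,
assuming a `Database` handle `db` — the updated tests use the same pattern):

    let mut tx = db.begin().await?;
    tx.mark_root_as_processed(&root).await?;
    tx.commit().await?;
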
---
 src/database/methods.rs | 79 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 75 insertions(+), 4 deletions(-)

diff --git a/src/database/methods.rs b/src/database/methods.rs
index 9469f124..132c164c 100644
--- a/src/database/methods.rs
+++ b/src/database/methods.rs
@@ -68,15 +68,83 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(Some(root_id as usize))
     }
 
+    async fn mark_root_as_processed(self, root: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        let root_id = conn.get_id_by_root(root).await?;
+
+        let Some(root_id) = root_id else {
+            return Err(Error::MissingRoot { root: *root });
+        };
+
+        let root_id = root_id as i64;
+
+        sqlx::query(
+            r#"
+            UPDATE identities
+            SET status = $2, mined_at = CURRENT_TIMESTAMP
+            WHERE id <= $1
+            AND status <> $2
+            AND status <> $3;
+            "#,
+        )
+        .bind(root_id)
+        .bind(<&str>::from(ProcessedStatus::Processed))
+        .bind(<&str>::from(ProcessedStatus::Mined))
+        .execute(&mut *conn)
+        .await?;
+
+        sqlx::query(
+            r#"
+            UPDATE identities
+            SET status = $2, mined_at = NULL
+            WHERE id > $1
+            "#,
+        )
+        .bind(root_id)
+        .bind(<&str>::from(ProcessedStatus::Pending))
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn mark_root_as_mined(self, root: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        let root_id = conn.get_id_by_root(root).await?;
+
+        let Some(root_id) = root_id else {
+            return Err(Error::MissingRoot { root: *root });
+        };
+
+        let root_id = root_id as i64;
+
+        sqlx::query(
+            r#"
+            UPDATE identities
+            SET status = $2
+            WHERE id <= $1
+            AND status <> $2
+            "#,
+        )
+        .bind(root_id)
+        .bind(<&str>::from(ProcessedStatus::Mined))
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
     async fn mark_all_as_pending(self) -> Result<(), Error> {
         let mut conn = self.acquire().await?;
 
         sqlx::query(
             r#"
-            UPDATE identities
-            SET status = $1, mined_at = NULL
-            WHERE status <> $1
-            "#,
+        UPDATE identities
+        SET status = $1, mined_at = NULL
+        WHERE status <> $1
+        "#,
         )
         .bind(<&str>::from(ProcessedStatus::Pending))
         .execute(&mut *conn)
         .await?;
@@ -884,3 +952,6 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }
 }
+
+// Blanket implementation for all types that satisfy the trait bounds
+impl<'c, T> DbMethods<'c> for T where T: Acquire<'c, Database = Postgres> + Send + Sync + Sized {}
From 5180347decc036ce9920c377b15165ba1a410298 Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Tue, 15 Oct 2024 15:54:45 +0200
Subject: [PATCH 3/7] wip
---
 src/app.rs                                  |  3 +-
 src/database/methods.rs                     |  6 ++++
 src/database/mod.rs                         | 37 +++++++++++----------
 src/identity/processor.rs                   |  2 +-
 src/identity_tree/initializer.rs            |  3 +-
 src/task_monitor/mod.rs                     |  2 +-
 src/task_monitor/tasks/create_batches.rs    |  2 +-
 src/task_monitor/tasks/delete_identities.rs |  2 +-
 src/task_monitor/tasks/insert_identities.rs |  2 +-
 src/task_monitor/tasks/process_batches.rs   |  2 +-
 10 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/src/app.rs b/src/app.rs
index e2cd9b87..07276c8b 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -9,7 +9,8 @@ use tracing::{info, instrument, warn};
 
 use crate::config::Config;
 use crate::contracts::IdentityManager;
-use crate::database::query::DatabaseQuery as _;
+// use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::database::Database;
 use crate::ethereum::Ethereum;
 use crate::identity::processor::{
diff --git a/src/database/methods.rs b/src/database/methods.rs
index 132c164c..824bcca3 100644
--- a/src/database/methods.rs
+++ b/src/database/methods.rs
@@ -68,6 +68,9 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(Some(root_id as usize))
     }
 
+    /// Marks a root and associated entities as processed
+    ///
+    /// This is a composite operation performing multiple queries - it should be run within a transaction.
     async fn mark_root_as_processed(self, root: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;
 
@@ -109,6 +112,9 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }
 
+    /// Marks a root and associated identities as mined
+    ///
+    /// This is a composite operation performing multiple queries - it should be run within a transaction.
async fn mark_root_as_mined(self, root: &Hash) -> Result<(), Error> { let mut conn = self.acquire().await?; diff --git a/src/database/mod.rs b/src/database/mod.rs index 88c1c33f..039509aa 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -15,11 +15,10 @@ use thiserror::Error; use tracing::{error, info, instrument, warn}; use crate::config::DatabaseConfig; -use crate::database::query::DatabaseQuery; use crate::identity_tree::Hash; -pub mod query; -pub mod transaction; +// pub mod query; +// pub mod transaction; pub mod types; pub mod methods; @@ -38,7 +37,7 @@ impl Deref for Database { } } -impl<'a, T> DatabaseQuery<'a> for T where T: Executor<'a, Database = Postgres> {} +// impl<'a, T> DatabaseQuery<'a> for T where T: Executor<'a, Database = Postgres> {} impl Database { #[instrument(skip_all)] @@ -165,7 +164,7 @@ mod test { use super::Database; use crate::config::DatabaseConfig; - use crate::database::query::DatabaseQuery; + use crate::database::methods::DbMethods; use crate::database::types::BatchType; use crate::identity_tree::{Hash, ProcessedStatus, UnprocessedStatus}; use crate::prover::identity::Identity; @@ -659,7 +658,9 @@ mod test { pre_root = &roots[i]; } - db.mark_root_as_processed_tx(&roots[2]).await?; + let mut tx = db.begin().await?; + tx.mark_root_as_processed(&roots[2]).await?; + tx.commit().await?; db.mark_all_as_pending().await?; @@ -692,7 +693,9 @@ mod test { pre_root = &roots[i]; } - db.mark_root_as_processed_tx(&roots[2]).await?; + let mut tx = db.begin().await?; + tx.mark_root_as_processed(&roots[2]).await?; + tx.commit().await?; for root in roots.iter().take(3) { let root = db @@ -738,7 +741,7 @@ mod test { pre_root = &roots[i]; } - db.mark_root_as_mined_tx(&roots[2]).await?; + db.mark_root_as_mined(&roots[2]).await?; for root in roots.iter().take(3) { let root = db @@ -787,27 +790,27 @@ mod test { } println!("Marking roots up to 2nd as processed"); - db.mark_root_as_processed_tx(&roots[2]).await?; + db.mark_root_as_processed(&roots[2]).await?; assert_roots_are(&db, &roots[..3], ProcessedStatus::Processed).await?; assert_roots_are(&db, &roots[3..], ProcessedStatus::Pending).await?; println!("Marking roots up to 1st as mined"); - db.mark_root_as_mined_tx(&roots[1]).await?; + db.mark_root_as_mined(&roots[1]).await?; assert_roots_are(&db, &roots[..2], ProcessedStatus::Mined).await?; assert_roots_are(&db, &[roots[2]], ProcessedStatus::Processed).await?; assert_roots_are(&db, &roots[3..], ProcessedStatus::Pending).await?; println!("Marking roots up to 4th as processed"); - db.mark_root_as_processed_tx(&roots[4]).await?; + db.mark_root_as_processed(&roots[4]).await?; assert_roots_are(&db, &roots[..2], ProcessedStatus::Mined).await?; assert_roots_are(&db, &roots[2..5], ProcessedStatus::Processed).await?; assert_roots_are(&db, &roots[5..], ProcessedStatus::Pending).await?; println!("Marking all roots as mined"); - db.mark_root_as_mined_tx(&roots[num_identities - 1]).await?; + db.mark_root_as_mined(&roots[num_identities - 1]).await?; assert_roots_are(&db, &roots, ProcessedStatus::Mined).await?; @@ -832,10 +835,10 @@ mod test { } // root[2] is somehow erroneously marked as mined - db.mark_root_as_processed_tx(&roots[2]).await?; + db.mark_root_as_processed(&roots[2]).await?; // Later we correctly mark the previous root as mined - db.mark_root_as_processed_tx(&roots[1]).await?; + db.mark_root_as_processed(&roots[1]).await?; for root in roots.iter().take(2) { let root = db @@ -894,7 +897,7 @@ mod test { "Root has not yet been mined" ); - 
db.mark_root_as_processed_tx(&roots[0]).await?; + db.mark_root_as_processed(&roots[0]).await?; let root = db .get_root_state(&roots[0]) @@ -938,7 +941,7 @@ mod test { pre_root = &roots[i]; } - db.mark_root_as_processed_tx(&roots[2]).await?; + db.mark_root_as_processed(&roots[2]).await?; let mined_tree_updates = db .get_commitments_by_status(ProcessedStatus::Processed) @@ -1071,7 +1074,7 @@ mod test { db.insert_pending_identity(3, &identities[3], &roots[3], &roots[2]) .await?; - db.mark_root_as_processed_tx(&roots[0]) + db.mark_root_as_processed(&roots[0]) .await .context("Marking root as mined")?; diff --git a/src/identity/processor.rs b/src/identity/processor.rs index 41c3e1e5..2925da04 100644 --- a/src/identity/processor.rs +++ b/src/identity/processor.rs @@ -16,7 +16,7 @@ use crate::config::Config; use crate::contracts::abi::{BridgedWorldId, RootAddedFilter, TreeChangeKind, TreeChangedFilter}; use crate::contracts::scanner::BlockScanner; use crate::contracts::IdentityManager; -use crate::database::query::DatabaseQuery; +use crate::database::methods::DbMethods; use crate::database::types::{BatchEntry, BatchType}; use crate::database::{Database, Error}; use crate::ethereum::{Ethereum, ReadProvider}; diff --git a/src/identity_tree/initializer.rs b/src/identity_tree/initializer.rs index 772dc268..343fbb55 100644 --- a/src/identity_tree/initializer.rs +++ b/src/identity_tree/initializer.rs @@ -5,7 +5,8 @@ use semaphore::poseidon_tree::LazyPoseidonTree; use tracing::{info, instrument, warn}; use crate::config::TreeConfig; -use crate::database::query::DatabaseQuery; +// use crate::database::query::DatabaseQuery; +use crate::database::methods::DbMethods; use crate::database::Database; use crate::identity::processor::IdentityProcessor; use crate::identity_tree::{ diff --git a/src/task_monitor/mod.rs b/src/task_monitor/mod.rs index 6318c21e..de0cc358 100644 --- a/src/task_monitor/mod.rs +++ b/src/task_monitor/mod.rs @@ -8,7 +8,7 @@ use tokio::task::JoinHandle; use tracing::{info, instrument, warn}; use crate::app::App; -use crate::database::query::DatabaseQuery as _; +use crate::database::methods::DbMethods as _; use crate::database::Database; use crate::shutdown::Shutdown; diff --git a/src/task_monitor/tasks/create_batches.rs b/src/task_monitor/tasks/create_batches.rs index 4f16e36f..4674f2b6 100644 --- a/src/task_monitor/tasks/create_batches.rs +++ b/src/task_monitor/tasks/create_batches.rs @@ -12,7 +12,7 @@ use tracing::instrument; use crate::app::App; use crate::database; -use crate::database::query::DatabaseQuery as _; +use crate::database::methods::DbMethods as _; use crate::database::Database; use crate::identity_tree::{ AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion, diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index 35aca551..7db83a75 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -9,7 +9,7 @@ use tokio::time; use tracing::info; use crate::app::App; -use crate::database::query::DatabaseQuery; +use crate::database::methods::DbMethods; use crate::database::types::DeletionEntry; use crate::identity_tree::{Hash, TreeVersionReadOps}; diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index bbd680a7..fafa275e 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -7,7 +7,7 @@ use tokio::time; use tracing::info; use 
crate::app::App;
-use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::database::types::UnprocessedCommitment;
 use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
 use crate::retry_tx;
diff --git a/src/task_monitor/tasks/process_batches.rs b/src/task_monitor/tasks/process_batches.rs
index 95fb6055..74efffe3 100644
--- a/src/task_monitor/tasks/process_batches.rs
+++ b/src/task_monitor/tasks/process_batches.rs
@@ -5,7 +5,7 @@ use tokio::sync::{mpsc, Notify};
 use tokio::{select, time};
 
 use crate::app::App;
-use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::identity::processor::TransactionId;
 
 pub async fn process_batches(
From 4a20f065316f9eeecd28f5f92977326373893664 Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Wed, 16 Oct 2024 16:34:52 +0200
Subject: [PATCH 4/7] Use txs

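Replaces the old `*_tx` wrapper methods with `retry_tx!` blocks at the call
sites, letting each caller choose what composes into a single retried
transaction. The general shape of the pattern used throughout this patch
(`retry_tx!` takes the pool, a name for the transaction binding, and a block
returning a `Result`):

    retry_tx!(self.database.pool, tx, {
        tx.mark_root_as_processed(&root_hash).await?;
        tx.delete_batches_after_root(&root_hash).await?;
        Result::<(), Error>::Ok(())
    })
    .await?;
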
---
 src/identity/processor.rs | 59 ++++++++++++++++++++++++---------------
 1 file changed, 37 insertions(+), 22 deletions(-)

diff --git a/src/identity/processor.rs b/src/identity/processor.rs
index 2925da04..21618db8 100644
--- a/src/identity/processor.rs
+++ b/src/identity/processor.rs
@@ -46,14 +46,14 @@ pub trait IdentityProcessor: Send + Sync + 'static {
 }
 
 pub struct OnChainIdentityProcessor {
-    ethereum:          Ethereum,
-    config:            Config,
-    database:          Arc<Database>,
-    identity_manager:  Arc<IdentityManager>,
+    ethereum: Ethereum,
+    config: Config,
+    database: Arc<Database>,
+    identity_manager: Arc<IdentityManager>,
     prover_repository: Arc<ProverRepository>,
-    mainnet_scanner:    tokio::sync::Mutex<Option<BlockScanner<Arc<ReadProvider>>>>,
-    mainnet_address:    Address,
+    mainnet_scanner: tokio::sync::Mutex<Option<BlockScanner<Arc<ReadProvider>>>>,
+    mainnet_address: Address,
     secondary_scanners: tokio::sync::Mutex<HashMap<Address, BlockScanner<Arc<ReadProvider>>>>,
 }
@@ -141,15 +141,20 @@ impl IdentityProcessor for OnChainIdentityProcessor {
             // Note that we don't have a way of queuing a root here for
            // finalization, so it's going to stay as "processed"
             // until the next root is mined.
-            self.database
-                .mark_root_as_processed_and_delete_batches_tx(&root_hash)
-                .await?;
+            retry_tx!(self.database.pool, tx, {
+                tx.mark_root_as_processed(&root_hash).await?;
+                tx.delete_batches_after_root(&root_hash).await?;
+
+                Result::<(), Error>::Ok(())
+            })
+            .await?;
         } else {
             // Db is either empty or we're restarting with a new contract/chain
             // so we should mark everything as pending
             retry_tx!(self.database.pool, tx, {
                 tx.mark_all_as_pending().await?;
                 tx.delete_all_batches().await?;
+
                 Result::<(), Error>::Ok(())
             })
             .await?;
@@ -420,9 +425,12 @@ impl OnChainIdentityProcessor {
                 continue;
             }
 
-            self.database
-                .mark_root_as_processed_tx(&post_root.into())
-                .await?;
+            retry_tx!(
+                self.database.pool,
+                tx,
+                tx.mark_root_as_processed(&post_root.into()).await
+            )
+            .await?;
 
             info!(?pre_root, ?post_root, ?kind, "Batch mined");
@@ -456,7 +464,12 @@ impl OnChainIdentityProcessor {
                 continue;
             }
 
-            self.database.mark_root_as_mined_tx(&root.into()).await?;
+            retry_tx!(
+                self.database.pool,
+                tx,
+                tx.mark_root_as_mined(&root.into()).await
+            )
+            .await?;
 
             info!(?root, "Root finalized");
         }
@@ -549,7 +562,7 @@ impl OnChainIdentityProcessor {
 
 pub struct OffChainIdentityProcessor {
     committed_batches: Arc<Mutex<Vec<BatchEntry>>>,
-    database:          Arc<Database>,
+    database: Arc<Database>,
 }
 
 #[async_trait]
@@ -575,14 +588,16 @@ impl IdentityProcessor for OffChainIdentityProcessor {
             self.update_eligible_recoveries(batch).await?;
         }
 
-        // With current flow it is required to mark root as processed first as this is
-        // how required mined_at field is set
-        self.database
-            .mark_root_as_processed_tx(&batch.next_root)
-            .await?;
-        self.database
-            .mark_root_as_mined_tx(&batch.next_root)
-            .await?;
+        retry_tx!(self.database.pool, tx, {
+            // With current flow it is required to mark root as processed first as this is
+            // how required mined_at field is set
+            tx.mark_root_as_processed(&batch.next_root).await?;
+            tx.mark_root_as_mined(&batch.next_root).await?;
+
+            Result::<_, anyhow::Error>::Ok(())
+        })
+        .await?;
+
         processed_tree.apply_updates_up_to(batch.next_root);
From a78580f96990e5e4c1ee11814120e6ed70353b25 Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Wed, 16 Oct 2024 16:40:50 +0200
Subject: [PATCH 5/7] Remove unused
---
 src/database/methods.rs     |  23 +-
 src/database/mod.rs         |   4 -
 src/database/query.rs       | 790 ------------------------------------
 src/database/transaction.rs | 110 -----
 4 files changed, 15 insertions(+), 912 deletions(-)
 delete mode 100644 src/database/query.rs
 delete mode 100644 src/database/transaction.rs

diff --git a/src/database/methods.rs b/src/database/methods.rs
index 824bcca3..d8f9b5e4 100644
--- a/src/database/methods.rs
+++ b/src/database/methods.rs
@@ -683,23 +683,30 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         .collect::<Vec<_>>())
     }
 
-    async fn get_unprocessed_error(self, commitment: &Hash) -> Result<Option<String>, Error> {
+    /// Returns the error message from the unprocessed identities table
+    /// if it exists
+    ///
+    /// - The outer option represents the existence of the commitment in the
+    ///   unprocessed_identities table
+    /// - The inner option represents the existence of an error message
+    async fn get_unprocessed_error(
+        self,
+        commitment: &Hash,
+    ) -> Result<Option<Option<String>>, Error> {
         let mut conn = self.acquire().await?;
 
-        let result = sqlx::query(
+        let result: Option<(Option<String>,)> = sqlx::query_as(
             r#"
-            SELECT error_message FROM unprocessed_identities WHERE commitment = $1
+            SELECT error_message
+            FROM unprocessed_identities
+            WHERE commitment = $1
             "#,
         )
         .bind(commitment)
         .fetch_optional(&mut *conn)
         .await?;
 
-        if let Some(row) = 
result { - return Ok(Some(row.get::, _>(0).unwrap_or_default())); - }; - - Ok(None) + Ok(result.map(|(error_message,)| error_message)) } async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> { diff --git a/src/database/mod.rs b/src/database/mod.rs index 039509aa..f91e896d 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -17,8 +17,6 @@ use tracing::{error, info, instrument, warn}; use crate::config::DatabaseConfig; use crate::identity_tree::Hash; -// pub mod query; -// pub mod transaction; pub mod types; pub mod methods; @@ -37,8 +35,6 @@ impl Deref for Database { } } -// impl<'a, T> DatabaseQuery<'a> for T where T: Executor<'a, Database = Postgres> {} - impl Database { #[instrument(skip_all)] pub async fn new(config: &DatabaseConfig) -> Result { diff --git a/src/database/query.rs b/src/database/query.rs deleted file mode 100644 index 3b24caa7..00000000 --- a/src/database/query.rs +++ /dev/null @@ -1,790 +0,0 @@ -use std::collections::HashSet; - -use chrono::{DateTime, Utc}; -use ruint::aliases::U256; -use sqlx::{Executor, Postgres, Row}; -use tracing::instrument; -use types::{DeletionEntry, LatestDeletionEntry, RecoveryEntry}; - -use crate::database::types::{BatchEntry, BatchEntryData, BatchType, LatestInsertionEntry}; -use crate::database::{types, Error}; -use crate::identity_tree::{ - Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus, -}; -use crate::prover::identity::Identity; -use crate::prover::{ProverConfig, ProverType}; - -const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000; - -/// This trait provides the individual and composable queries to the database. -/// Each method is a single atomic query, and can be composed within a -/// transaction. -pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { - async fn insert_pending_identity( - self, - leaf_index: usize, - identity: &Hash, - root: &Hash, - pre_root: &Hash, - ) -> Result<(), Error> { - let insert_pending_identity_query = sqlx::query( - r#" - INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of, pre_root) - VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP, $5) - "#, - ) - .bind(leaf_index as i64) - .bind(identity) - .bind(root) - .bind(<&str>::from(ProcessedStatus::Pending)) - .bind(pre_root); - - self.execute(insert_pending_identity_query).await?; - - Ok(()) - } - - async fn get_id_by_root(self, root: &Hash) -> Result, Error> { - let root_index_query = sqlx::query( - r#" - SELECT id - FROM identities - WHERE root = $1 - ORDER BY id ASC - LIMIT 1 - "#, - ) - .bind(root); - - let row = self.fetch_optional(root_index_query).await?; - - let Some(row) = row else { return Ok(None) }; - let root_id = row.get::(0); - - Ok(Some(root_id as usize)) - } - - /// Marks all the identities in the db as - #[instrument(skip(self), level = "debug")] - async fn mark_all_as_pending(self) -> Result<(), Error> { - let pending_status = ProcessedStatus::Pending; - - let update_all_identities = sqlx::query( - r#" - UPDATE identities - SET status = $1, mined_at = NULL - WHERE status <> $1 - "#, - ) - .bind(<&str>::from(pending_status)); - - self.execute(update_all_identities).await?; - - Ok(()) - } - - async fn get_next_leaf_index(self) -> Result { - let query = sqlx::query( - r#" - SELECT leaf_index FROM identities - ORDER BY leaf_index DESC - LIMIT 1 - "#, - ); - - let row = self.fetch_optional(query).await?; - - let Some(row) = row else { return Ok(0) }; - let leaf_index = row.get::(0); - - Ok((leaf_index + 1) as usize) - } - - async fn 
get_identity_leaf_index(self, identity: &Hash) -> Result, Error> { - let query = sqlx::query( - r#" - SELECT leaf_index, status - FROM identities - WHERE commitment = $1 - ORDER BY id DESC - LIMIT 1; - "#, - ) - .bind(identity); - - let Some(row) = self.fetch_optional(query).await? else { - return Ok(None); - }; - - let leaf_index = row.get::(0) as usize; - - let status = row - .get::<&str, _>(1) - .parse() - .expect("Status is unreadable, database is corrupt"); - - Ok(Some(TreeItem { status, leaf_index })) - } - - async fn get_commitments_by_status( - self, - status: ProcessedStatus, - ) -> Result, Error> { - Ok(sqlx::query_as::<_, TreeUpdate>( - r#" - SELECT leaf_index, commitment as element - FROM identities - WHERE status = $1 - ORDER BY id ASC; - "#, - ) - .bind(<&str>::from(status)) - .fetch_all(self) - .await?) - } - - async fn get_commitments_by_statuses( - self, - statuses: Vec, - ) -> Result, Error> { - let statuses: Vec<&str> = statuses.into_iter().map(<&str>::from).collect(); - Ok(sqlx::query_as::<_, TreeUpdate>( - r#" - SELECT leaf_index, commitment as element - FROM identities - WHERE status = ANY($1) - ORDER BY id ASC; - "#, - ) - .bind(&statuses[..]) // Official workaround https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-do-a-select--where-foo-in--query - .fetch_all(self) - .await?) - } - - async fn get_non_zero_commitments_by_leaf_indexes>( - self, - leaf_indexes: I, - ) -> Result, Error> { - let leaf_indexes: Vec = leaf_indexes.into_iter().map(|v| v as i64).collect(); - - Ok(sqlx::query( - r#" - SELECT commitment - FROM identities - WHERE leaf_index = ANY($1) - AND commitment != $2 - "#, - ) - .bind(&leaf_indexes[..]) // Official workaround https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-do-a-select--where-foo-in--query - .bind(Hash::ZERO) - .fetch_all(self) - .await? - .into_iter() - .map(|row| row.get::(0)) - .collect()) - } - - async fn get_latest_root_by_status( - self, - status: ProcessedStatus, - ) -> Result, Error> { - Ok(sqlx::query( - r#" - SELECT root FROM identities WHERE status = $1 ORDER BY id DESC LIMIT 1 - "#, - ) - .bind(<&str>::from(status)) - .fetch_optional(self) - .await? - .map(|r| r.get::(0))) - } - - async fn get_root_state(self, root: &Hash) -> Result, Error> { - // This tries really hard to do everything in one query to prevent race - // conditions. - Ok(sqlx::query_as::<_, RootItem>( - r#" - SELECT - root, - status, - pending_as_of as pending_valid_as_of, - mined_at as mined_valid_as_of - FROM identities - WHERE root = $1 - ORDER BY id - LIMIT 1 - "#, - ) - .bind(root) - .fetch_optional(self) - .await?) 
- } - - async fn get_latest_insertion(self) -> Result { - let query = sqlx::query( - r#" - SELECT insertion_timestamp - FROM latest_insertion_timestamp - WHERE Lock = 'X';"#, - ); - - let row = self.fetch_optional(query).await?; - - if let Some(row) = row { - Ok(LatestInsertionEntry { - timestamp: row.get(0), - }) - } else { - Ok(LatestInsertionEntry { - timestamp: Utc::now(), - }) - } - } - - async fn count_unprocessed_identities(self) -> Result { - let query = sqlx::query( - r#" - SELECT COUNT(*) as unprocessed - FROM unprocessed_identities - "#, - ); - let result = self.fetch_one(query).await?; - Ok(result.get::(0) as i32) - } - - async fn count_pending_identities(self) -> Result { - let query = sqlx::query( - r#" - SELECT COUNT(*) as pending - FROM identities - WHERE status = $1 - "#, - ) - .bind(<&str>::from(ProcessedStatus::Pending)); - let result = self.fetch_one(query).await?; - Ok(result.get::(0) as i32) - } - - async fn get_provers(self) -> Result, Error> { - Ok(sqlx::query_as( - r#" - SELECT batch_size, url, timeout_s, prover_type - FROM provers - "#, - ) - .fetch_all(self) - .await? - .into_iter() - .collect()) - } - - async fn insert_prover_configuration( - self, - batch_size: usize, - url: impl ToString, - timeout_seconds: u64, - prover_type: ProverType, - ) -> Result<(), Error> { - let url = url.to_string(); - - let query = sqlx::query( - r#" - INSERT INTO provers (batch_size, url, timeout_s, prover_type) - VALUES ($1, $2, $3, $4) - "#, - ) - .bind(batch_size as i64) - .bind(url) - .bind(timeout_seconds as i64) - .bind(prover_type); - - self.execute(query).await?; - - Ok(()) - } - - async fn insert_provers(self, provers: HashSet) -> Result<(), Error> { - if provers.is_empty() { - return Ok(()); - } - - let mut query_builder = sqlx::QueryBuilder::new( - r#" - INSERT INTO provers (batch_size, url, timeout_s, prover_type) - "#, - ); - - query_builder.push_values(provers, |mut b, prover| { - b.push_bind(prover.batch_size as i64) - .push_bind(prover.url) - .push_bind(prover.timeout_s as i64) - .push_bind(prover.prover_type); - }); - - let query = query_builder.build(); - - self.execute(query).await?; - Ok(()) - } - - async fn remove_prover(self, batch_size: usize, prover_type: ProverType) -> Result<(), Error> { - let query = sqlx::query( - r#" - DELETE FROM provers WHERE batch_size = $1 AND prover_type = $2 - "#, - ) - .bind(batch_size as i64) - .bind(prover_type); - - self.execute(query).await?; - - Ok(()) - } - - async fn insert_new_identity( - self, - identity: Hash, - eligibility_timestamp: sqlx::types::chrono::DateTime, - ) -> Result { - let query = sqlx::query( - r#" - INSERT INTO unprocessed_identities (commitment, status, created_at, eligibility) - VALUES ($1, $2, CURRENT_TIMESTAMP, $3) - "#, - ) - .bind(identity) - .bind(<&str>::from(UnprocessedStatus::New)) - .bind(eligibility_timestamp); - - self.execute(query).await?; - Ok(identity) - } - - async fn insert_new_recovery( - self, - existing_commitment: &Hash, - new_commitment: &Hash, - ) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO recoveries (existing_commitment, new_commitment) - VALUES ($1, $2) - "#, - ) - .bind(existing_commitment) - .bind(new_commitment); - self.execute(query).await?; - Ok(()) - } - - async fn get_latest_deletion(self) -> Result { - let query = - sqlx::query("SELECT deletion_timestamp FROM latest_deletion_root WHERE Lock = 'X';"); - - let row = self.fetch_optional(query).await?; - - if let Some(row) = row { - Ok(LatestDeletionEntry { - timestamp: row.get(0), - }) - } else { 
- Ok(LatestDeletionEntry { - timestamp: Utc::now(), - }) - } - } - - async fn update_latest_insertion( - self, - insertion_timestamp: DateTime, - ) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO latest_insertion_timestamp (Lock, insertion_timestamp) - VALUES ('X', $1) - ON CONFLICT (Lock) - DO UPDATE SET insertion_timestamp = EXCLUDED.insertion_timestamp; - "#, - ) - .bind(insertion_timestamp); - - self.execute(query).await?; - Ok(()) - } - - async fn update_latest_deletion(self, deletion_timestamp: DateTime) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO latest_deletion_root (Lock, deletion_timestamp) - VALUES ('X', $1) - ON CONFLICT (Lock) - DO UPDATE SET deletion_timestamp = EXCLUDED.deletion_timestamp; - "#, - ) - .bind(deletion_timestamp); - - self.execute(query).await?; - Ok(()) - } - - #[cfg(test)] - async fn get_all_recoveries(self) -> Result, Error> { - Ok( - sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries") - .fetch_all(self) - .await?, - ) - } - - async fn delete_recoveries, T: Into>( - self, - prev_commits: I, - ) -> Result, Error> { - // TODO: upstream PgHasArrayType impl to ruint - let prev_commits = prev_commits - .into_iter() - .map(|c| c.into().to_be_bytes()) - .collect::>(); - - let res = sqlx::query_as::<_, RecoveryEntry>( - r#" - DELETE - FROM recoveries - WHERE existing_commitment = ANY($1) - RETURNING * - "#, - ) - .bind(&prev_commits) - .fetch_all(self) - .await?; - - Ok(res) - } - - async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO deletions (leaf_index, commitment) - VALUES ($1, $2) - "#, - ) - .bind(leaf_index as i64) - .bind(identity); - - self.execute(query).await?; - Ok(()) - } - - // TODO: consider using a larger value than i64 for leaf index, ruint should - // have postgres compatibility for u256 - async fn get_deletions(self) -> Result, Error> { - let query = sqlx::query( - r#" - SELECT * - FROM deletions - "#, - ); - - let result = self.fetch_all(query).await?; - - Ok(result - .into_iter() - .map(|row| DeletionEntry { - leaf_index: row.get::(0) as usize, - commitment: row.get::(1), - }) - .collect::>()) - } - - /// Remove a list of entries from the deletions table - async fn remove_deletions(self, commitments: &[Hash]) -> Result<(), Error> { - let commitments = commitments - .iter() - .map(|c| c.to_be_bytes()) - .collect::>(); - - sqlx::query("DELETE FROM deletions WHERE commitment = Any($1)") - .bind(commitments) - .execute(self) - .await?; - - Ok(()) - } - - async fn get_eligible_unprocessed_commitments( - self, - status: UnprocessedStatus, - ) -> Result, Error> { - let query = sqlx::query( - r#" - SELECT * FROM unprocessed_identities - WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility - LIMIT $2 - "#, - ) - .bind(<&str>::from(status)) - .bind(MAX_UNPROCESSED_FETCH_COUNT); - - let result = self.fetch_all(query).await?; - - Ok(result - .into_iter() - .map(|row| types::UnprocessedCommitment { - commitment: row.get::(0), - status, - created_at: row.get::<_, _>(2), - processed_at: row.get::<_, _>(3), - error_message: row.get::<_, _>(4), - eligibility_timestamp: row.get::<_, _>(5), - }) - .collect::>()) - } - - /// Returns the error message from the unprocessed identities table - /// if it exists - /// - /// - The outer option represents the existence of the commitment in the - /// unprocessed_identities table - /// - The inner option represents the existence of an error message - async fn 
get_unprocessed_error( - self, - commitment: &Hash, - ) -> Result>, Error> { - let query = sqlx::query( - r#" - SELECT error_message FROM unprocessed_identities WHERE commitment = $1 - "#, - ) - .bind(commitment); - - Ok(self - .fetch_optional(query) - .await? - .map(|row| row.get::, _>(0))) - } - - async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> { - let query = sqlx::query( - r#" - DELETE FROM unprocessed_identities WHERE commitment = $1 - "#, - ) - .bind(commitment); - - self.execute(query).await?; - - Ok(()) - } - - async fn identity_exists(self, commitment: Hash) -> Result { - Ok(sqlx::query( - r#" - select - EXISTS (select commitment from unprocessed_identities where commitment = $1) OR - EXISTS (select commitment from identities where commitment = $1); - "#, - ) - .bind(commitment) - .fetch_one(self) - .await? - .get::(0)) - } - - // TODO: add docs - async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result { - let query_queued_deletion = - sqlx::query(r#"SELECT exists(SELECT 1 FROM deletions where commitment = $1)"#) - .bind(commitment); - let row_unprocessed = self.fetch_one(query_queued_deletion).await?; - Ok(row_unprocessed.get::(0)) - } - - async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO batches( - id, - next_root, - prev_root, - created_at, - batch_type, - data - ) VALUES (DEFAULT, $1, NULL, CURRENT_TIMESTAMP, $2, $3) - "#, - ) - .bind(next_root) - .bind(BatchType::Insertion) - .bind(sqlx::types::Json::from(BatchEntryData { - identities: vec![], - indexes: vec![], - })); - - self.execute(query).await?; - Ok(()) - } - - async fn insert_new_batch( - self, - next_root: &Hash, - prev_root: &Hash, - batch_type: BatchType, - identities: &[Identity], - indexes: &[usize], - ) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO batches( - id, - next_root, - prev_root, - created_at, - batch_type, - data - ) VALUES (DEFAULT, $1, $2, CURRENT_TIMESTAMP, $3, $4) - "#, - ) - .bind(next_root) - .bind(prev_root) - .bind(batch_type) - .bind(sqlx::types::Json::from(BatchEntryData { - identities: identities.to_vec(), - indexes: indexes.to_vec(), - })); - - self.execute(query).await?; - Ok(()) - } - - #[cfg(test)] - async fn get_next_batch(self, prev_root: &Hash) -> Result, Error> { - let res = sqlx::query_as::<_, BatchEntry>( - r#" - SELECT - id, - next_root, - prev_root, - created_at, - batch_type, - data - FROM batches WHERE prev_root = $1 - LIMIT 1 - "#, - ) - .bind(prev_root) - .fetch_optional(self) - .await?; - - Ok(res) - } - - async fn get_latest_batch(self) -> Result, Error> { - let res = sqlx::query_as::<_, BatchEntry>( - r#" - SELECT - id, - next_root, - prev_root, - created_at, - batch_type, - data - FROM batches - ORDER BY id DESC - LIMIT 1 - "#, - ) - .fetch_optional(self) - .await?; - - Ok(res) - } - - async fn get_next_batch_without_transaction(self) -> Result, Error> { - let res = sqlx::query_as::<_, BatchEntry>( - r#" - SELECT - batches.id, - batches.next_root, - batches.prev_root, - batches.created_at, - batches.batch_type, - batches.data - FROM batches - LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root - WHERE transactions.batch_next_root IS NULL AND batches.prev_root IS NOT NULL - ORDER BY batches.id ASC - LIMIT 1 - "#, - ) - .fetch_optional(self) - .await?; - - Ok(res) - } - - async fn get_batch_head(self) -> Result, Error> { - let res = sqlx::query_as::<_, BatchEntry>( - r#" - SELECT - id, - next_root, - 
prev_root, - created_at, - batch_type, - data - FROM batches WHERE prev_root IS NULL - LIMIT 1 - "#, - ) - .fetch_optional(self) - .await?; - - Ok(res) - } - - #[instrument(skip(self), level = "debug")] - async fn delete_batches_after_root(self, root: &Hash) -> Result<(), Error> { - let query = sqlx::query( - r#" - DELETE FROM batches - WHERE prev_root = $1 - "#, - ) - .bind(root); - - self.execute(query).await?; - Ok(()) - } - - #[instrument(skip(self), level = "debug")] - async fn delete_all_batches(self) -> Result<(), Error> { - let query = sqlx::query( - r#" - DELETE FROM batches - "#, - ); - - self.execute(query).await?; - Ok(()) - } - - async fn insert_new_transaction( - self, - transaction_id: &String, - batch_next_root: &Hash, - ) -> Result<(), Error> { - let query = sqlx::query( - r#" - INSERT INTO transactions( - transaction_id, - batch_next_root, - created_at - ) VALUES ($1, $2, CURRENT_TIMESTAMP) - "#, - ) - .bind(transaction_id) - .bind(batch_next_root); - - self.execute(query).await?; - Ok(()) - } -} diff --git a/src/database/transaction.rs b/src/database/transaction.rs deleted file mode 100644 index c2e98e11..00000000 --- a/src/database/transaction.rs +++ /dev/null @@ -1,110 +0,0 @@ -use sqlx::{Executor, Postgres, Transaction}; -use tracing::instrument; - -use crate::database::query::DatabaseQuery; -use crate::database::{Database, Error}; -use crate::identity_tree::{Hash, ProcessedStatus}; -use crate::retry_tx; - -async fn mark_root_as_processed( - tx: &mut Transaction<'_, Postgres>, - root: &Hash, -) -> Result<(), Error> { - let root_id = tx.get_id_by_root(root).await?; - - let Some(root_id) = root_id else { - return Err(Error::MissingRoot { root: *root }); - }; - - let root_id = root_id as i64; - // TODO: Can I get rid of line `AND status <> $2 - let update_previous_roots = sqlx::query( - r#" - UPDATE identities - SET status = $2, mined_at = CURRENT_TIMESTAMP - WHERE id <= $1 - AND status <> $2 - AND status <> $3; - "#, - ) - .bind(root_id) - .bind(<&str>::from(ProcessedStatus::Processed)) - .bind(<&str>::from(ProcessedStatus::Mined)); - - let update_next_roots = sqlx::query( - r#" - UPDATE identities - SET status = $2, mined_at = NULL - WHERE id > $1 - "#, - ) - .bind(root_id) - .bind(<&str>::from(ProcessedStatus::Pending)); - - tx.execute(update_previous_roots).await?; - tx.execute(update_next_roots).await?; - - Ok(()) -} - -pub async fn mark_root_as_mined( - tx: &mut Transaction<'_, Postgres>, - root: &Hash, -) -> Result<(), Error> { - let mined_status = ProcessedStatus::Mined; - - let root_id = tx.get_id_by_root(root).await?; - - let Some(root_id) = root_id else { - return Err(Error::MissingRoot { root: *root }); - }; - - let root_id = root_id as i64; - - let update_previous_roots = sqlx::query( - r#" - UPDATE identities - SET status = $2 - WHERE id <= $1 - AND status <> $2 - "#, - ) - .bind(root_id) - .bind(<&str>::from(mined_status)); - - tx.execute(update_previous_roots).await?; - - Ok(()) -} - -/// impl block for database transactions -impl Database { - /// Marks the identities and roots from before a given root hash as mined - /// Also marks following roots as pending - #[instrument(skip(self), level = "debug")] - pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> { - retry_tx!(self.pool, tx, mark_root_as_processed(&mut tx, root).await).await - } - - /// Marks the identities and roots from before a given root hash as mined - /// Also marks following roots as pending - #[instrument(skip(self), level = "debug")] - pub async fn 
mark_root_as_processed_and_delete_batches_tx(
-        &self,
-        root: &Hash,
-    ) -> Result<(), Error> {
-        retry_tx!(self.pool, tx, {
-            mark_root_as_processed(&mut tx, root).await?;
-            tx.delete_batches_after_root(root).await?;
-            Result::<_, Error>::Ok(())
-        })
-        .await
-    }
-
-    /// Marks the identities and roots from before a given root hash as
-    /// finalized
-    #[instrument(skip(self), level = "debug")]
-    pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> {
-        retry_tx!(self.pool, tx, mark_root_as_mined(&mut tx, root).await).await
-    }
-}
From 8734b52f4e0b3e2c9f410830da5c09261938fdd9 Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Thu, 17 Oct 2024 12:37:52 +0200
Subject: [PATCH 6/7] cleanup
---
 src/app.rs                       | 1 -
 src/identity_tree/initializer.rs | 1 -
 2 files changed, 2 deletions(-)

diff --git a/src/app.rs b/src/app.rs
index 07276c8b..f1d24fcc 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -9,7 +9,6 @@ use tracing::{info, instrument, warn};
 
 use crate::config::Config;
 use crate::contracts::IdentityManager;
-// use crate::database::query::DatabaseQuery as _;
 use crate::database::methods::DbMethods as _;
 use crate::database::Database;
 use crate::ethereum::Ethereum;
diff --git a/src/identity_tree/initializer.rs b/src/identity_tree/initializer.rs
index 343fbb55..aeff37e0 100644
--- a/src/identity_tree/initializer.rs
+++ b/src/identity_tree/initializer.rs
@@ -5,7 +5,6 @@ use semaphore::poseidon_tree::LazyPoseidonTree;
 use tracing::{info, instrument, warn};
 
 use crate::config::TreeConfig;
-// use crate::database::query::DatabaseQuery;
 use crate::database::methods::DbMethods;
 use crate::database::Database;
 use crate::identity::processor::IdentityProcessor;
From af1ec48e66f9ef8b719d2f3a3fb09c89a66ee615 Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Thu, 17 Oct 2024 12:56:48 +0200
Subject: [PATCH 7/7] Add instrumentation
---
 src/database/methods.rs | 41 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/src/database/methods.rs b/src/database/methods.rs
index d8f9b5e4..e51c2073 100644
--- a/src/database/methods.rs
+++ b/src/database/methods.rs
@@ -20,6 +20,7 @@ const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000;
 
 #[async_trait]
 pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
+    #[instrument(skip(self), level = "debug")]
     async fn insert_pending_identity(
         self,
         leaf_index: usize,
         identity: &Hash,
         root: &Hash,
         pre_root: &Hash,
     ) -> Result<(), Error> {
         let mut conn = self.acquire().await?;
 
@@ -46,6 +47,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_id_by_root(self, root: &Hash) -> Result<Option<usize>, Error> {
         let mut conn = self.acquire().await?;
 
@@ -71,6 +73,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     /// Marks a root and associated entities as processed
     ///
     /// This is a composite operation performing multiple queries - it should be run within a transaction.
+    #[instrument(skip(self), level = "debug")]
     async fn mark_root_as_processed(self, root: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;
 
@@ -115,6 +118,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     /// Marks a root and associated identities as mined
     ///
     /// This is a composite operation performing multiple queries - it should be run within a transaction.
From af1ec48e66f9ef8b719d2f3a3fb09c89a66ee615 Mon Sep 17 00:00:00 2001
From: Dzejkop
Date: Thu, 17 Oct 2024 12:56:48 +0200
Subject: [PATCH 7/7] Add instrumentation

---
 src/database/methods.rs | 41 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/src/database/methods.rs b/src/database/methods.rs
index d8f9b5e4..e51c2073 100644
--- a/src/database/methods.rs
+++ b/src/database/methods.rs
@@ -20,6 +20,7 @@ const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000;

 #[async_trait]
 pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
+    #[instrument(skip(self), level = "debug")]
     async fn insert_pending_identity(
         self,
         leaf_index: usize,
@@ -46,6 +47,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_id_by_root(self, root: &Hash) -> Result<Option<usize>, Error> {
         let mut conn = self.acquire().await?;

@@ -71,6 +73,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     /// Marks a root and associated entities as processed
     ///
     /// This is a composite operation performing multiple queries - it should be run within a transaction.
+    #[instrument(skip(self), level = "debug")]
     async fn mark_root_as_processed(self, root: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -115,6 +118,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     /// Marks a root and associated identities as mined
     ///
     /// This is a composite operation performing multiple queries - it should be run within a transaction.
+    #[instrument(skip(self), level = "debug")]
     async fn mark_root_as_mined(self, root: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -142,6 +146,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn mark_all_as_pending(self) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -159,6 +164,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_next_leaf_index(self) -> Result<usize, Error> {
         let mut conn = self.acquire().await?;

@@ -178,6 +184,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok((leaf_index + 1) as usize)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_identity_leaf_index(self, identity: &Hash) -> Result<Option<TreeItem>, Error> {
         let mut conn = self.acquire().await?;

@@ -208,6 +215,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(Some(TreeItem { status, leaf_index }))
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_commitments_by_status(
         self,
         status: ProcessedStatus,
@@ -227,6 +235,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         .await?)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_commitments_by_statuses(
         self,
         statuses: Vec<ProcessedStatus>,
@@ -247,6 +256,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         .await?)
     }

+    #[instrument(skip(self, leaf_indexes), level = "debug")]
     async fn get_non_zero_commitments_by_leaf_indexes<I>(
         self,
         leaf_indexes: I,
@@ -275,6 +285,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         .collect())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_root_by_status(
         self,
         status: ProcessedStatus,
@@ -292,6 +303,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         .map(|r| r.get::<Hash, _>(0)))
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_root_state(self, root: &Hash) -> Result<Option<RootItem>, Error> {
         let mut conn = self.acquire().await?;

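Note the receiver in every hunk here: each method takes `self` bound by `Acquire<'c, Database = Postgres>` and opens with `self.acquire().await?`, so the same method runs against a pool reference (one connection per call) or inside an already-open transaction. A sketch of both call shapes, assuming the blanket impl that makes any such `Acquire` type a `DbMethods`, and that the crate's `Error` converts from `sqlx::Error`:

    use sqlx::PgPool;

    use crate::database::methods::DbMethods as _;
    use crate::database::Error;

    async fn demo(pool: &PgPool) -> Result<(), Error> {
        // On the pool directly: a connection is acquired and released per call.
        let _next = pool.get_next_leaf_index().await?;

        // Inside a transaction: `&mut tx` also implements Acquire, so the
        // same method becomes transactional and `tx` survives to commit.
        let mut tx = pool.begin().await?;
        let _ = (&mut tx).get_next_leaf_index().await?;
        tx.commit().await?;
        Ok(())
    }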
@@ -315,6 +327,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         .await?)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_insertion(self) -> Result<LatestInsertionEntry, Error> {
         let mut conn = self.acquire().await?;

@@ -338,6 +351,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         }
     }

+    #[instrument(skip(self), level = "debug")]
     async fn count_unprocessed_identities(self) -> Result<i32, Error> {
         let mut conn = self.acquire().await?;

@@ -353,6 +367,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(count as i32)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn count_pending_identities(self) -> Result<i32, Error> {
         let mut conn = self.acquire().await?;

@@ -370,6 +385,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(count as i32)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_provers(self) -> Result<HashSet<ProverConfig>, Error> {
         let mut conn = self.acquire().await?;

@@ -385,6 +401,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         .collect())
     }

+    #[instrument(skip(self, url), level = "debug")]
     async fn insert_prover_configuration(
         self,
         batch_size: usize,
@@ -412,6 +429,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn insert_provers(self, provers: HashSet<ProverConfig>) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -439,6 +457,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn remove_prover(self, batch_size: usize, prover_type: ProverType) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -455,6 +474,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_identity(
         self,
         identity: Hash,
@@ -477,6 +497,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(identity)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_recovery(
         self,
         existing_commitment: &Hash,
@@ -498,6 +519,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_deletion(self) -> Result<LatestDeletionEntry, Error> {
         let mut conn = self.acquire().await?;

@@ -517,6 +539,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         }
     }

+    #[instrument(skip(self), level = "debug")]
     async fn update_latest_insertion(
         self,
         insertion_timestamp: DateTime<Utc>,
@@ -538,6 +561,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn update_latest_deletion(self, deletion_timestamp: DateTime<Utc>) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -557,6 +581,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     }

     #[cfg(test)]
+    #[instrument(skip(self), level = "debug")]
     async fn get_all_recoveries(self) -> Result<Vec<RecoveryEntry>, Error> {
         let mut conn = self.acquire().await?;

@@ -567,6 +592,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         )
     }

+    #[instrument(skip(self, prev_commits), level = "debug")]
     async fn delete_recoveries<I, T>(self, prev_commits: I) -> Result<Vec<RecoveryEntry>, Error>
     where
         I: IntoIterator<Item = T> + Send,
@@ -595,6 +621,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(res)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -614,6 +641,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }
     // TODO: consider using a larger value than i64 for leaf index, ruint should
     // have postgres compatibility for u256
+    #[instrument(skip(self), level = "debug")]
     async fn get_deletions(self) -> Result<Vec<DeletionEntry>, Error> {
         let mut conn = self.acquire().await?;

@@ -636,6 +664,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     }

     /// Remove a list of entries from the deletions table
+    #[instrument(skip(self), level = "debug")]
     async fn remove_deletions(self, commitments: &[Hash]) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -652,6 +681,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_eligible_unprocessed_commitments(
         self,
         status: UnprocessedStatus,
@@ -689,6 +719,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     /// - The outer option represents the existence of the commitment in the
     ///   unprocessed_identities table
     /// - The inner option represents the existence of an error message
+    #[instrument(skip(self), level = "debug")]
     async fn get_unprocessed_error(
         self,
         commitment: &Hash,
@@ -709,6 +740,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(result.map(|(error_message,)| error_message))
     }

+    #[instrument(skip(self), level = "debug")]
     async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -724,6 +756,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn identity_exists(self, commitment: Hash) -> Result<bool, Error> {
         let mut conn = self.acquire().await?;

@@ -741,6 +774,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     }

     // TODO: add docs
+    #[instrument(skip(self), level = "debug")]
     async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result<bool, Error> {
         let mut conn = self.acquire().await?;

@@ -753,6 +787,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(row_unprocessed.get::<bool, _>(0))
     }

+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;

@@ -780,6 +815,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_batch(
         self,
         next_root: &Hash,
@@ -816,6 +852,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     }

     #[cfg(test)]
+    #[instrument(skip(self), level = "debug")]
     async fn get_next_batch(self, prev_root: &Hash) -> Result<Option<BatchEntry>, Error> {
         let mut conn = self.acquire().await?;

@@ -839,6 +876,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(res)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_batch(self) -> Result<Option<BatchEntry>, Error> {
         let mut conn = self.acquire().await?;

@@ -862,6 +900,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(res)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_next_batch_without_transaction(self) -> Result<Option<BatchEntry>, Error> {
         let mut conn = self.acquire().await?;

@@ -887,6 +926,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(res)
     }

+    #[instrument(skip(self), level = "debug")]
     async fn get_batch_head(self) -> Result<Option<BatchEntry>, Error> {
         let mut conn = self.acquire().await?;

@@ -941,6 +981,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }

+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_transaction(
         self,
         transaction_id: &String,