diff --git a/src/app.rs b/src/app.rs
index e2cd9b87..f1d24fcc 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -9,7 +9,7 @@ use tracing::{info, instrument, warn};
 
 use crate::config::Config;
 use crate::contracts::IdentityManager;
-use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::database::Database;
 use crate::ethereum::Ethereum;
 use crate::identity::processor::{
diff --git a/src/database/query.rs b/src/database/methods.rs
similarity index 62%
rename from src/database/query.rs
rename to src/database/methods.rs
index 4ec9f502..e51c2073 100644
--- a/src/database/query.rs
+++ b/src/database/methods.rs
@@ -1,12 +1,14 @@
 use std::collections::HashSet;
 
+use axum::async_trait;
 use chrono::{DateTime, Utc};
 use ruint::aliases::U256;
-use sqlx::{Executor, Postgres, Row};
+use sqlx::{Acquire, Executor, Postgres, Row};
 use tracing::instrument;
-use types::{DeletionEntry, LatestDeletionEntry, RecoveryEntry};
+use types::{DeletionEntry, RecoveryEntry};
 
-use crate::database::types::{BatchEntry, BatchEntryData, BatchType, LatestInsertionEntry};
+use super::types::{LatestDeletionEntry, LatestInsertionEntry};
+use crate::database::types::{BatchEntry, BatchEntryData, BatchType};
 use crate::database::{types, Error};
 use crate::identity_tree::{
     Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus,
@@ -16,10 +18,9 @@ use crate::prover::{ProverConfig, ProverType};
 
 const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000;
 
-/// This trait provides the individual and composable queries to the database.
-/// Each method is a single atomic query, and can be composed within a
-/// transaction.
-pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
+#[async_trait]
+pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
+    #[instrument(skip(self), level = "debug")]
     async fn insert_pending_identity(
         self,
         leaf_index: usize,
@@ -27,7 +28,9 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         root: &Hash,
         pre_root: &Hash,
     ) -> Result<(), Error> {
-        let insert_pending_identity_query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of, pre_root)
             VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP, $5)
@@ -37,15 +40,18 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         .bind(identity)
         .bind(root)
         .bind(<&str>::from(ProcessedStatus::Pending))
-        .bind(pre_root);
-
-        self.execute(insert_pending_identity_query).await?;
+        .bind(pre_root)
+        .execute(&mut *conn)
+        .await?;
 
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_id_by_root(self, root: &Hash) -> Result<Option<usize>, Error> {
-        let root_index_query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
             r#"
             SELECT id
             FROM identities
@@ -54,9 +60,9 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             LIMIT 1
             "#,
         )
-        .bind(root);
-
-        let row = self.fetch_optional(root_index_query).await?;
+        .bind(root)
+        .fetch_optional(&mut *conn)
+        .await?;
 
         let Some(row) = row else { return Ok(None) };
        let root_id = row.get::<i64, _>(0);
@@ -64,35 +70,113 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         Ok(Some(root_id as usize))
     }
 
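Every default method in the new trait follows the same shape: acquire a connection from `self`, then run one or more queries on a reborrow of it. A minimal sketch of the pattern (illustrative method, not one from this diff):

```rust
async fn example_method(self) -> Result<(), Error> {
    // `Acquire` hands back a connection whether `self` is a pool,
    // a bare connection, or an open transaction.
    let mut conn = self.acquire().await?;

    // `&mut *conn` reborrows it as `&mut PgConnection`, which implements
    // `Executor`, so a single connection can serve several sequential
    // queries inside one method - the capability the old Executor-based
    // trait could not express.
    sqlx::query("SELECT 1").execute(&mut *conn).await?;
    sqlx::query("SELECT 2").execute(&mut *conn).await?;

    Ok(())
}
```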
-    /// Marks all the identities in the db as
+    /// Marks a root and associated entities as processed
+    ///
+    /// This is a composite operation performing multiple queries - it should be run within a transaction.
     #[instrument(skip(self), level = "debug")]
-    async fn mark_all_as_pending(self) -> Result<(), Error> {
-        let pending_status = ProcessedStatus::Pending;
+    async fn mark_root_as_processed(self, root: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
 
-        let update_all_identities = sqlx::query(
+        let root_id = conn.get_id_by_root(root).await?;
+
+        let Some(root_id) = root_id else {
+            return Err(Error::MissingRoot { root: *root });
+        };
+
+        let root_id = root_id as i64;
+
+        sqlx::query(
             r#"
-            UPDATE identities
-            SET status = $1, mined_at = NULL
-            WHERE status <> $1
-            "#,
+            UPDATE identities
+            SET status = $2, mined_at = CURRENT_TIMESTAMP
+            WHERE id <= $1
+            AND status <> $2
+            AND status <> $3;
+            "#,
         )
-        .bind(<&str>::from(pending_status));
+        .bind(root_id)
+        .bind(<&str>::from(ProcessedStatus::Processed))
+        .bind(<&str>::from(ProcessedStatus::Mined))
+        .execute(&mut *conn)
+        .await?;
+
+        sqlx::query(
+            r#"
+            UPDATE identities
+            SET status = $2, mined_at = NULL
+            WHERE id > $1
+            "#,
+        )
+        .bind(root_id)
+        .bind(<&str>::from(ProcessedStatus::Pending))
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
+
+    /// Marks a root and associated identities as mined
+    ///
+    /// This is a composite operation performing multiple queries - it should be run within a transaction.
+    #[instrument(skip(self), level = "debug")]
+    async fn mark_root_as_mined(self, root: &Hash) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        let root_id = conn.get_id_by_root(root).await?;
+
+        let Some(root_id) = root_id else {
+            return Err(Error::MissingRoot { root: *root });
+        };
+
+        let root_id = root_id as i64;
 
-        self.execute(update_all_identities).await?;
+        sqlx::query(
+            r#"
+            UPDATE identities
+            SET status = $2
+            WHERE id <= $1
+            AND status <> $2
+            "#,
+        )
+        .bind(root_id)
+        .bind(<&str>::from(ProcessedStatus::Mined))
+        .execute(&mut *conn)
+        .await?;
 
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
+    async fn mark_all_as_pending(self) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
+            r#"
+            UPDATE identities
+            SET status = $1, mined_at = NULL
+            WHERE status <> $1
+            "#,
+        )
+        .bind(<&str>::from(ProcessedStatus::Pending))
+        .execute(&mut *conn)
+        .await?;
+
+        Ok(())
+    }
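Because `mark_root_as_processed` and `mark_root_as_mined` each issue multiple statements, the doc comments ask callers to wrap them in a transaction. A sketch of the intended call pattern (hypothetical `db`/`root` bindings; it mirrors the tests in src/database/mod.rs later in this diff):

```rust
// `db.begin()` reaches the underlying pool through Database's Deref impl;
// both UPDATEs then commit or roll back together.
let mut tx = db.begin().await?;
tx.mark_root_as_processed(&root).await?;
tx.commit().await?;
```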
+
+    #[instrument(skip(self), level = "debug")]
     async fn get_next_leaf_index(self) -> Result<usize, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
             r#"
             SELECT leaf_index FROM identities
             ORDER BY leaf_index DESC
             LIMIT 1
             "#,
-        );
-
-        let row = self.fetch_optional(query).await?;
+        )
+        .fetch_optional(&mut *conn)
+        .await?;
 
         let Some(row) = row else { return Ok(0) };
         let leaf_index = row.get::<i64, _>(0);
@@ -100,8 +184,11 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         Ok((leaf_index + 1) as usize)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_identity_leaf_index(self, identity: &Hash) -> Result<Option<TreeItem>, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
             r#"
             SELECT leaf_index, status
             FROM identities
@@ -110,9 +197,11 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             LIMIT 1;
             "#,
         )
-        .bind(identity);
+        .bind(identity)
+        .fetch_optional(&mut *conn)
+        .await?;
 
-        let Some(row) = self.fetch_optional(query).await? else {
+        let Some(row) = row else {
             return Ok(None);
         };
@@ -126,10 +215,13 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         Ok(Some(TreeItem { status, leaf_index }))
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_commitments_by_status(
         self,
         status: ProcessedStatus,
     ) -> Result<Vec<TreeUpdate>, Error> {
+        let mut conn = self.acquire().await?;
+
         Ok(sqlx::query_as::<_, TreeUpdate>(
             r#"
             SELECT leaf_index, commitment as element
@@ -139,14 +231,17 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             "#,
         )
         .bind(<&str>::from(status))
-        .fetch_all(self)
+        .fetch_all(&mut *conn)
         .await?)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_commitments_by_statuses(
         self,
         statuses: Vec<ProcessedStatus>,
     ) -> Result<Vec<TreeUpdate>, Error> {
+        let mut conn = self.acquire().await?;
+
         let statuses: Vec<&str> = statuses.into_iter().map(<&str>::from).collect();
         Ok(sqlx::query_as::<_, TreeUpdate>(
             r#"
@@ -157,14 +252,20 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             "#,
         )
         .bind(&statuses[..]) // Official workaround https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-do-a-select--where-foo-in--query
-        .fetch_all(self)
+        .fetch_all(&mut *conn)
         .await?)
     }
 
-    async fn get_non_zero_commitments_by_leaf_indexes<I: IntoIterator<Item = usize>>(
+    #[instrument(skip(self, leaf_indexes), level = "debug")]
+    async fn get_non_zero_commitments_by_leaf_indexes<I>(
         self,
         leaf_indexes: I,
-    ) -> Result<Vec<Hash>, Error> {
+    ) -> Result<Vec<Hash>, Error>
+    where
+        I: IntoIterator<Item = usize> + Send,
+    {
+        let mut conn = self.acquire().await?;
+
         let leaf_indexes: Vec<i64> = leaf_indexes.into_iter().map(|v| v as i64).collect();
 
         Ok(sqlx::query(
@@ -177,29 +278,35 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         )
         .bind(&leaf_indexes[..]) // Official workaround https://github.com/launchbadge/sqlx/blob/main/FAQ.md#how-can-i-do-a-select--where-foo-in--query
         .bind(Hash::ZERO)
-        .fetch_all(self)
+        .fetch_all(&mut *conn)
         .await?
         .into_iter()
        .map(|row| row.get::<Hash, _>(0))
         .collect())
     }
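The `= ANY($1)` queries above use sqlx's documented substitute for `IN (...)` with Postgres (the FAQ link in the comments): binding a slice sends it as a single array parameter. A reduced, self-contained sketch of the same pattern (hypothetical values; `conn` is an acquired connection as in the methods above):

```rust
let leaf_indexes: Vec<i64> = vec![1, 2, 3];

let commitments: Vec<Hash> = sqlx::query(
    "SELECT commitment FROM identities WHERE leaf_index = ANY($1)",
)
.bind(&leaf_indexes[..]) // a slice binds as a Postgres array
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|row| row.get::<Hash, _>(0))
.collect();
```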
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_root_by_status(
         self,
         status: ProcessedStatus,
     ) -> Result<Option<Hash>, Error> {
+        let mut conn = self.acquire().await?;
+
         Ok(sqlx::query(
             r#"
             SELECT root FROM identities WHERE status = $1 ORDER BY id DESC LIMIT 1
             "#,
         )
         .bind(<&str>::from(status))
-        .fetch_optional(self)
+        .fetch_optional(&mut *conn)
         .await?
        .map(|r| r.get::<Hash, _>(0)))
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_root_state(self, root: &Hash) -> Result<Option<RootItem>, Error> {
+        let mut conn = self.acquire().await?;
+
         // This tries really hard to do everything in one query to prevent race
         // conditions.
         Ok(sqlx::query_as::<_, RootItem>(
@@ -216,19 +323,22 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             "#,
         )
         .bind(root)
-        .fetch_optional(self)
+        .fetch_optional(&mut *conn)
         .await?)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_insertion(self) -> Result<LatestInsertionEntry, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let row = sqlx::query(
             r#"
             SELECT insertion_timestamp
             FROM latest_insertion_timestamp
             WHERE Lock = 'X';"#,
-        );
-
-        let row = self.fetch_optional(query).await?;
+        )
+        .fetch_optional(&mut *conn)
+        .await?;
 
         if let Some(row) = row {
             Ok(LatestInsertionEntry {
@@ -241,75 +351,95 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         }
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn count_unprocessed_identities(self) -> Result<i32, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let (count,): (i64,) = sqlx::query_as(
             r#"
             SELECT COUNT(*) as unprocessed
             FROM unprocessed_identities
             "#,
-        );
-        let result = self.fetch_one(query).await?;
-        Ok(result.get::<i64, _>(0) as i32)
+        )
+        .fetch_one(&mut *conn)
+        .await?;
+
+        Ok(count as i32)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn count_pending_identities(self) -> Result<i32, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let (count,): (i64,) = sqlx::query_as(
             r#"
             SELECT COUNT(*) as pending
             FROM identities
             WHERE status = $1
             "#,
         )
-        .bind(<&str>::from(ProcessedStatus::Pending));
-        let result = self.fetch_one(query).await?;
-        Ok(result.get::<i64, _>(0) as i32)
+        .bind(<&str>::from(ProcessedStatus::Pending))
+        .fetch_one(&mut *conn)
+        .await?;
+
+        Ok(count as i32)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_provers(self) -> Result<HashSet<ProverConfig>, Error> {
+        let mut conn = self.acquire().await?;
+
         Ok(sqlx::query_as(
             r#"
             SELECT batch_size, url, timeout_s, prover_type
             FROM provers
             "#,
         )
-        .fetch_all(self)
+        .fetch_all(&mut *conn)
         .await?
         .into_iter()
         .collect())
     }
+
+    #[instrument(skip(self, url), level = "debug")]
     async fn insert_prover_configuration(
         self,
         batch_size: usize,
-        url: impl ToString,
+        url: impl ToString + Send,
         timeout_seconds: u64,
         prover_type: ProverType,
     ) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
         let url = url.to_string();
 
-        let query = sqlx::query(
+        sqlx::query(
             r#"
-                INSERT INTO provers (batch_size, url, timeout_s, prover_type)
-                VALUES ($1, $2, $3, $4)
+            INSERT INTO provers (batch_size, url, timeout_s, prover_type)
+            VALUES ($1, $2, $3, $4)
             "#,
         )
         .bind(batch_size as i64)
         .bind(url)
         .bind(timeout_seconds as i64)
-        .bind(prover_type);
-
-        self.execute(query).await?;
+        .bind(prover_type)
+        .execute(&mut *conn)
+        .await?;
 
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn insert_provers(self, provers: HashSet<ProverConfig>) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
         if provers.is_empty() {
             return Ok(());
         }
 
         let mut query_builder = sqlx::QueryBuilder::new(
             r#"
-                INSERT INTO provers (batch_size, url, timeout_s, prover_type)
+            INSERT INTO provers (batch_size, url, timeout_s, prover_type)
             "#,
         );
@@ -322,30 +452,37 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
 
         let query = query_builder.build();
 
-        self.execute(query).await?;
+        conn.execute(query).await?;
+
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn remove_prover(self, batch_size: usize, prover_type: ProverType) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
-              DELETE FROM provers WHERE batch_size = $1 AND prover_type = $2
+            DELETE FROM provers WHERE batch_size = $1 AND prover_type = $2
             "#,
         )
         .bind(batch_size as i64)
-        .bind(prover_type);
-
-        self.execute(query).await?;
+        .bind(prover_type)
+        .execute(&mut *conn)
+        .await?;
 
         Ok(())
     }
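The middle of `insert_provers` is elided between the hunks above; it feeds rows into the `QueryBuilder`, and `push_values` is sqlx's usual tool for that. A sketch of what that elided section plausibly looks like (an assumption, not taken from this diff; field names follow the SELECT in `get_provers`):

```rust
// Append "VALUES (...), (...)" tuples, one per prover config.
query_builder.push_values(provers, |mut b, prover| {
    b.push_bind(prover.batch_size as i64)
        .push_bind(prover.url)
        .push_bind(prover.timeout_s as i64)
        .push_bind(prover.prover_type);
});
```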
+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_identity(
         self,
         identity: Hash,
         eligibility_timestamp: sqlx::types::chrono::DateTime<Utc>,
     ) -> Result<Hash, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO unprocessed_identities (commitment, status, created_at, eligibility)
             VALUES ($1, $2, CURRENT_TIMESTAMP, $3)
@@ -353,34 +490,43 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         )
         .bind(identity)
         .bind(<&str>::from(UnprocessedStatus::New))
-        .bind(eligibility_timestamp);
+        .bind(eligibility_timestamp)
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(identity)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_recovery(
         self,
         existing_commitment: &Hash,
         new_commitment: &Hash,
     ) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO recoveries (existing_commitment, new_commitment)
             VALUES ($1, $2)
             "#,
         )
         .bind(existing_commitment)
-        .bind(new_commitment);
-        self.execute(query).await?;
+        .bind(new_commitment)
+        .execute(&mut *conn)
+        .await?;
+
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_deletion(self) -> Result<LatestDeletionEntry, Error> {
-        let query =
-            sqlx::query("SELECT deletion_timestamp FROM latest_deletion_root WHERE Lock = 'X';");
+        let mut conn = self.acquire().await?;
 
-        let row = self.fetch_optional(query).await?;
+        let row =
+            sqlx::query("SELECT deletion_timestamp FROM latest_deletion_root WHERE Lock = 'X';")
+                .fetch_optional(&mut *conn)
+                .await?;
 
         if let Some(row) = row {
             Ok(LatestDeletionEntry {
@@ -393,11 +539,14 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         }
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn update_latest_insertion(
         self,
         insertion_timestamp: DateTime<Utc>,
     ) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO latest_insertion_timestamp (Lock, insertion_timestamp)
             VALUES ('X', $1)
@@ -405,14 +554,18 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             DO UPDATE SET insertion_timestamp = EXCLUDED.insertion_timestamp;
             "#,
         )
-        .bind(insertion_timestamp);
+        .bind(insertion_timestamp)
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn update_latest_deletion(self, deletion_timestamp: DateTime<Utc>) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO latest_deletion_root (Lock, deletion_timestamp)
             VALUES ('X', $1)
@@ -420,25 +573,33 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             DO UPDATE SET deletion_timestamp = EXCLUDED.deletion_timestamp;
             "#,
         )
-        .bind(deletion_timestamp);
+        .bind(deletion_timestamp)
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 
     #[cfg(test)]
+    #[instrument(skip(self), level = "debug")]
     async fn get_all_recoveries(self) -> Result<Vec<RecoveryEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
         Ok(
             sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries")
-                .fetch_all(self)
+                .fetch_all(&mut *conn)
                 .await?,
         )
     }
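The `+ Send` bounds that appear on generic parameters in this file (here and on `delete_recoveries` just below) are a consequence of `#[async_trait]`: the macro repackages each method body as a boxed future that must be `Send`, so every captured generic argument needs to be `Send` as well. A reduced model (hypothetical trait, not from this codebase):

```rust
use async_trait::async_trait; // this diff imports the same macro via `axum::async_trait`

#[async_trait]
trait Demo: Send + Sized {
    // The macro rewrites this into a method returning
    // Pin<Box<dyn Future<Output = usize> + Send + '_>>; dropping the
    // `Send` bound on `I` makes that boxed future non-Send and the
    // trait fails to compile.
    async fn consume<I>(self, items: I) -> usize
    where
        I: IntoIterator<Item = u64> + Send,
    {
        items.into_iter().count()
    }
}
```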
-    async fn delete_recoveries<I: IntoIterator<Item = T>, T: Into<U256>>(
-        self,
-        prev_commits: I,
-    ) -> Result<Vec<RecoveryEntry>, Error> {
+    #[instrument(skip(self, prev_commits), level = "debug")]
+    async fn delete_recoveries<I, T>(self, prev_commits: I) -> Result<Vec<RecoveryEntry>, Error>
+    where
+        I: IntoIterator<Item = T> + Send,
+        T: Into<U256>,
+    {
+        let mut conn = self.acquire().await?;
+
         // TODO: upstream PgHasArrayType impl to ruint
         let prev_commits = prev_commits
             .into_iter()
@@ -454,37 +615,44 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             "#,
         )
         .bind(&prev_commits)
-        .fetch_all(self)
+        .fetch_all(&mut *conn)
         .await?;
 
         Ok(res)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_deletion(self, leaf_index: usize, identity: &Hash) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO deletions (leaf_index, commitment)
             VALUES ($1, $2)
             "#,
         )
         .bind(leaf_index as i64)
-        .bind(identity);
+        .bind(identity)
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 
     // TODO: consider using a larger value than i64 for leaf index, ruint should
     // have postgres compatibility for u256
+    #[instrument(skip(self), level = "debug")]
     async fn get_deletions(self) -> Result<Vec<DeletionEntry>, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let result = sqlx::query(
             r#"
             SELECT *
             FROM deletions
             "#,
-        );
-
-        let result = self.fetch_all(query).await?;
+        )
+        .fetch_all(&mut *conn)
+        .await?;
 
         Ok(result
             .into_iter()
@@ -496,7 +664,10 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
     }
 
     /// Remove a list of entries from the deletions table
+    #[instrument(skip(self), level = "debug")]
     async fn remove_deletions(self, commitments: &[Hash]) -> Result<(), Error> {
+        let mut conn = self.acquire().await?;
+
         let commitments = commitments
             .iter()
             .map(|c| c.to_be_bytes())
@@ -504,27 +675,30 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
 
         sqlx::query("DELETE FROM deletions WHERE commitment = Any($1)")
             .bind(commitments)
-            .execute(self)
+            .execute(&mut *conn)
             .await?;
 
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_eligible_unprocessed_commitments(
         self,
         status: UnprocessedStatus,
     ) -> Result<Vec<types::UnprocessedCommitment>, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let result = sqlx::query(
             r#"
-                SELECT * FROM unprocessed_identities
-                WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility
-                LIMIT $2
+            SELECT * FROM unprocessed_identities
+            WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility
+            LIMIT $2
             "#,
         )
         .bind(<&str>::from(status))
-        .bind(MAX_UNPROCESSED_FETCH_COUNT);
-
-        let result = self.fetch_all(query).await?;
+        .bind(MAX_UNPROCESSED_FETCH_COUNT)
+        .fetch_all(&mut *conn)
+        .await?;
 
         Ok(result
             .into_iter()
@@ -545,37 +719,47 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
     /// - The outer option represents the existence of the commitment in the
     ///   unprocessed_identities table
     /// - The inner option represents the existence of an error message
+    #[instrument(skip(self), level = "debug")]
     async fn get_unprocessed_error(
         self,
         commitment: &Hash,
     ) -> Result<Option<Option<String>>, Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        let result: Option<(Option<String>,)> = sqlx::query_as(
             r#"
-            SELECT error_message FROM unprocessed_identities WHERE commitment = $1
+            SELECT error_message
+            FROM unprocessed_identities
+            WHERE commitment = $1
             "#,
         )
-        .bind(commitment);
+        .bind(commitment)
+        .fetch_optional(&mut *conn)
+        .await?;
 
-        Ok(self
-            .fetch_optional(query)
-            .await?
-            .map(|row| row.get::<Option<String>, _>(0)))
+        Ok(result.map(|(error_message,)| error_message))
     }
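`get_unprocessed_error` deliberately returns a nested option, with the two levels spelled out in its doc comment. A hypothetical caller disambiguates the three cases like this:

```rust
match db.get_unprocessed_error(&commitment).await? {
    None => { /* commitment is not in unprocessed_identities at all */ }
    Some(None) => { /* commitment exists, but no error is recorded */ }
    Some(Some(message)) => tracing::warn!(%message, "unprocessed identity failed"),
}
```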
 
+    #[instrument(skip(self), level = "debug")]
     async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
-                DELETE FROM unprocessed_identities WHERE commitment = $1
+            DELETE FROM unprocessed_identities WHERE commitment = $1
             "#,
         )
-        .bind(commitment);
-
-        self.execute(query).await?;
+        .bind(commitment)
+        .execute(&mut *conn)
+        .await?;
 
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn identity_exists(self, commitment: Hash) -> Result<bool, Error> {
+        let mut conn = self.acquire().await?;
+
         Ok(sqlx::query(
             r#"
             select
@@ -584,22 +768,30 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             "#,
         )
         .bind(commitment)
-        .fetch_one(self)
+        .fetch_one(&mut *conn)
         .await?
        .get::<bool, _>(0))
     }
 
     // TODO: add docs
+    #[instrument(skip(self), level = "debug")]
     async fn identity_is_queued_for_deletion(self, commitment: &Hash) -> Result<bool, Error> {
-        let query_queued_deletion =
+        let mut conn = self.acquire().await?;
+
+        let row_unprocessed =
             sqlx::query(r#"SELECT exists(SELECT 1 FROM deletions where commitment = $1)"#)
-                .bind(commitment);
-        let row_unprocessed = self.fetch_one(query_queued_deletion).await?;
+                .bind(commitment)
+                .fetch_one(&mut *conn)
+                .await?;
+
        Ok(row_unprocessed.get::<bool, _>(0))
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO batches(
                 id,
@@ -616,12 +808,14 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         .bind(sqlx::types::Json::from(BatchEntryData {
             identities: vec![],
             indexes: vec![],
-        }));
+        }))
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_batch(
         self,
         next_root: &Hash,
@@ -630,7 +824,9 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         identities: &[Identity],
         indexes: &[usize],
     ) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO batches(
                 id,
@@ -648,14 +844,18 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
         .bind(sqlx::types::Json::from(BatchEntryData {
             identities: identities.to_vec(),
             indexes: indexes.to_vec(),
-        }));
+        }))
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 
     #[cfg(test)]
+    #[instrument(skip(self), level = "debug")]
     async fn get_next_batch(self, prev_root: &Hash) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
         let res = sqlx::query_as::<_, BatchEntry>(
             r#"
             SELECT
@@ -670,13 +870,16 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             "#,
         )
         .bind(prev_root)
-        .fetch_optional(self)
+        .fetch_optional(&mut *conn)
         .await?;
 
         Ok(res)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_latest_batch(self) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
         let res = sqlx::query_as::<_, BatchEntry>(
             r#"
             SELECT
@@ -691,13 +894,16 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             LIMIT 1
             "#,
         )
-        .fetch_optional(self)
+        .fetch_optional(&mut *conn)
         .await?;
 
         Ok(res)
     }
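These getters treat `batches` as a chain linked by roots: each row's `prev_root` matches the previous row's `next_root`, with the head row starting the chain (its elided SELECT presumably filters on a NULL `prev_root`, given the name `get_batch_head`). A hypothetical traversal using only methods from this trait; the `BatchEntry` field names are assumed from `database::types`:

```rust
// Walk the batch chain from the head to the most recent batch.
let mut cursor = db.get_batch_head().await?;
while let Some(batch) = cursor {
    println!("batch {:?} -> {:?}", batch.prev_root, batch.next_root);
    cursor = db.get_next_batch(&batch.next_root).await?;
}
```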
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_next_batch_without_transaction(self) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
         let res = sqlx::query_as::<_, BatchEntry>(
             r#"
             SELECT
@@ -714,13 +920,16 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             LIMIT 1
             "#,
         )
-        .fetch_optional(self)
+        .fetch_optional(&mut *conn)
         .await?;
 
         Ok(res)
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn get_batch_head(self) -> Result<Option<BatchEntry>, Error> {
+        let mut conn = self.acquire().await?;
+
         let res = sqlx::query_as::<_, BatchEntry>(
             r#"
             SELECT
@@ -734,7 +943,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             LIMIT 1
             "#,
         )
-        .fetch_optional(self)
+        .fetch_optional(&mut *conn)
         .await?;
 
         Ok(res)
@@ -742,36 +951,45 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
 
     #[instrument(skip(self), level = "debug")]
     async fn delete_batches_after_root(self, root: &Hash) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             DELETE FROM batches
             WHERE prev_root = $1
             "#,
         )
-        .bind(root);
+        .bind(root)
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 
     #[instrument(skip(self), level = "debug")]
     async fn delete_all_batches(self) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             DELETE FROM batches
             "#,
-        );
+        )
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 
+    #[instrument(skip(self), level = "debug")]
     async fn insert_new_transaction(
         self,
         transaction_id: &String,
         batch_next_root: &Hash,
     ) -> Result<(), Error> {
-        let query = sqlx::query(
+        let mut conn = self.acquire().await?;
+
+        sqlx::query(
             r#"
             INSERT INTO transactions(
                 transaction_id,
@@ -781,9 +999,13 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
             "#,
         )
         .bind(transaction_id)
-        .bind(batch_next_root);
+        .bind(batch_next_root)
+        .execute(&mut *conn)
+        .await?;
 
-        self.execute(query).await?;
         Ok(())
     }
 }
+
+// Blanket implementation for all types that satisfy the trait bounds
+impl<'c, T> DbMethods<'c> for T where T: Acquire<'c, Database = Postgres> + Send + Sync + Sized {}
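Thanks to this blanket impl, the `DbMethods` extension methods are available on every Postgres `Acquire` handle. A hypothetical caller showing the three shapes (mirrors how the tests below invoke the methods):

```rust
use crate::database::methods::DbMethods as _;

async fn demo(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), Error> {
    // 1. Straight on the pool: each call checks out its own connection.
    let _head = pool.get_batch_head().await?;

    // 2. On an explicitly acquired connection.
    let mut conn = pool.acquire().await?;
    let _head = (&mut *conn).get_batch_head().await?;

    // 3. On a transaction, so several calls share one atomic scope.
    let mut tx = pool.begin().await?;
    let _head = tx.get_batch_head().await?;
    tx.commit().await?;

    Ok(())
}
```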
diff --git a/src/database/mod.rs b/src/database/mod.rs
index cce028c0..f91e896d 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -15,12 +15,10 @@ use thiserror::Error;
 use tracing::{error, info, instrument, warn};
 
 use crate::config::DatabaseConfig;
-use crate::database::query::DatabaseQuery;
 use crate::identity_tree::Hash;
 
-pub mod query;
-pub mod transaction;
 pub mod types;
+pub mod methods;
 
 // Statically link in migration files
 static MIGRATOR: Migrator = sqlx::migrate!("schemas/database");
@@ -37,8 +35,6 @@ impl Deref for Database {
     }
 }
 
-impl<'a, T> DatabaseQuery<'a> for T where T: Executor<'a, Database = Postgres> {}
-
 impl Database {
     #[instrument(skip_all)]
     pub async fn new(config: &DatabaseConfig) -> Result<Self, Error> {
@@ -164,7 +160,7 @@ mod test {
 
     use super::Database;
     use crate::config::DatabaseConfig;
-    use crate::database::query::DatabaseQuery;
+    use crate::database::methods::DbMethods;
     use crate::database::types::BatchType;
     use crate::identity_tree::{Hash, ProcessedStatus, UnprocessedStatus};
     use crate::prover::identity::Identity;
@@ -658,7 +654,9 @@ mod test {
             pre_root = &roots[i];
         }
 
-        db.mark_root_as_processed_tx(&roots[2]).await?;
+        let mut tx = db.begin().await?;
+        tx.mark_root_as_processed(&roots[2]).await?;
+        tx.commit().await?;
 
         db.mark_all_as_pending().await?;
@@ -691,7 +689,9 @@ mod test {
             pre_root = &roots[i];
         }
 
-        db.mark_root_as_processed_tx(&roots[2]).await?;
+        let mut tx = db.begin().await?;
+        tx.mark_root_as_processed(&roots[2]).await?;
+        tx.commit().await?;
 
         for root in roots.iter().take(3) {
             let root = db
@@ -737,7 +737,7 @@ mod test {
             pre_root = &roots[i];
         }
 
-        db.mark_root_as_mined_tx(&roots[2]).await?;
+        db.mark_root_as_mined(&roots[2]).await?;
 
         for root in roots.iter().take(3) {
             let root = db
@@ -786,27 +786,27 @@ mod test {
         }
 
         println!("Marking roots up to 2nd as processed");
-        db.mark_root_as_processed_tx(&roots[2]).await?;
+        db.mark_root_as_processed(&roots[2]).await?;
 
         assert_roots_are(&db, &roots[..3], ProcessedStatus::Processed).await?;
         assert_roots_are(&db, &roots[3..], ProcessedStatus::Pending).await?;
 
         println!("Marking roots up to 1st as mined");
-        db.mark_root_as_mined_tx(&roots[1]).await?;
+        db.mark_root_as_mined(&roots[1]).await?;
 
         assert_roots_are(&db, &roots[..2], ProcessedStatus::Mined).await?;
         assert_roots_are(&db, &[roots[2]], ProcessedStatus::Processed).await?;
         assert_roots_are(&db, &roots[3..], ProcessedStatus::Pending).await?;
 
         println!("Marking roots up to 4th as processed");
-        db.mark_root_as_processed_tx(&roots[4]).await?;
+        db.mark_root_as_processed(&roots[4]).await?;
 
         assert_roots_are(&db, &roots[..2], ProcessedStatus::Mined).await?;
         assert_roots_are(&db, &roots[2..5], ProcessedStatus::Processed).await?;
         assert_roots_are(&db, &roots[5..], ProcessedStatus::Pending).await?;
 
         println!("Marking all roots as mined");
-        db.mark_root_as_mined_tx(&roots[num_identities - 1]).await?;
+        db.mark_root_as_mined(&roots[num_identities - 1]).await?;
 
         assert_roots_are(&db, &roots, ProcessedStatus::Mined).await?;
@@ -831,10 +831,10 @@ mod test {
         }
 
         // root[2] is somehow erroneously marked as mined
-        db.mark_root_as_processed_tx(&roots[2]).await?;
+        db.mark_root_as_processed(&roots[2]).await?;
 
         // Later we correctly mark the previous root as mined
-        db.mark_root_as_processed_tx(&roots[1]).await?;
+        db.mark_root_as_processed(&roots[1]).await?;
 
         for root in roots.iter().take(2) {
             let root = db
@@ -893,7 +893,7 @@ mod test {
             "Root has not yet been mined"
         );
 
-        db.mark_root_as_processed_tx(&roots[0]).await?;
+        db.mark_root_as_processed(&roots[0]).await?;
 
         let root = db
             .get_root_state(&roots[0])
@@ -937,7 +937,7 @@ mod test {
             pre_root = &roots[i];
         }
 
-        db.mark_root_as_processed_tx(&roots[2]).await?;
+        db.mark_root_as_processed(&roots[2]).await?;
 
         let mined_tree_updates = db
             .get_commitments_by_status(ProcessedStatus::Processed)
@@ -1070,7 +1070,7 @@ mod test {
         db.insert_pending_identity(3, &identities[3], &roots[3], &roots[2])
             .await?;
 
-        db.mark_root_as_processed_tx(&roots[0])
+        db.mark_root_as_processed(&roots[0])
             .await
             .context("Marking root as mined")?;
diff --git a/src/database/transaction.rs b/src/database/transaction.rs
deleted file mode 100644
index c2e98e11..00000000
--- a/src/database/transaction.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-use sqlx::{Executor, Postgres, Transaction};
-use tracing::instrument;
-
-use crate::database::query::DatabaseQuery;
-use crate::database::{Database, Error};
-use crate::identity_tree::{Hash, ProcessedStatus};
-use crate::retry_tx;
-
-async fn mark_root_as_processed(
-    tx: &mut Transaction<'_, Postgres>,
-    root: &Hash,
-) -> Result<(), Error> {
-    let root_id = tx.get_id_by_root(root).await?;
-
-    let Some(root_id) = root_id else {
-        return Err(Error::MissingRoot { root: *root });
-    };
-
-    let root_id = root_id as i64;
-    // TODO: Can I get rid of line `AND status <> $2
-    let update_previous_roots = sqlx::query(
-        r#"
-        UPDATE identities
-        SET status = $2, mined_at = CURRENT_TIMESTAMP
-        WHERE id <= $1
-        AND status <> $2
-        AND status <> $3;
-        "#,
-    )
-    .bind(root_id)
-    .bind(<&str>::from(ProcessedStatus::Processed))
-    .bind(<&str>::from(ProcessedStatus::Mined));
-
-    let update_next_roots = sqlx::query(
-        r#"
-        UPDATE identities
-        SET status = $2, mined_at = NULL
-        WHERE id > $1
-        "#,
-    )
-    .bind(root_id)
-    .bind(<&str>::from(ProcessedStatus::Pending));
-
-    tx.execute(update_previous_roots).await?;
-    tx.execute(update_next_roots).await?;
-
-    Ok(())
-}
-
-pub async fn mark_root_as_mined(
-    tx: &mut Transaction<'_, Postgres>,
-    root: &Hash,
-) -> Result<(), Error> {
-    let mined_status = ProcessedStatus::Mined;
-
-    let root_id = tx.get_id_by_root(root).await?;
-
-    let Some(root_id) = root_id else {
-        return Err(Error::MissingRoot { root: *root });
-    };
-
-    let root_id = root_id as i64;
-
-    let update_previous_roots = sqlx::query(
-        r#"
-        UPDATE identities
-        SET status = $2
-        WHERE id <= $1
-        AND status <> $2
-        "#,
-    )
-    .bind(root_id)
-    .bind(<&str>::from(mined_status));
-
-    tx.execute(update_previous_roots).await?;
-
-    Ok(())
-}
-
-/// impl block for database transactions
-impl Database {
-    /// Marks the identities and roots from before a given root hash as mined
-    /// Also marks following roots as pending
-    #[instrument(skip(self), level = "debug")]
-    pub async fn mark_root_as_processed_tx(&self, root: &Hash) -> Result<(), Error> {
-        retry_tx!(self.pool, tx, mark_root_as_processed(&mut tx, root).await).await
-    }
-
-    /// Marks the identities and roots from before a given root hash as mined
-    /// Also marks following roots as pending
-    #[instrument(skip(self), level = "debug")]
-    pub async fn mark_root_as_processed_and_delete_batches_tx(
-        &self,
-        root: &Hash,
-    ) -> Result<(), Error> {
-        retry_tx!(self.pool, tx, {
-            mark_root_as_processed(&mut tx, root).await?;
-            tx.delete_batches_after_root(root).await?;
-            Result::<_, Error>::Ok(())
-        })
-        .await
-    }
-
-    /// Marks the identities and roots from before a given root hash as
-    /// finalized
-    #[instrument(skip(self), level = "debug")]
-    pub async fn mark_root_as_mined_tx(&self, root: &Hash) -> Result<(), Error> {
-        retry_tx!(self.pool, tx, mark_root_as_mined(&mut tx, root).await).await
-    }
-}
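The helpers deleted here do not simply disappear: their bodies move into `DbMethods` (methods.rs above), and each former `*_tx` wrapper becomes an inline `retry_tx!` block at its call site, as the processor changes below show. For instance, the old `db.mark_root_as_processed_and_delete_batches_tx(&root_hash)` call is now written as:

```rust
// Same retrying transaction as before, composed at the call site.
retry_tx!(self.database.pool, tx, {
    tx.mark_root_as_processed(&root_hash).await?;
    tx.delete_batches_after_root(&root_hash).await?;

    Result::<(), Error>::Ok(())
})
.await?;
```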
diff --git a/src/identity/processor.rs b/src/identity/processor.rs
index 41c3e1e5..21618db8 100644
--- a/src/identity/processor.rs
+++ b/src/identity/processor.rs
@@ -16,7 +16,7 @@ use crate::config::Config;
 use crate::contracts::abi::{BridgedWorldId, RootAddedFilter, TreeChangeKind, TreeChangedFilter};
 use crate::contracts::scanner::BlockScanner;
 use crate::contracts::IdentityManager;
-use crate::database::query::DatabaseQuery;
+use crate::database::methods::DbMethods;
 use crate::database::types::{BatchEntry, BatchType};
 use crate::database::{Database, Error};
 use crate::ethereum::{Ethereum, ReadProvider};
@@ -46,14 +46,14 @@ pub trait IdentityProcessor: Send + Sync + 'static {
 }
 
 pub struct OnChainIdentityProcessor {
-    ethereum:          Ethereum,
-    config:            Config,
-    database:          Arc<Database>,
-    identity_manager:  Arc<IdentityManager>,
+    ethereum: Ethereum,
+    config: Config,
+    database: Arc<Database>,
+    identity_manager: Arc<IdentityManager>,
     prover_repository: Arc<ProverRepository>,
 
-    mainnet_scanner:    tokio::sync::Mutex<BlockScanner<Arc<ReadProvider>>>,
-    mainnet_address:    Address,
+    mainnet_scanner: tokio::sync::Mutex<BlockScanner<Arc<ReadProvider>>>,
+    mainnet_address: Address,
     secondary_scanners: tokio::sync::Mutex<HashMap<Address, BlockScanner<Arc<ReadProvider>>>>,
 }
@@ -141,15 +141,20 @@ impl IdentityProcessor for OnChainIdentityProcessor {
             // Note that we don't have a way of queuing a root here for
             // finalization, so it's going to stay as "processed"
             // until the next root is mined.
-            self.database
-                .mark_root_as_processed_and_delete_batches_tx(&root_hash)
-                .await?;
+            retry_tx!(self.database.pool, tx, {
+                tx.mark_root_as_processed(&root_hash).await?;
+                tx.delete_batches_after_root(&root_hash).await?;
+
+                Result::<(), Error>::Ok(())
+            })
+            .await?;
         } else {
             // Db is either empty or we're restarting with a new contract/chain
             // so we should mark everything as pending
             retry_tx!(self.database.pool, tx, {
                 tx.mark_all_as_pending().await?;
                 tx.delete_all_batches().await?;
+
                 Result::<(), Error>::Ok(())
             })
             .await?;
@@ -420,9 +425,12 @@ impl OnChainIdentityProcessor {
                 continue;
             }
 
-            self.database
-                .mark_root_as_processed_tx(&post_root.into())
-                .await?;
+            retry_tx!(
+                self.database.pool,
+                tx,
+                tx.mark_root_as_processed(&post_root.into()).await
+            )
+            .await?;
 
             info!(?pre_root, ?post_root, ?kind, "Batch mined");
 
@@ -456,7 +464,12 @@ impl OnChainIdentityProcessor {
                 continue;
             }
 
-            self.database.mark_root_as_mined_tx(&root.into()).await?;
+            retry_tx!(
+                self.database.pool,
+                tx,
+                tx.mark_root_as_mined(&root.into()).await
+            )
+            .await?;
 
             info!(?root, "Root finalized");
         }
@@ -549,7 +562,7 @@ impl OnChainIdentityProcessor {
 
 pub struct OffChainIdentityProcessor {
     committed_batches: Arc<Mutex<Vec<BatchEntry>>>,
-    database:          Arc<Database>,
+    database: Arc<Database>,
 }
 
 #[async_trait]
@@ -575,14 +588,16 @@ impl IdentityProcessor for OffChainIdentityProcessor {
             self.update_eligible_recoveries(batch).await?;
         }
 
-        // With current flow it is required to mark root as processed first as this is
-        // how required mined_at field is set
-        self.database
-            .mark_root_as_processed_tx(&batch.next_root)
-            .await?;
-        self.database
-            .mark_root_as_mined_tx(&batch.next_root)
-            .await?;
+        retry_tx!(self.database.pool, tx, {
+            // With the current flow the root must be marked as processed first,
+            // as this is how the required mined_at field gets set
+            tx.mark_root_as_processed(&batch.next_root).await?;
+            tx.mark_root_as_mined(&batch.next_root).await?;
+
+            Result::<_, anyhow::Error>::Ok(())
+        })
+        .await?;
+
         processed_tree.apply_updates_up_to(batch.next_root);
diff --git a/src/identity_tree/initializer.rs b/src/identity_tree/initializer.rs
index 772dc268..aeff37e0 100644
--- a/src/identity_tree/initializer.rs
+++ b/src/identity_tree/initializer.rs
@@ -5,7 +5,7 @@ use semaphore::poseidon_tree::LazyPoseidonTree;
 use tracing::{info, instrument, warn};
 
 use crate::config::TreeConfig;
-use crate::database::query::DatabaseQuery;
+use crate::database::methods::DbMethods;
 use crate::database::Database;
 use crate::identity::processor::IdentityProcessor;
 use crate::identity_tree::{
diff --git a/src/task_monitor/mod.rs b/src/task_monitor/mod.rs
index 6318c21e..de0cc358 100644
--- a/src/task_monitor/mod.rs
+++ b/src/task_monitor/mod.rs
@@ -8,7 +8,7 @@ use tokio::task::JoinHandle;
 use tracing::{info, instrument, warn};
 
 use crate::app::App;
-use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::database::Database;
 use crate::shutdown::Shutdown;
 
diff --git a/src/task_monitor/tasks/create_batches.rs b/src/task_monitor/tasks/create_batches.rs
index 4f16e36f..4674f2b6 100644
--- a/src/task_monitor/tasks/create_batches.rs
+++ b/src/task_monitor/tasks/create_batches.rs
@@ -12,7 +12,7 @@ use tracing::instrument;
 
 use crate::app::App;
 use crate::database;
-use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::database::Database;
 use crate::identity_tree::{
     AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion,
diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs
index 35aca551..7db83a75 100644
--- a/src/task_monitor/tasks/delete_identities.rs
+++ b/src/task_monitor/tasks/delete_identities.rs
@@ -9,7 +9,7 @@ use tokio::time;
 use tracing::info;
 
 use crate::app::App;
-use crate::database::query::DatabaseQuery;
+use crate::database::methods::DbMethods;
 use crate::database::types::DeletionEntry;
 use crate::identity_tree::{Hash, TreeVersionReadOps};
 
diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs
index bbd680a7..fafa275e 100644
--- a/src/task_monitor/tasks/insert_identities.rs
+++ b/src/task_monitor/tasks/insert_identities.rs
@@ -7,7 +7,7 @@ use tokio::time;
 use tracing::info;
 
 use crate::app::App;
-use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::database::types::UnprocessedCommitment;
 use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
 use crate::retry_tx;
diff --git a/src/task_monitor/tasks/process_batches.rs b/src/task_monitor/tasks/process_batches.rs
index 95fb6055..74efffe3 100644
--- a/src/task_monitor/tasks/process_batches.rs
+++ b/src/task_monitor/tasks/process_batches.rs
@@ -5,7 +5,7 @@ use tokio::sync::{mpsc, Notify};
 use tokio::{select, time};
 
 use crate::app::App;
-use crate::database::query::DatabaseQuery as _;
+use crate::database::methods::DbMethods as _;
 use crate::identity::processor::TransactionId;
 
 pub async fn process_batches(