diff --git a/schemas/database/015_add_constraints_for_identities.down.sql b/schemas/database/015_add_constraints_for_identities.down.sql
new file mode 100644
index 00000000..f6392d40
--- /dev/null
+++ b/schemas/database/015_add_constraints_for_identities.down.sql
@@ -0,0 +1,7 @@
+DROP INDEX idx_unique_insertion_leaf;
+DROP INDEX idx_unique_deletion_leaf;
+
+DROP TRIGGER validate_pre_root_trigger ON identities;
+DROP FUNCTION validate_pre_root();
+
+ALTER TABLE identities DROP COLUMN pre_root;
\ No newline at end of file
diff --git a/schemas/database/015_add_constraints_for_identities.up.sql b/schemas/database/015_add_constraints_for_identities.up.sql
new file mode 100644
index 00000000..028c4cca
--- /dev/null
+++ b/schemas/database/015_add_constraints_for_identities.up.sql
@@ -0,0 +1,52 @@
+CREATE UNIQUE INDEX idx_unique_insertion_leaf ON identities(leaf_index) WHERE commitment != E'\\x0000000000000000000000000000000000000000000000000000000000000000';
+CREATE UNIQUE INDEX idx_unique_deletion_leaf ON identities(leaf_index) WHERE commitment = E'\\x0000000000000000000000000000000000000000000000000000000000000000';
+
+-- Add the new 'pre_root' column
+ALTER TABLE identities ADD COLUMN pre_root BYTEA;
+
+-- This trigger keeps the database consistent: changes to the tree must be applied in a valid sequence.
+CREATE OR REPLACE FUNCTION validate_pre_root() RETURNS trigger AS $$
+    DECLARE
+        last_id identities.id%type;
+        last_root identities.root%type;
+    BEGIN
+        SELECT id, root
+        INTO last_id, last_root
+        FROM identities
+        ORDER BY id DESC
+        LIMIT 1;
+
+        -- When last_id is NULL there are no records in the identities table, so the first
+        -- pre_root does not need to reference a previous root in the database.
+        IF last_id IS NULL THEN RETURN NEW;
+        END IF;
+
+        IF NEW.pre_root IS NULL THEN RAISE EXCEPTION 'Sent pre_root (%) can be NULL only for the first record in the table.', NEW.pre_root;
+        END IF;
+
+        IF (last_root != NEW.pre_root) THEN RAISE EXCEPTION 'Sent pre_root (%) is different than the last root (%) in the database.', NEW.pre_root, last_root;
+        END IF;
+
+        RETURN NEW;
+    END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER validate_pre_root_trigger BEFORE INSERT ON identities FOR EACH ROW EXECUTE PROCEDURE validate_pre_root();
+
+-- The backfill below took around 10 minutes for 10 million records.
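Editorial aside: the trigger's effect is easiest to see with a few hand-written inserts. The session below is illustrative only — the column list is abridged, the byte values are invented (real commitments and roots are 32 bytes), and the `'pending'` status string is assumed from `ProcessedStatus::Pending`:

```sql
-- 1) Empty table: the first row may carry a NULL pre_root.
INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of)
VALUES (0, E'\\x01', E'\\xaa', 'pending', CURRENT_TIMESTAMP);

-- 2) Accepted: pre_root matches the latest root in the table ('\xaa').
INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of, pre_root)
VALUES (1, E'\\x02', E'\\xbb', 'pending', CURRENT_TIMESTAMP, E'\\xaa');

-- 3) Rejected by validate_pre_root(): pre_root does not match the latest
--    root ('\xbb'), so the INSERT raises an exception.
INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of, pre_root)
VALUES (2, E'\\x03', E'\\xcc', 'pending', CURRENT_TIMESTAMP, E'\\xff');
```

The backfill that follows walks the existing rows in `id` order and fills each row's `pre_root` from its predecessor's `root`.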
+DO +$do$ +DECLARE + prev_root identities.pre_root%type := NULL; + identity identities%rowtype; +BEGIN + FOR identity IN SELECT * FROM identities ORDER BY id ASC + LOOP + IF identity.pre_root IS NULL THEN UPDATE identities SET pre_root = prev_root WHERE id = identity.id; + END IF; + prev_root = identity.root; + END LOOP; +END +$do$; + +CREATE UNIQUE INDEX idx_unique_pre_root on identities(pre_root) WHERE pre_root IS NOT NULL; diff --git a/src/database/mod.rs b/src/database/mod.rs index 61ee8cfa..a0cbf2bf 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -158,6 +158,7 @@ mod test { use ethers::types::U256; use postgres_docker_utils::DockerContainer; use ruint::Uint; + use semaphore::poseidon_tree::LazyPoseidonTree; use semaphore::Field; use testcontainers::clients::Cli; @@ -284,12 +285,15 @@ mod test { let (db, _db_container) = setup_db(&docker).await?; let zero: Hash = U256::zero().into(); + let initial_root = LazyPoseidonTree::new(4, zero).root(); let zero_root: Hash = U256::from_dec_str("6789")?.into(); let root: Hash = U256::from_dec_str("54321")?.into(); let commitment: Hash = U256::from_dec_str("12345")?.into(); - db.insert_pending_identity(0, &commitment, &root).await?; - db.insert_pending_identity(0, &zero, &zero_root).await?; + db.insert_pending_identity(0, &commitment, &root, &initial_root) + .await?; + db.insert_pending_identity(0, &zero, &zero_root, &root) + .await?; let leaf_index = db .get_identity_leaf_index(&commitment) @@ -557,23 +561,6 @@ mod test { Ok(()) } - #[tokio::test] - async fn test_update_insertion_timestamp() -> anyhow::Result<()> { - let docker = Cli::default(); - let (db, _db_container) = setup_db(&docker).await?; - - let insertion_timestamp = Utc::now(); - - db.update_latest_insertion_timestamp(insertion_timestamp) - .await?; - - let latest_insertion_timestamp = db.get_latest_insertion_timestamp().await?.unwrap(); - - assert!(latest_insertion_timestamp.timestamp() - insertion_timestamp.timestamp() <= 1); - - Ok(()) - } - #[tokio::test] async fn test_insert_deletion() -> anyhow::Result<()> { let docker = Cli::default(); @@ -637,6 +624,7 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(1); let roots = mock_roots(1); @@ -644,7 +632,7 @@ mod test { assert_eq!(next_leaf_index, 0, "Db should contain not leaf indexes"); - db.insert_pending_identity(0, &identities[0], &roots[0]) + db.insert_pending_identity(0, &identities[0], &roots[0], &initial_root) .await?; let next_leaf_index = db.get_next_leaf_index().await?; @@ -658,13 +646,16 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(5); + let mut pre_root = &initial_root; for i in 0..5 { - db.insert_pending_identity(i, &identities[i], &roots[i]) + db.insert_pending_identity(i, &identities[i], &roots[i], pre_root) .await .context("Inserting identity")?; + pre_root = &roots[i]; } db.mark_root_as_processed_tx(&roots[2]).await?; @@ -688,13 +679,16 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(5); + let mut pre_root = &initial_root; for i in 0..5 { - db.insert_pending_identity(i, &identities[i], &roots[i]) + 
db.insert_pending_identity(i, &identities[i], &roots[i], pre_root) .await .context("Inserting identity")?; + pre_root = &roots[i]; } db.mark_root_as_processed_tx(&roots[2]).await?; @@ -731,13 +725,16 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(5); + let mut pre_root = &initial_root; for i in 0..5 { - db.insert_pending_identity(i, &identities[i], &roots[i]) + db.insert_pending_identity(i, &identities[i], &roots[i], pre_root) .await .context("Inserting identity")?; + pre_root = &roots[i]; } db.mark_root_as_mined_tx(&roots[2]).await?; @@ -776,13 +773,16 @@ mod test { let num_identities = 6; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(num_identities); let roots = mock_roots(num_identities); + let mut pre_root = &initial_root; for i in 0..num_identities { - db.insert_pending_identity(i, &identities[i], &roots[i]) + db.insert_pending_identity(i, &identities[i], &roots[i], pre_root) .await .context("Inserting identity")?; + pre_root = &roots[i]; } println!("Marking roots up to 2nd as processed"); @@ -818,13 +818,16 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(5); + let mut pre_root = &initial_root; for i in 0..5 { - db.insert_pending_identity(i, &identities[i], &roots[i]) + db.insert_pending_identity(i, &identities[i], &roots[i], pre_root) .await .context("Inserting identity")?; + pre_root = &roots[i]; } // root[2] is somehow erroneously marked as mined @@ -862,6 +865,7 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(5); @@ -869,7 +873,7 @@ mod test { assert!(root.is_none(), "Root should not exist"); - db.insert_pending_identity(0, &identities[0], &roots[0]) + db.insert_pending_identity(0, &identities[0], &roots[0], &initial_root) .await?; let root = db @@ -920,14 +924,17 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(7); + let mut pre_root = &initial_root; for i in 0..5 { - db.insert_pending_identity(i, &identities[i], &roots[i]) + db.insert_pending_identity(i, &identities[i], &roots[i], pre_root) .await .context("Inserting identity")?; + pre_root = &roots[i]; } db.mark_root_as_processed_tx(&roots[2]).await?; @@ -959,20 +966,23 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(5); let zero_roots = mock_zero_roots(5); + let mut pre_root = &initial_root; for i in 0..5 { - db.insert_pending_identity(i, &identities[i], &roots[i]) + db.insert_pending_identity(i, &identities[i], &roots[i], pre_root) .await .context("Inserting identity")?; + pre_root = &roots[i]; } - db.insert_pending_identity(0, &Hash::ZERO, &zero_roots[0]) + db.insert_pending_identity(0, &Hash::ZERO, &zero_roots[0], &roots[4]) .await?; - db.insert_pending_identity(3, &Hash::ZERO, &zero_roots[3]) + 
db.insert_pending_identity(3, &Hash::ZERO, &zero_roots[3], &zero_roots[0]) .await?; let pending_tree_updates = db @@ -1014,10 +1024,11 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(5); let roots = mock_roots(5); - db.insert_pending_identity(0, &identities[0], &roots[0]) + db.insert_pending_identity(0, &identities[0], &roots[0], &initial_root) .await .context("Inserting identity 1")?; @@ -1034,9 +1045,9 @@ mod test { // Inserting a new pending root sets invalidation time for the // previous root - db.insert_pending_identity(1, &identities[1], &roots[1]) + db.insert_pending_identity(1, &identities[1], &roots[1], &roots[0]) .await?; - db.insert_pending_identity(2, &identities[2], &roots[2]) + db.insert_pending_identity(2, &identities[2], &roots[2], &roots[1]) .await?; let root_1_inserted_at = Utc::now(); @@ -1056,7 +1067,7 @@ mod test { assert_same_time!(root_item_1.pending_valid_as_of, root_1_inserted_at); // Test mined roots - db.insert_pending_identity(3, &identities[3], &roots[3]) + db.insert_pending_identity(3, &identities[3], &roots[3], &roots[2]) .await?; db.mark_root_as_processed_tx(&roots[0]) @@ -1091,6 +1102,7 @@ mod test { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(2); let roots = mock_roots(1); @@ -1106,7 +1118,7 @@ mod test { assert!(db.identity_exists(identities[0]).await?); // When there's only processed identity - db.insert_pending_identity(0, &identities[1], &roots[0]) + db.insert_pending_identity(0, &identities[1], &roots[0], &initial_root) .await .context("Inserting identity")?; @@ -1148,7 +1160,35 @@ mod test { } #[tokio::test] - async fn test_latest_deletion_root() -> anyhow::Result<()> { + async fn test_latest_insertion() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + + // Update with initial timestamp + let initial_timestamp = chrono::Utc::now(); + db.update_latest_insertion(initial_timestamp) + .await + .context("Inserting initial root")?; + + // Assert values + let initial_entry = db.get_latest_insertion().await?; + assert!(initial_entry.timestamp.timestamp() - initial_timestamp.timestamp() <= 1); + + // Update with a new timestamp + let new_timestamp = chrono::Utc::now(); + db.update_latest_insertion(new_timestamp) + .await + .context("Updating with new root")?; + + // Assert values + let new_entry = db.get_latest_insertion().await?; + assert!((new_entry.timestamp.timestamp() - new_timestamp.timestamp()) <= 1); + + Ok(()) + } + + #[tokio::test] + async fn test_latest_deletion() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; @@ -1179,25 +1219,20 @@ mod test { async fn can_not_insert_same_root_multiple_times() -> anyhow::Result<()> { let docker = Cli::default(); let (db, _db_container) = setup_db(&docker).await?; + + let initial_root = LazyPoseidonTree::new(4, Hash::ZERO).root(); let identities = mock_identities(2); let roots = mock_roots(2); - db.insert_pending_identity(0, &identities[0], &roots[0]) + db.insert_pending_identity(0, &identities[0], &roots[0], &initial_root) .await?; let res = db - .insert_pending_identity(1, &identities[1], &roots[0]) + .insert_pending_identity(1, &identities[1], &roots[0], &roots[0]) .await; assert!(res.is_err(), "Inserting duplicate 
root should fail"); - let root_state = db - .get_root_state(&roots[0]) - .await? - .context("Missing root")?; - - assert_eq!(root_state.status, ProcessedStatus::Pending); - Ok(()) } diff --git a/src/database/query.rs b/src/database/query.rs index 26121c88..01d2f1f9 100644 --- a/src/database/query.rs +++ b/src/database/query.rs @@ -6,7 +6,9 @@ use sqlx::{Executor, Postgres, Row}; use tracing::instrument; use types::{DeletionEntry, LatestDeletionEntry, RecoveryEntry}; -use crate::database::types::{BatchEntry, BatchEntryData, BatchType, TransactionEntry}; +use crate::database::types::{ + BatchEntry, BatchEntryData, BatchType, LatestInsertionEntry, TransactionEntry, +}; use crate::database::{types, Error}; use crate::identity_tree::{ Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus, @@ -25,17 +27,19 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { leaf_index: usize, identity: &Hash, root: &Hash, + pre_root: &Hash, ) -> Result<(), Error> { let insert_pending_identity_query = sqlx::query( r#" - INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of) - VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP) + INSERT INTO identities (leaf_index, commitment, root, status, pending_as_of, pre_root) + VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP, $5) "#, ) .bind(leaf_index as i64) .bind(identity) .bind(root) - .bind(<&str>::from(ProcessedStatus::Pending)); + .bind(<&str>::from(ProcessedStatus::Pending)) + .bind(pre_root); self.execute(insert_pending_identity_query).await?; @@ -182,6 +186,17 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { .collect()) } + async fn get_latest_root(self) -> Result, Error> { + Ok(sqlx::query( + r#" + SELECT root FROM identities ORDER BY id DESC LIMIT 1 + "#, + ) + .fetch_optional(self) + .await? + .map(|r| r.get::(0))) + } + async fn get_latest_root_by_status( self, status: ProcessedStatus, @@ -218,7 +233,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { .await?) } - async fn get_latest_insertion_timestamp(self) -> Result>, Error> { + async fn get_latest_insertion(self) -> Result { let query = sqlx::query( r#" SELECT insertion_timestamp @@ -228,7 +243,15 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { let row = self.fetch_optional(query).await?; - Ok(row.map(|r| r.get::, _>(0))) + if let Some(row) = row { + Ok(LatestInsertionEntry { + timestamp: row.get(0), + }) + } else { + Ok(LatestInsertionEntry { + timestamp: Utc::now(), + }) + } } async fn count_unprocessed_identities(self) -> Result { @@ -383,7 +406,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { } } - async fn update_latest_insertion_timestamp( + async fn update_latest_insertion( self, insertion_timestamp: DateTime, ) -> Result<(), Error> { diff --git a/src/database/types.rs b/src/database/types.rs index 768f6be6..a0fbe585 100644 --- a/src/database/types.rs +++ b/src/database/types.rs @@ -24,6 +24,10 @@ pub struct RecoveryEntry { pub new_commitment: Hash, } +pub struct LatestInsertionEntry { + pub timestamp: DateTime, +} + pub struct LatestDeletionEntry { pub timestamp: DateTime, } diff --git a/src/task_monitor/tasks/create_batches.rs b/src/task_monitor/tasks/create_batches.rs index 1df6eebc..4f16e36f 100644 --- a/src/task_monitor/tasks/create_batches.rs +++ b/src/task_monitor/tasks/create_batches.rs @@ -50,11 +50,7 @@ pub async fn create_batches( // inserted. 
If we have an incomplete batch but are within a small delta of the // tick happening anyway in the wake branch, we insert the current // (possibly-incomplete) batch anyway. - let mut last_batch_time: DateTime = app - .database - .get_latest_insertion_timestamp() - .await? - .unwrap_or(Utc::now()); + let mut last_batch_time: DateTime = app.database.get_latest_insertion().await?.timestamp; loop { // We wait either for a timer tick or a full batch @@ -127,7 +123,7 @@ pub async fn create_batches( timer.reset(); last_batch_time = Utc::now(); app.database - .update_latest_insertion_timestamp(last_batch_time) + .update_latest_insertion(last_batch_time) .await?; } else { // Check if the next batch after the current insertion batch is diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index a499553c..35aca551 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -13,6 +13,12 @@ use crate::database::query::DatabaseQuery; use crate::database::types::DeletionEntry; use crate::identity_tree::{Hash, TreeVersionReadOps}; +// Deletion here differs from insert_identites task. This is because two +// different flows are created for both tasks. Due to how our prover works +// (can handle only a batch of same operations types - insertion or deletion) +// we want to group together insertions and deletions. We are doing it by +// grouping deletions (as the not need to be put into tree immediately as +// insertions) and putting them into the tree pub async fn delete_identities( app: Arc, pending_insertions_mutex: Arc>, @@ -70,6 +76,7 @@ pub async fn delete_identities( } } + let mut pre_root = app.tree_state()?.latest_tree().get_root(); // Delete the commitments at the target leaf indices in the latest tree, // generating the proof for each update let data = app.tree_state()?.latest_tree().delete_many(&leaf_indices); @@ -84,8 +91,9 @@ pub async fn delete_identities( let items = data.into_iter().zip(leaf_indices); for ((root, _proof), leaf_index) in items { app.database - .insert_pending_identity(leaf_index, &Hash::ZERO, &root) + .insert_pending_identity(leaf_index, &Hash::ZERO, &root, &pre_root) .await?; + pre_root = root; } // Remove the previous commitments from the deletions table diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index e7afd568..bbd680a7 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use sqlx::{Postgres, Transaction}; use tokio::sync::{Mutex, Notify}; use tokio::time; use tracing::info; @@ -8,10 +9,14 @@ use tracing::info; use crate::app::App; use crate::database::query::DatabaseQuery as _; use crate::database::types::UnprocessedCommitment; -use crate::database::Database; use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus}; use crate::retry_tx; +// Insertion here differs from delete_identities task. This is because two +// different flows are created for both tasks. We need to insert identities as +// fast as possible to the tree to be able to return inclusion proof as our +// customers depend on it. Flow here is to rewrite from unprocessed_identities +// into identities every 5 seconds. 
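Editorial aside: the deletion loop above and `insert_identities_batch` below thread `pre_root` through consecutive inserts in the same way — each row records the root the tree had before the update, and the root it produces becomes the next row's `pre_root`, which is exactly the chain `validate_pre_root` enforces. A minimal sketch of the pattern; `Db` and its method are hypothetical stand-ins for the real database API:

```rust
type Hash = [u8; 32];

struct Db;

impl Db {
    // Hypothetical stand-in whose signature mirrors insert_pending_identity.
    async fn insert_pending_identity(
        &self,
        _leaf_index: usize,
        _identity: &Hash,
        _root: &Hash,
        _pre_root: &Hash,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}

async fn chained_inserts(
    db: &Db,
    identities: &[Hash],
    roots: &[Hash],
    initial_root: &Hash,
) -> anyhow::Result<()> {
    // Seed the chain with the tree root as it was before any of these updates.
    let mut pre_root = initial_root;
    for (leaf_index, (identity, root)) in identities.iter().zip(roots).enumerate() {
        db.insert_pending_identity(leaf_index, identity, root, pre_root)
            .await?;
        // The root just written becomes the required pre_root of the next row.
        pre_root = root;
    }
    Ok(())
}
```

The function below applies this pattern inside a single retriable transaction.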
 pub async fn insert_identities(
     app: Arc<App>,
     pending_insertions_mutex: Arc<Mutex<()>>,
@@ -34,12 +39,12 @@ pub async fn insert_identities(
             continue;
         }
 
-        insert_identities_batch(
-            &app.database,
-            app.tree_state()?.latest_tree(),
-            &unprocessed,
-            &pending_insertions_mutex,
-        )
+        let _guard = pending_insertions_mutex.lock().await;
+        let latest_tree = app.tree_state()?.latest_tree();
+
+        retry_tx!(&app.database, tx, {
+            insert_identities_batch(&mut tx, latest_tree, &unprocessed).await
+        })
         .await?;
 
         // Notify the identity processing task, that there are new identities
@@ -48,35 +53,28 @@ pub async fn insert_identities(
 }
 
 pub async fn insert_identities_batch(
-    database: &Database,
+    tx: &mut Transaction<'_, Postgres>,
     latest_tree: &TreeVersion<Latest>,
     identities: &[UnprocessedCommitment],
-    pending_insertions_mutex: &Mutex<()>,
-) -> anyhow::Result<()> {
-    let filtered_identities = retry_tx!(database, tx, {
-        // Filter out any identities that are already in the `identities` table
-        let mut filtered_identities = vec![];
-        for identity in identities {
-            if tx
-                .get_identity_leaf_index(&identity.commitment)
-                .await?
-                .is_some()
-            {
-                tracing::warn!(?identity.commitment, "Duplicate identity");
-                tx.remove_unprocessed_identity(&identity.commitment).await?;
-            } else {
-                filtered_identities.push(identity.commitment);
-            }
+) -> Result<(), anyhow::Error> {
+    // Filter out any identities that are already in the `identities` table
+    let mut filtered_identities = vec![];
+    for identity in identities {
+        if tx
+            .get_identity_leaf_index(&identity.commitment)
+            .await?
+            .is_some()
+        {
+            tracing::warn!(?identity.commitment, "Duplicate identity");
+            tx.remove_unprocessed_identity(&identity.commitment).await?;
+        } else {
+            filtered_identities.push(identity.commitment);
         }
-        Result::<_, anyhow::Error>::Ok(filtered_identities)
-    })
-    .await?;
-
-    let _guard = pending_insertions_mutex.lock().await;
+    }
 
     let next_leaf = latest_tree.next_leaf();
-    let next_db_index = retry_tx!(database, tx, tx.get_next_leaf_index().await).await?;
+    let next_db_index = tx.get_next_leaf_index().await?;
 
     assert_eq!(
         next_leaf, next_db_index,
         "Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \
@@ -84,6 +82,7 @@
          {next_db_index}"
     );
 
+    let mut pre_root = &latest_tree.get_root();
     let data = latest_tree.append_many(&filtered_identities);
 
     assert_eq!(
         data.len(),
         filtered_identities.len(),
         "Length mismatch when appending identities to tree"
     );
 
-    retry_tx!(database, tx, {
-        for ((root, _proof, leaf_index), identity) in data.iter().zip(&filtered_identities) {
-            tx.insert_pending_identity(*leaf_index, identity, root)
-                .await?;
+    for ((root, _proof, leaf_index), identity) in data.iter().zip(&filtered_identities) {
+        tx.insert_pending_identity(*leaf_index, identity, root, pre_root)
+            .await?;
+        pre_root = root;
 
-            tx.remove_unprocessed_identity(identity).await?;
-        }
+        tx.remove_unprocessed_identity(identity).await?;
+    }
 
-        Result::<_, anyhow::Error>::Ok(())
-    })
-    .await
+    Ok(())
 }
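Taken together, the migration and these changes maintain one invariant: ordered by `id`, every row's `pre_root` equals the previous row's `root`, and only the first row may have a NULL `pre_root`. A self-contained sketch of checking that invariant over in-memory rows (the helper is hypothetical and generic over the hash type; it is not part of this PR):

```rust
/// Rows are (root, pre_root) pairs ordered by id; the first row may have a
/// None pre_root, mirroring the first record in the identities table.
fn chain_is_consistent<H: PartialEq>(rows: &[(H, Option<H>)]) -> bool {
    rows.windows(2)
        .all(|pair| pair[1].1.as_ref() == Some(&pair[0].0))
}

#[cfg(test)]
mod tests {
    use super::chain_is_consistent;

    #[test]
    fn detects_a_broken_chain() {
        // Small integers stand in for 32-byte roots.
        let good = [(1u64, None), (2, Some(1)), (3, Some(2))];
        let bad = [(1u64, None), (2, Some(1)), (3, Some(9))];
        assert!(chain_is_consistent(&good));
        assert!(!chain_is_consistent(&bad));
    }
}
```

This mirrors, in application code, what the `validate_pre_root` trigger and the `idx_unique_pre_root` index enforce at the database level.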