diff --git a/Readme.md b/Readme.md
index 4e0ce338..ec0104c6 100644
--- a/Readme.md
+++ b/Readme.md
@@ -37,17 +37,14 @@ Sequencer has 6 API routes.
    indeed in the tree. The inclusion proof is then returned to the API caller.
 3. `/deleteIdentity` - Takes an identity commitment hash, ensures that it exists and hasn't been deleted yet. This
    identity is then scheduled for deletion.
-4. `/recoverIdentity` - Takes two identity commitment hashes. The first must exist and will be scheduled for deletion
-   and the other will be inserted as a replacement after the first identity has been deleted and a set amount of time (
-   depends on configuration parameters) has passed.
-5. `/verifySemaphoreProof` - This call takes root, signal hash, nullifier hash, external nullifier hash and a proof.
-   The proving key is fetched based on the depth index, and verification key as well. The list of prime fields is
-   created based on request input mentioned before, and then we proceed to verify the proof. Sequencer uses groth16
-   zk-SNARK implementation. The API call returns the proof as a response.
+4. `/verifySemaphoreProof` - Takes a root, signal hash, nullifier hash, external nullifier hash and a proof. The
+   proving and verification keys are selected for the given tree depth, the request inputs are mapped to
+   prime-field elements, and the proof is verified using the Groth16 zk-SNARK implementation. The API call
+   returns the verification result as a response.
-6. `/addBatchSize` - Adds a prover with specific batch size to a list of provers.
-7. `/removeBatchSize` - Removes the prover based on batch size.
-8. `/listBatchSizes` - Lists all provers that are added to the Sequencer.
+5. `/addBatchSize` - Adds a prover with a specific batch size to the list of provers.
+6. `/removeBatchSize` - Removes the prover with the given batch size.
+7. `/listBatchSizes` - Lists all provers that are added to the Sequencer.
 
 ## Getting Started
diff --git a/schemas/database/016_remove_recovery.down.sql b/schemas/database/016_remove_recovery.down.sql
new file mode 100644
index 00000000..68221433
--- /dev/null
+++ b/schemas/database/016_remove_recovery.down.sql
@@ -0,0 +1,13 @@
+CREATE TABLE recoveries (
+    existing_commitment BYTEA NOT NULL UNIQUE,
+    new_commitment      BYTEA NOT NULL UNIQUE
+);
+
+ALTER TABLE unprocessed_identities
+    ADD COLUMN eligibility   TIMESTAMPTZ,
+    ADD COLUMN status        VARCHAR(50) NOT NULL DEFAULT 'new',
+    ADD COLUMN processed_at  TIMESTAMPTZ,
+    ADD COLUMN error_message TEXT;
+
+ALTER TABLE unprocessed_identities
+    DROP CONSTRAINT unique_commitment;
diff --git a/schemas/database/016_remove_recovery.up.sql b/schemas/database/016_remove_recovery.up.sql
new file mode 100644
index 00000000..4d0ea58a
--- /dev/null
+++ b/schemas/database/016_remove_recovery.up.sql
@@ -0,0 +1,11 @@
+DROP TABLE recoveries;
+
+ALTER TABLE unprocessed_identities
+    DROP COLUMN eligibility,
+    DROP COLUMN status,
+    DROP COLUMN processed_at,
+    DROP COLUMN error_message;
+
+ALTER TABLE unprocessed_identities
+    ADD CONSTRAINT unique_commitment UNIQUE (commitment);
+
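+-- unprocessed_identities is now a plain insertion queue: there is no
+-- status/eligibility gating, and the unique constraint above makes
+-- enqueueing idempotent:
+--   INSERT INTO unprocessed_identities (commitment, created_at)
+--   VALUES ($1, CURRENT_TIMESTAMP)
+--   ON CONFLICT DO NOTHING;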
diff --git a/schemas/openapi.yaml b/schemas/openapi.yaml
index 43a2f951..67b181c4 100644
--- a/schemas/openapi.yaml
+++ b/schemas/openapi.yaml
@@ -62,26 +62,6 @@ paths:
             schema:
               description: 'Identity could not be queued for deletion'
               type: 'string'
-  /recoverIdentity:
-    post:
-      summary: 'Queues a recovery request, deleting the previous identity specified and inserting the new one.
-        New insertions must wait a specified time delay before being included in the merkle tree'
-      requestBody:
-        required: true
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/RecoveryRequest'
-      responses:
-        '202':
-          description: 'Identity has been successfully queued for recovery'
-        '400':
-          description: 'Invalid request'
-          content:
-            application/json:
-              schema:
-                description: 'Identity could not be queued for recovery'
-                type: 'string'
   /inclusionProof:
     post:
       summary: 'Get Merkle inclusion proof'
@@ -152,15 +132,6 @@ paths:
 
 components:
   schemas:
-    RecoveryRequest:
-      type: object
-      properties:
-        previousIdentityCommitment:
-          type: string
-          pattern: '^[A-F0-9]{64}$'
-        newIdentityCommitment:
-          type: string
-          pattern: '^[A-F0-9]{64}$'
     IdentityCommitment:
       type: object
      properties:
diff --git a/src/app.rs b/src/app.rs
index b2cf3a03..02ca8de3 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -164,7 +164,7 @@ impl App {
             return Err(ServerError::DuplicateCommitment);
         }
 
-        tx.insert_new_identity(commitment, Utc::now()).await?;
+        tx.insert_unprocessed_identity(commitment).await?;
 
         tx.commit().await?;
diff --git a/src/database/methods.rs b/src/database/methods.rs
index 96e53410..d4f0d8c1 100644
--- a/src/database/methods.rs
+++ b/src/database/methods.rs
@@ -475,50 +475,23 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     }
 
     #[instrument(skip(self), level = "debug")]
-    async fn insert_new_identity(
-        self,
-        identity: Hash,
-        eligibility_timestamp: sqlx::types::chrono::DateTime<Utc>,
-    ) -> Result<Hash, Error> {
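+    /// Queues a commitment for later insertion into the tree.
+    ///
+    /// Idempotent: `unprocessed_identities.commitment` is UNIQUE and the
+    /// insert uses `ON CONFLICT DO NOTHING`, so re-queueing an existing
+    /// commitment is a no-op, e.g.:
+    ///
+    /// ```ignore
+    /// tx.insert_unprocessed_identity(commitment).await?;
+    /// tx.insert_unprocessed_identity(commitment).await?; // no-op
+    /// ```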
+    async fn insert_unprocessed_identity(self, identity: Hash) -> Result<Hash, Error> {
         let mut conn = self.acquire().await?;
 
         sqlx::query(
             r#"
-            INSERT INTO unprocessed_identities (commitment, status, created_at, eligibility)
-            VALUES ($1, $2, CURRENT_TIMESTAMP, $3)
+            INSERT INTO unprocessed_identities (commitment, created_at)
+            VALUES ($1, CURRENT_TIMESTAMP)
+            ON CONFLICT DO NOTHING
             "#,
         )
         .bind(identity)
-        .bind(<&str>::from(UnprocessedStatus::New))
-        .bind(eligibility_timestamp)
         .execute(&mut *conn)
         .await?;
 
         Ok(identity)
     }
 
-    #[instrument(skip(self), level = "debug")]
-    async fn insert_new_recovery(
-        self,
-        existing_commitment: &Hash,
-        new_commitment: &Hash,
-    ) -> Result<(), Error> {
-        let mut conn = self.acquire().await?;
-
-        sqlx::query(
-            r#"
-            INSERT INTO recoveries (existing_commitment, new_commitment)
-            VALUES ($1, $2)
-            "#,
-        )
-        .bind(existing_commitment)
-        .bind(new_commitment)
-        .execute(&mut *conn)
-        .await?;
-
-        Ok(())
-    }
-
     #[instrument(skip(self), level = "debug")]
     async fn get_latest_deletion(self) -> Result<LatestDeletionEntry, Error> {
         let mut conn = self.acquire().await?;
@@ -580,47 +553,6 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
         Ok(())
     }
 
-    #[cfg(test)]
-    #[instrument(skip(self), level = "debug")]
-    async fn get_all_recoveries(self) -> Result<Vec<RecoveryEntry>, Error> {
-        let mut conn = self.acquire().await?;
-
-        Ok(
-            sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries")
-                .fetch_all(&mut *conn)
-                .await?,
-        )
-    }
-
-    #[instrument(skip(self, prev_commits), level = "debug")]
-    async fn delete_recoveries<I, T>(self, prev_commits: I) -> Result<Vec<RecoveryEntry>, Error>
-    where
-        I: IntoIterator<Item = T> + Send,
-        T: Into<Hash>,
-    {
-        let mut conn = self.acquire().await?;
-
-        // TODO: upstream PgHasArrayType impl to ruint
-        let prev_commits = prev_commits
-            .into_iter()
-            .map(|c| c.into().to_be_bytes())
-            .collect::<Vec<_>>();
-
-        let res = sqlx::query_as::<_, RecoveryEntry>(
-            r#"
-            DELETE
-            FROM recoveries
-            WHERE existing_commitment = ANY($1)
-            RETURNING *
-            "#,
-        )
-        .bind(&prev_commits)
-        .fetch_all(&mut *conn)
-        .await?;
-
-        Ok(res)
-    }
-
     /// Inserts a new deletion into the deletions table
     ///
     /// This method is idempotent and on conflict nothing will happen
@@ -686,20 +618,15 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
     }
 
     #[instrument(skip(self), level = "debug")]
-    async fn get_eligible_unprocessed_commitments(
-        self,
-        status: UnprocessedStatus,
-    ) -> Result<Vec<Hash>, Error> {
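+    /// Fetches up to `MAX_UNPROCESSED_FETCH_COUNT` queued commitments.
+    ///
+    /// With the status and eligibility columns gone, every queued commitment
+    /// is immediately available to the insertion task; no ordering is implied.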
+    async fn get_unprocessed_commitments(self) -> Result<Vec<Hash>, Error> {
         let mut conn = self.acquire().await?;
 
         let result: Vec<(Hash,)> = sqlx::query_as(
             r#"
             SELECT commitment FROM unprocessed_identities
-            WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility
-            LIMIT $2
+            LIMIT $1
             "#,
         )
-        .bind(<&str>::from(status))
         .bind(MAX_UNPROCESSED_FETCH_COUNT)
         .fetch_all(&mut *conn)
         .await?;
 
         Ok(result.into_iter().map(|(commitment,)| commitment).collect())
     }
 
-    /// Returns the error message from the unprocessed identities table
-    /// if it exists
-    ///
-    /// - The outer option represents the existence of the commitment in the
-    ///   unprocessed_identities table
-    /// - The inner option represents the existence of an error message
-    #[instrument(skip(self), level = "debug")]
-    async fn get_unprocessed_error(
-        self,
-        commitment: &Hash,
-    ) -> Result<Option<Option<String>>, Error> {
-        let mut conn = self.acquire().await?;
-
-        let result: Option<(Option<String>,)> = sqlx::query_as(
-            r#"
-            SELECT error_message
-            FROM unprocessed_identities
-            WHERE commitment = $1
-            "#,
-        )
-        .bind(commitment)
-        .fetch_optional(&mut *conn)
-        .await?;
-
-        Ok(result.map(|(error_message,)| error_message))
-    }
-
     #[instrument(skip(self), level = "debug")]
     async fn remove_unprocessed_identity(self, commitment: &Hash) -> Result<(), Error> {
         let mut conn = self.acquire().await?;
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 0e14359a..aae12d24 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -281,24 +281,11 @@ mod test {
             .expect("cant convert to u256")
             .into();
 
-        let eligibility_timestamp = Utc::now();
-
-        let hash = db
-            .insert_new_identity(commit_hash, eligibility_timestamp)
-            .await?;
+        let hash = db.insert_unprocessed_identity(commit_hash).await?;
 
         assert_eq!(commit_hash, hash);
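+        // Re-queueing the same commitment is a no-op (ON CONFLICT DO
+        // NOTHING), so the count asserted below stays at 1.
+        db.insert_unprocessed_identity(commit_hash).await?;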
 
-        let commit = db
-            .get_unprocessed_error(&commit_hash)
-            .await
-            .expect("expected commitment status");
-        assert_eq!(commit, Some(None));
-
-        let identity_count = db
-            .get_eligible_unprocessed_commitments(UnprocessedStatus::New)
-            .await?
-            .len();
+        let identity_count = db.get_unprocessed_commitments().await?.len();
 
         assert_eq!(identity_count, 1);
 
@@ -318,8 +305,7 @@ mod test {
-        let eligibility_timestamp = Utc::now();
-
         for identity in &identities {
-            db.insert_new_identity(*identity, eligibility_timestamp)
-                .await?;
+            db.insert_unprocessed_identity(*identity).await?;
         }
 
         assert_eq!(
@@ -390,7 +376,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_insert_prover_configuration() -> anyhow::Result<()> {
+    async fn insert_prover_configuration() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
 
@@ -433,7 +419,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_insert_provers() -> anyhow::Result<()> {
+    async fn insert_provers() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let mock_provers = mock_provers();
@@ -447,7 +433,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_remove_prover() -> anyhow::Result<()> {
+    async fn remove_prover() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let mock_provers = mock_provers();
@@ -465,27 +451,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_insert_new_recovery() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-
-        let existing_commitment: Uint<256, 4> = Uint::from(1);
-        let new_commitment: Uint<256, 4> = Uint::from(2);
-
-        db.insert_new_recovery(&existing_commitment, &new_commitment)
-            .await?;
-
-        let recoveries = db.get_all_recoveries().await?;
-
-        assert_eq!(recoveries.len(), 1);
-        assert_eq!(recoveries[0].existing_commitment, existing_commitment);
-        assert_eq!(recoveries[0].new_commitment, new_commitment);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_insert_new_deletion() -> anyhow::Result<()> {
+    async fn insert_new_deletion() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let existing_commitment: Uint<256, 4> = Uint::from(1);
@@ -501,26 +467,22 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_get_eligible_unprocessed_commitments() -> anyhow::Result<()> {
+    async fn get_eligible_unprocessed_commitments() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
 
         let commitment_0: Uint<256, 4> = Uint::from(1);
-        let eligibility_timestamp_0 = Utc::now();
 
-        db.insert_new_identity(commitment_0, eligibility_timestamp_0)
-            .await?;
+        db.insert_unprocessed_identity(commitment_0).await?;
 
         let commitment_1: Uint<256, 4> = Uint::from(2);
-        let eligibility_timestamp_1 = Utc::now()
-            .checked_add_days(Days::new(7))
-            .expect("Could not create eligibility timestamp");
 
-        db.insert_new_identity(commitment_1, eligibility_timestamp_1)
-            .await?;
+        db.insert_unprocessed_identity(commitment_1).await?;
 
-        let unprocessed_commitments = db
-            .get_eligible_unprocessed_commitments(UnprocessedStatus::New)
-            .await?;
+        let unprocessed_commitments = db.get_unprocessed_commitments().await?;
 
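+        // Without eligibility filtering, both queued commitments are returned.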
-        assert_eq!(unprocessed_commitments.len(), 1);
-        assert_eq!(unprocessed_commitments[0], commitment_0);
+        assert_eq!(unprocessed_commitments.len(), 2);
+        assert!(unprocessed_commitments.contains(&commitment_0));
+        assert!(unprocessed_commitments.contains(&commitment_1));
 
         Ok(())
     }
 
@@ -529,27 +491,23 @@ mod test {
     #[tokio::test]
-    async fn test_get_unprocessed_commitments() -> anyhow::Result<()> {
+    async fn get_unprocessed_commitments() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
 
-        // Insert new identity with a valid eligibility timestamp
+        // Insert two identities; both are immediately fetchable now
         let commitment_0: Uint<256, 4> = Uint::from(1);
-        let eligibility_timestamp_0 = Utc::now();
-        db.insert_new_identity(commitment_0, eligibility_timestamp_0)
-            .await?;
+        db.insert_unprocessed_identity(commitment_0).await?;
 
-        // Insert new identity with eligibility timestamp in the future
         let commitment_1: Uint<256, 4> = Uint::from(2);
-        let eligibility_timestamp_1 = Utc::now()
-            .checked_add_days(Days::new(7))
-            .expect("Could not create eligibility timestamp");
-        db.insert_new_identity(commitment_1, eligibility_timestamp_1)
-            .await?;
+        db.insert_unprocessed_identity(commitment_1).await?;
 
-        let unprocessed_commitments = db
-            .get_eligible_unprocessed_commitments(UnprocessedStatus::New)
-            .await?;
+        let unprocessed_commitments = db.get_unprocessed_commitments().await?;
 
         // Assert unprocessed commitments against expected values
-        assert_eq!(unprocessed_commitments.len(), 1);
-        assert_eq!(unprocessed_commitments[0], commitment_0);
+        assert_eq!(unprocessed_commitments.len(), 2);
+        assert!(unprocessed_commitments.contains(&commitment_0));
+        assert!(unprocessed_commitments.contains(&commitment_1));
 
         Ok(())
     }
 
@@ -559,49 +517,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_update_eligibility_timestamp() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let dec = "1234500000000000000";
-        let commit_hash: Hash = U256::from_dec_str(dec)
-            .expect("cant convert to u256")
-            .into();
-
-        // Set eligibility to Utc::now() day and check db entries
-        let eligibility_timestamp = Utc::now();
-        db.insert_new_identity(commit_hash, eligibility_timestamp)
-            .await?;
-
-        let commitments = db
-            .get_eligible_unprocessed_commitments(UnprocessedStatus::New)
-            .await?;
-        assert_eq!(commitments.len(), 1);
-
-        let eligible_commitments = db
-            .get_eligible_unprocessed_commitments(UnprocessedStatus::New)
-            .await?;
-        assert_eq!(eligible_commitments.len(), 1);
-
-        // Set eligibility to Utc::now() + 7 days and check db entries
-        let eligibility_timestamp = Utc::now()
-            .checked_add_days(Days::new(7))
-            .expect("Could not create eligibility timestamp");
-
-        // Insert new identity with an eligibility timestamp in the future
-        let commit_hash: Hash = Hash::from(1);
-        db.insert_new_identity(commit_hash, eligibility_timestamp)
-            .await?;
-
-        let eligible_commitments = db
-            .get_eligible_unprocessed_commitments(UnprocessedStatus::New)
-            .await?;
-        assert_eq!(eligible_commitments.len(), 1);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_insert_deletion() -> anyhow::Result<()> {
+    async fn insert_deletion() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let identities = mock_identities(3);
@@ -617,47 +533,6 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_insert_recovery() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-
-        let old_identities = mock_identities(3);
-        let new_identities = mock_identities(3);
-
-        for (old, new) in old_identities.into_iter().zip(new_identities) {
-            db.insert_new_recovery(&old, &new).await?;
-        }
-
-        let recoveries = db.get_all_recoveries().await?;
-        assert_eq!(recoveries.len(), 3);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_delete_recoveries() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-
-        let old_identities = mock_identities(3);
-        let new_identities = mock_identities(3);
-
-        for (old, new) in old_identities.clone().into_iter().zip(new_identities) {
-            db.insert_new_recovery(&old, &new).await?;
-        }
-
-        let deleted_recoveries = db
-            .delete_recoveries(old_identities[0..2].iter().cloned())
-            .await?;
-        assert_eq!(deleted_recoveries.len(), 2);
-
-        let remaining = db.get_all_recoveries().await?;
-        assert_eq!(remaining.len(), 1);
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn get_last_leaf_index() -> anyhow::Result<()> {
         let docker = Cli::default();
@@ -1063,7 +938,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_root_invalidation() -> anyhow::Result<()> {
+    async fn root_invalidation() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
 
@@ -1155,7 +1030,7 @@ mod test {
         // When there's only unprocessed identity
-        let eligibility_timestamp = Utc::now();
-
-        db.insert_new_identity(identities[0], eligibility_timestamp)
+        db.insert_unprocessed_identity(identities[0])
             .await
             .context("Inserting new identity")?;
         assert!(db.identity_exists(identities[0]).await?);
@@ -1171,7 +1046,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_remove_deletions() -> anyhow::Result<()> {
+    async fn remove_deletions() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
 
@@ -1203,7 +1078,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_latest_insertion() -> anyhow::Result<()> {
+    async fn latest_insertion() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
 
@@ -1231,7 +1106,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_latest_deletion() -> anyhow::Result<()> {
+    async fn latest_deletion() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
 
@@ -1280,7 +1155,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_insert_batch() -> anyhow::Result<()> {
+    async fn insert_batch() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let identities: Vec<_> = mock_identities(10)
@@ -1308,7 +1183,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_get_next_batch() -> anyhow::Result<()> {
+    async fn get_next_batch() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let identities: Vec<_> = mock_identities(10)
@@ -1352,7 +1227,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_get_next_batch_without_transaction() -> anyhow::Result<()> {
+    async fn get_next_batch_without_transaction() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let identities: Vec<_> = mock_identities(10)
@@ -1400,7 +1275,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_get_batch_head() -> anyhow::Result<()> {
+    async fn get_batch_head() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let roots = mock_roots(1);
@@ -1431,7 +1306,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn test_insert_transaction() -> anyhow::Result<()> {
+    async fn insert_transaction() -> anyhow::Result<()> {
         let docker = Cli::default();
         let (db, _db_container) = setup_db(&docker).await?;
         let roots = mock_roots(1);
diff --git a/src/identity/processor.rs b/src/identity/processor.rs
index 0971abe1..2d8821ef 100644
--- a/src/identity/processor.rs
+++ b/src/identity/processor.rs
@@ -554,7 +554,7 @@ impl OnChainIdentityProcessor {
-        // For each deletion, if there is a corresponding recovery, insert a new
-        // identity with the specified eligibility timestamp
+        // For each deletion, if there is a corresponding recovery, queue the
+        // replacement identity immediately
         for recovery in deleted_recoveries {
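+            // The replacement is queued immediately; the eligibility delay no
+            // longer exists. Note that `delete_recoveries` is removed from
+            // DbMethods in this change, so this block must be dropped with it.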
-            tx.insert_new_identity(recovery.new_commitment, eligibility_timestamp)
+            tx.insert_unprocessed_identity(recovery.new_commitment)
                 .await?;
         }
 
@@ -656,7 +656,7 @@ impl OffChainIdentityProcessor {
-        // For each deletion, if there is a corresponding recovery, insert a new
-        // identity with the specified eligibility timestamp
+        // For each deletion, if there is a corresponding recovery, queue the
+        // replacement identity immediately
         for recovery in deleted_recoveries {
-            tx.insert_new_identity(recovery.new_commitment, eligibility_timestamp)
+            tx.insert_unprocessed_identity(recovery.new_commitment)
                 .await?;
         }
 
diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs
index 41916929..9674b84b 100644
--- a/src/task_monitor/tasks/insert_identities.rs
+++ b/src/task_monitor/tasks/insert_identities.rs
@@ -31,8 +31,9 @@ pub async fn insert_identities(
         // get commits from database
         let unprocessed = app
             .database
-            .get_eligible_unprocessed_commitments(UnprocessedStatus::New)
+            .get_unprocessed_commitments()
             .await?;
+
         if unprocessed.is_empty() {
             continue;
         }
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 4028af3e..18835722 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -479,56 +479,6 @@ pub async fn test_delete_identity(
     (ref_tree.proof(leaf_index).unwrap(), ref_tree.root())
 }
 
-#[instrument(skip_all)]
-pub async fn test_recover_identity(
-    uri: &str,
-    client: &Client,
-    ref_tree: &mut PoseidonTree,
-    test_leaves: &[Field],
-    previous_leaf_index: usize,
-    new_leaf: Field,
-    new_leaf_index: usize,
-    expect_failure: bool,
-) -> (merkle_tree::Proof<PoseidonHash>, Field) {
-    let previous_leaf = test_leaves[previous_leaf_index];
-
-    let body = construct_recover_identity_body(&previous_leaf, &new_leaf);
-
-    let response = client
-        .post(uri.to_owned() + "/recoverIdentity")
-        .header("Content-Type", "application/json")
-        .body(body)
-        .send()
-        .await
-        .expect("Failed to create insert identity");
-
-    let response_status = response.status();
-
-    let bytes = response
-        .bytes()
-        .await
-        .expect("Failed to get response bytes");
-
-    if expect_failure {
-        assert!(!response_status.is_success());
-    } else {
-        assert!(response_status.is_success());
-        assert!(bytes.is_empty());
-    }
-
-    // TODO: Note that recovery order is non-deterministic and therefore we cannot
-    // easily keep the ref_tree in sync with the sequencer's version of the
-    // tree. In the future, we could consider tracking updates to the tree in a
-    // different way like listening to event emission.
-    ref_tree.set(previous_leaf_index, Hash::ZERO);
-    // Continuing on the note above, while the replacement identity is be
-    // inserted as a new identity, it is not deterministic and if there are multiple
-    // recovery requests, it is possible that the sequencer tree is ordered in a
-    // different way than the ref_tree
-    ref_tree.set(new_leaf_index, new_leaf);
-    (ref_tree.proof(new_leaf_index).unwrap(), ref_tree.root())
-}
-
 #[instrument(skip_all)]
 pub async fn test_add_batch_size(
     uri: impl Into<String>,