From b373b2a69e257c9cc85adc38005ae2241e0f76c7 Mon Sep 17 00:00:00 2001
From: Piotr Heilman <1212808+piohei@users.noreply.github.com>
Date: Wed, 29 May 2024 16:53:56 +0200
Subject: [PATCH] Remove /identityHistory endpoint. (#739)

---
 src/app.rs                  |  54 +------
 src/database/mod.rs         | 141 +----------------
 src/database/transaction.rs |  93 +----------
 src/database/types.rs       |  12 +-
 src/server.rs               |  18 +--
 src/server/data.rs          |  83 ----------
 tests/identity_history.rs   | 300 ------------------------------------
 7 files changed, 10 insertions(+), 691 deletions(-)
 delete mode 100644 tests/identity_history.rs

diff --git a/src/app.rs b/src/app.rs
index b6433e7f..3713145a 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -15,13 +15,12 @@ use crate::database::query::DatabaseQuery as _;
 use crate::database::Database;
 use crate::ethereum::Ethereum;
 use crate::identity_tree::{
-    CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState,
-    TreeUpdate, TreeVersionReadOps, TreeWithNextVersion, UnprocessedStatus,
+    CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, TreeState, TreeUpdate,
+    TreeVersionReadOps, TreeWithNextVersion,
 };
 use crate::prover::map::initialize_prover_maps;
 use crate::prover::{ProverConfig, ProverType};
 use crate::server::data::{
-    IdentityHistoryEntry, IdentityHistoryEntryKind, IdentityHistoryEntryStatus,
     InclusionProofResponse, ListBatchSizesResponse, VerifySemaphoreProofQuery,
     VerifySemaphoreProofRequest, VerifySemaphoreProofResponse,
 };
@@ -540,55 +539,6 @@ impl App {
         .await
     }
 
-    pub async fn identity_history(
-        &self,
-        commitment: &Hash,
-    ) -> Result<Vec<IdentityHistoryEntry>, ServerError> {
-        let entries = self
-            .database
-            .get_identity_history_entries(commitment)
-            .await?;
-
-        let mut history = vec![];
-
-        for entry in entries {
-            let mut status = match entry.status {
-                Status::Processed(ProcessedStatus::Pending) => IdentityHistoryEntryStatus::Pending,
-                Status::Processed(ProcessedStatus::Processed) => IdentityHistoryEntryStatus::Mined,
-                Status::Processed(ProcessedStatus::Mined) => IdentityHistoryEntryStatus::Bridged,
-                Status::Unprocessed(UnprocessedStatus::New) => IdentityHistoryEntryStatus::Buffered,
-            };
-
-            match status {
-                // A pending identity can be present in the batching tree and therefore status
-                // should be set to Batched
-                IdentityHistoryEntryStatus::Pending => {
-                    if let Some(leaf_index) = entry.leaf_index {
-                        if self.tree_state()?.get_batching_tree().get_leaf(leaf_index)
-                            == entry.commitment
-                        {
-                            status = IdentityHistoryEntryStatus::Batched;
-                        }
-                    }
-                }
-                IdentityHistoryEntryStatus::Buffered if entry.held_back => {
-                    status = IdentityHistoryEntryStatus::Queued;
-                }
-                _ => (),
-            }
-
-            let kind = if entry.commitment == Uint::ZERO {
-                IdentityHistoryEntryKind::Deletion
-            } else {
-                IdentityHistoryEntryKind::Insertion
-            };
-
-            history.push(IdentityHistoryEntry { kind, status });
-        }
-
-        Ok(history)
-    }
-
     fn merge_env_provers(
         prover_urls: &[ProverConfig],
         existing_provers: &mut HashSet<ProverConfig>,
diff --git a/src/database/mod.rs b/src/database/mod.rs
index d1b68971..8aa30759 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -165,7 +165,7 @@ mod test {
     use crate::config::DatabaseConfig;
     use crate::database::query::DatabaseQuery;
     use crate::database::types::BatchType;
-    use crate::identity_tree::{Hash, ProcessedStatus, Status, UnprocessedStatus};
+    use crate::identity_tree::{Hash, ProcessedStatus, UnprocessedStatus};
     use crate::prover::identity::Identity;
     use crate::prover::{ProverConfig, ProverType};
     use crate::utils::secret::SecretUrl;
@@ -1175,145 +1175,6 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_history_unprocessed_identities() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let identities = mock_identities(2);
-
-        let now = Utc::now();
-
-        let insertion_timestamp = now - chrono::Duration::seconds(5);
-        db.insert_new_identity(identities[0], insertion_timestamp)
-            .await?;
-
-        let insertion_timestamp = now + chrono::Duration::seconds(5);
-        db.insert_new_identity(identities[1], insertion_timestamp)
-            .await?;
-
-        let history = db.get_identity_history_entries(&identities[0]).await?;
-
-        assert_eq!(history.len(), 1);
-        assert_eq!(
-            history[0].status,
-            Status::Unprocessed(UnprocessedStatus::New)
-        );
-        assert!(!history[0].held_back, "Identity should not be held back");
-        assert_eq!(history[0].leaf_index, None);
-
-        let history = db.get_identity_history_entries(&identities[1]).await?;
-
-        assert_eq!(history.len(), 1);
-        assert_eq!(
-            history[0].status,
-            Status::Unprocessed(UnprocessedStatus::New)
-        );
-        assert!(history[0].held_back, "Identity should be held back");
-        assert_eq!(history[0].leaf_index, None);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_history_unprocessed_deletion_identities() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let identities = mock_identities(2);
-        let roots = mock_roots(2);
-
-        db.insert_pending_identity(0, &identities[0], &roots[0])
-            .await?;
-        db.mark_root_as_mined_tx(&roots[0]).await?;
-
-        db.insert_new_deletion(0, &identities[0]).await?;
-
-        let history = db.get_identity_history_entries(&identities[0]).await?;
-
-        assert_eq!(history.len(), 2);
-
-        assert_eq!(history[0].status, Status::Processed(ProcessedStatus::Mined));
-        assert_eq!(history[0].commitment, identities[0]);
-        assert_eq!(history[0].leaf_index, Some(0));
-        assert!(!history[0].held_back, "Identity should not be held back");
-
-        assert_eq!(
-            history[1].status,
-            Status::Unprocessed(UnprocessedStatus::New)
-        );
-        assert_eq!(history[1].commitment, Hash::ZERO);
-        assert_eq!(history[1].leaf_index, Some(0));
-        assert!(!history[1].held_back, "Identity should not be held back");
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_history_processed_deletion_identities() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let identities = mock_identities(2);
-        let roots = mock_roots(2);
-
-        db.insert_pending_identity(0, &identities[0], &roots[0])
-            .await?;
-        db.insert_pending_identity(0, &Hash::ZERO, &roots[1])
-            .await?;
-
-        db.mark_root_as_mined_tx(&roots[1]).await?;
-
-        let history = db.get_identity_history_entries(&identities[0]).await?;
-
-        assert_eq!(history.len(), 2);
-
-        assert_eq!(history[0].status, Status::Processed(ProcessedStatus::Mined));
-        assert_eq!(history[0].commitment, identities[0]);
-        assert_eq!(history[0].leaf_index, Some(0));
-        assert!(!history[0].held_back, "Identity should not be held back");
-
-        assert_eq!(history[1].status, Status::Processed(ProcessedStatus::Mined));
-        assert_eq!(history[1].commitment, Hash::ZERO);
-        assert_eq!(history[1].leaf_index, Some(0));
-        assert!(!history[1].held_back, "Identity should not be held back");
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_history_processed_identity() -> anyhow::Result<()> {
-        let docker = Cli::default();
-        let (db, _db_container) = setup_db(&docker).await?;
-        let identities = mock_identities(2);
-        let roots = mock_roots(2);
-
-        db.insert_pending_identity(0, &identities[0], &roots[0])
-            .await?;
-
-        let history = db.get_identity_history_entries(&identities[0]).await?;
-
-        assert_eq!(history.len(), 1);
-
-        assert_eq!(
-            history[0].status,
-            Status::Processed(ProcessedStatus::Pending)
-        );
-        assert_eq!(history[0].commitment, identities[0]);
-        assert_eq!(history[0].leaf_index, Some(0));
-        assert!(!history[0].held_back, "Identity should not be held back");
-
-        db.mark_root_as_mined_tx(&roots[0]).await?;
-
-        let history = db.get_identity_history_entries(&identities[0]).await?;
-
-        assert_eq!(history.len(), 1);
-
-        assert_eq!(history[0].status, Status::Processed(ProcessedStatus::Mined));
-        assert_eq!(history[0].commitment, identities[0]);
-        assert_eq!(history[0].leaf_index, Some(0));
-        assert!(!history[0].held_back, "Identity should not be held back");
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn can_insert_same_root_multiple_times() -> anyhow::Result<()> {
         let docker = Cli::default();
diff --git a/src/database/transaction.rs b/src/database/transaction.rs
index ebd39756..222bdc43 100644
--- a/src/database/transaction.rs
+++ b/src/database/transaction.rs
@@ -1,11 +1,9 @@
-use chrono::{DateTime, Utc};
-use sqlx::{Executor, Row};
+use sqlx::Executor;
 use tracing::instrument;
 
 use crate::database::query::DatabaseQuery;
-use crate::database::types::CommitmentHistoryEntry;
 use crate::database::{Database, Error};
-use crate::identity_tree::{Hash, ProcessedStatus, UnprocessedStatus};
+use crate::identity_tree::{Hash, ProcessedStatus};
 use crate::utils::retry_tx;
 
 /// impl block for database transactions
@@ -86,91 +84,4 @@ impl Database {
         })
         .await
     }
-
-    pub async fn get_identity_history_entries(
-        &self,
-        commitment: &Hash,
-    ) -> Result<Vec<CommitmentHistoryEntry>, Error> {
-        let unprocessed = sqlx::query(
-            r#"
-            SELECT commitment, status, eligibility
-            FROM unprocessed_identities
-            WHERE commitment = $1
-            "#,
-        )
-        .bind(commitment);
-
-        let rows = self.pool.fetch_all(unprocessed).await?;
-        let unprocessed_updates = rows
-            .into_iter()
-            .map(|row| {
-                let eligibility_timestamp: DateTime<Utc> = row.get(2);
-                let held_back = Utc::now() < eligibility_timestamp;
-
-                CommitmentHistoryEntry {
-                    leaf_index: None,
-                    commitment: row.get::<Hash, _>(0),
-                    held_back,
-                    status: row
-                        .get::<&str, _>(1)
-                        .parse()
-                        .expect("Failed to parse unprocessed status"),
-                }
-            })
-            .collect::<Vec<_>>();
-
-        let leaf_index = self.get_identity_leaf_index(commitment).await?;
-        let Some(leaf_index) = leaf_index else {
-            return Ok(unprocessed_updates);
-        };
-
-        let identity_deletions = sqlx::query(
-            r#"
-            SELECT commitment
-            FROM deletions
-            WHERE leaf_index = $1
-            "#,
-        )
-        .bind(leaf_index.leaf_index as i64);
-
-        let rows = self.pool.fetch_all(identity_deletions).await?;
-        let deletions = rows
-            .into_iter()
-            .map(|_row| CommitmentHistoryEntry {
-                leaf_index: Some(leaf_index.leaf_index),
-                commitment: Hash::ZERO,
-                held_back: false,
-                status: UnprocessedStatus::New.into(),
-            })
-            .collect::<Vec<_>>();
-
-        let processed_updates = sqlx::query(
-            r#"
-            SELECT commitment, status
-            FROM identities
-            WHERE leaf_index = $1
-            ORDER BY id ASC
-            "#,
-        )
-        .bind(leaf_index.leaf_index as i64);
-
-        let rows = self.pool.fetch_all(processed_updates).await?;
-        let processed_updates: Vec<CommitmentHistoryEntry> = rows
-            .into_iter()
-            .map(|row| CommitmentHistoryEntry {
-                leaf_index: Some(leaf_index.leaf_index),
-                commitment: row.get::<Hash, _>(0),
-                held_back: false,
-                status: row
-                    .get::<&str, _>(1)
-                    .parse()
-                    .expect("Status is unreadable, database is corrupt"),
-            })
-            .collect();
-
-        Ok([processed_updates, unprocessed_updates, deletions]
-            .concat()
-            .into_iter()
-            .collect())
-    }
 }
diff --git a/src/database/types.rs b/src/database/types.rs
index 08239ef7..768f6be6 100644
--- a/src/database/types.rs
+++ b/src/database/types.rs
@@ -6,7 +6,7 @@ use sqlx::error::BoxDynError;
 use sqlx::prelude::FromRow;
 use sqlx::{Decode, Encode, Postgres, Type};
 
-use crate::identity_tree::{Hash, Status, UnprocessedStatus};
+use crate::identity_tree::{Hash, UnprocessedStatus};
 use crate::prover::identity::Identity;
 
 pub struct UnprocessedCommitment {
@@ -34,16 +34,6 @@ pub struct DeletionEntry {
     pub commitment: Hash,
 }
 
-#[derive(Debug, Clone, Hash, PartialEq, Eq)]
-pub struct CommitmentHistoryEntry {
-    pub commitment: Hash,
-    pub leaf_index: Option<usize>,
-    // Only applies to buffered entries
-    // set to true if the eligibility timestamp is in the future
-    pub held_back: bool,
-    pub status: Status,
-}
-
 #[derive(Debug, Copy, Clone, sqlx::Type, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
 #[serde(rename_all = "camelCase")]
 #[sqlx(type_name = "VARCHAR", rename_all = "PascalCase")]
diff --git a/src/server.rs b/src/server.rs
index 2ab9f38e..6965eb22 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -22,10 +22,10 @@ mod custom_middleware;
 pub mod data;
 
 use self::data::{
-    AddBatchSizeRequest, DeletionRequest, IdentityHistoryRequest, IdentityHistoryResponse,
-    InclusionProofRequest, InclusionProofResponse, InsertCommitmentRequest, ListBatchSizesResponse,
-    RecoveryRequest, RemoveBatchSizeRequest, ToResponseCode, VerifySemaphoreProofQuery,
-    VerifySemaphoreProofRequest, VerifySemaphoreProofResponse,
+    AddBatchSizeRequest, DeletionRequest, InclusionProofRequest, InclusionProofResponse,
+    InsertCommitmentRequest, ListBatchSizesResponse, RecoveryRequest, RemoveBatchSizeRequest,
+    ToResponseCode, VerifySemaphoreProofQuery, VerifySemaphoreProofRequest,
+    VerifySemaphoreProofResponse,
 };
 
 async fn inclusion_proof(
@@ -104,15 +104,6 @@ async fn recover_identity(
     Ok(())
 }
 
-async fn identity_history(
-    State(app): State<Arc<App>>,
-    Json(req): Json<IdentityHistoryRequest>,
-) -> Result<Json<IdentityHistoryResponse>, Error> {
-    let history = app.identity_history(&req.identity_commitment).await?;
-
-    Ok(Json(IdentityHistoryResponse { history }))
-}
-
 async fn remove_batch_size(
     State(app): State<Arc<App>>,
     Json(req): Json<RemoveBatchSizeRequest>,
@@ -182,7 +173,6 @@ pub async fn bind_from_listener(
         .route("/insertIdentity", post(insert_identity))
         .route("/deleteIdentity", post(delete_identity))
         .route("/recoverIdentity", post(recover_identity))
-        .route("/identityHistory", post(identity_history))
         // Operate on batch sizes
         .route("/addBatchSize", post(add_batch_size))
         .route("/removeBatchSize", post(remove_batch_size))
diff --git a/src/server/data.rs b/src/server/data.rs
index 64b73be9..7e4ef702 100644
--- a/src/server/data.rs
+++ b/src/server/data.rs
@@ -20,47 +20,6 @@ pub struct ListBatchSizesResponse(pub Vec<ProverConfig>);
 #[serde(transparent)]
 pub struct VerifySemaphoreProofResponse(pub RootItem);
 
-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-#[serde(deny_unknown_fields)]
-pub struct IdentityHistoryResponse {
-    pub history: Vec<IdentityHistoryEntry>,
-}
-
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(rename_all = "camelCase")]
-#[serde(deny_unknown_fields)]
-pub struct IdentityHistoryEntry {
-    pub kind: IdentityHistoryEntryKind,
-    pub status: IdentityHistoryEntryStatus,
-}
-
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(rename_all = "camelCase")]
-#[serde(deny_unknown_fields)]
-pub enum IdentityHistoryEntryKind {
-    Insertion,
-    Deletion,
-}
-
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
-#[serde(rename_all = "camelCase")]
"camelCase")] -#[serde(deny_unknown_fields)] -pub enum IdentityHistoryEntryStatus { - // Present in the unprocessed identities or deletions table - Buffered, - // Present in the unprocessed identities table but not eligible for processing - Queued, - // Present in the pending tree (not mined on chain yet) - Pending, - // Present in the batching tree (transaction sent but not confirmed yet) - Batched, - // Present in the processed tree (mined on chain) - Mined, - // Present in the batching tree (mined on chain) - Bridged, -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(deny_unknown_fields)] @@ -99,13 +58,6 @@ pub struct InclusionProofRequest { pub identity_commitment: Hash, } -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -#[serde(deny_unknown_fields)] -pub struct IdentityHistoryRequest { - pub identity_commitment: Hash, -} - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[serde(deny_unknown_fields)] @@ -205,18 +157,6 @@ impl ToResponseCode for VerifySemaphoreProofResponse { } } -impl IdentityHistoryEntryKind { - #[must_use] - pub fn is_insertion(&self) -> bool { - matches!(self, IdentityHistoryEntryKind::Insertion) - } - - #[must_use] - pub fn is_deletion(&self) -> bool { - matches!(self, IdentityHistoryEntryKind::Deletion) - } -} - pub trait ToResponseCode { fn to_response_code(&self) -> StatusCode; } @@ -226,26 +166,3 @@ impl ToResponseCode for () { StatusCode::OK } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn identity_history_entry_status_ordering() { - let expected = vec![ - IdentityHistoryEntryStatus::Buffered, - IdentityHistoryEntryStatus::Queued, - IdentityHistoryEntryStatus::Pending, - IdentityHistoryEntryStatus::Batched, - IdentityHistoryEntryStatus::Mined, - IdentityHistoryEntryStatus::Bridged, - ]; - - let mut statuses = expected.clone(); - - statuses.sort(); - - assert_eq!(expected, statuses); - } -} diff --git a/tests/identity_history.rs b/tests/identity_history.rs deleted file mode 100644 index d552ef5a..00000000 --- a/tests/identity_history.rs +++ /dev/null @@ -1,300 +0,0 @@ -mod common; - -use common::prelude::*; -use hyper::StatusCode; -use signup_sequencer::server::data::{ - IdentityHistoryEntryStatus, IdentityHistoryRequest, IdentityHistoryResponse, -}; - -use crate::common::test_recover_identity; - -const HISTORY_POLLING_SLEEP: Duration = Duration::from_secs(5); -const MAX_HISTORY_POLLING_ATTEMPTS: usize = 24; // 2 minutes - -#[tokio::test] -async fn identity_history() -> anyhow::Result<()> { - // Initialize logging for the test. - init_tracing_subscriber(); - info!("Starting integration test"); - - let insertion_batch_size: usize = 8; - let deletion_batch_size: usize = 3; - - let mut ref_tree = PoseidonTree::new(DEFAULT_TREE_DEPTH + 1, ruint::Uint::ZERO); - let initial_root: U256 = ref_tree.root().into(); - - let docker = Cli::default(); - let (mock_chain, db_container, insertion_prover_map, deletion_prover_map, micro_oz) = - spawn_deps( - initial_root, - &[insertion_batch_size], - &[deletion_batch_size], - DEFAULT_TREE_DEPTH as u8, - &docker, - ) - .await?; - - // Set the root history expirty to 30 seconds - let updated_root_history_expiry = U256::from(30); - mock_chain - .identity_manager - .method::<_, ()>("setRootHistoryExpiry", updated_root_history_expiry)? - .send() - .await? 
-        .await?;
-
-    let mock_insertion_prover = &insertion_prover_map[&insertion_batch_size];
-    let mock_deletion_prover = &deletion_prover_map[&deletion_batch_size];
-
-    let db_socket_addr = db_container.address();
-    let db_url = format!("postgres://postgres:postgres@{db_socket_addr}/database");
-
-    let temp_dir = tempfile::tempdir()?;
-    info!(
-        "temp dir created at: {:?}",
-        temp_dir.path().join("testfile")
-    );
-
-    let config = TestConfigBuilder::new()
-        .db_url(&db_url)
-        .oz_api_url(&micro_oz.endpoint())
-        .oz_address(micro_oz.address())
-        .identity_manager_address(mock_chain.identity_manager.address())
-        .primary_network_provider(mock_chain.anvil.endpoint())
-        .cache_file(temp_dir.path().join("testfile").to_str().unwrap())
-        .add_prover(mock_insertion_prover)
-        .add_prover(mock_deletion_prover)
-        .build()?;
-
-    let (_, app_handle, local_addr) = spawn_app(config).await.expect("Failed to spawn app.");
-
-    let test_identities = generate_test_identities(insertion_batch_size * 3);
-    let identities_ref: Vec<Field> = test_identities
-        .iter()
-        .map(|i| Hash::from_str_radix(i, 16).unwrap())
-        .collect();
-
-    let uri = "http://".to_owned() + &local_addr.to_string();
-    let client = Client::new();
-
-    let mut next_leaf_index = 0;
-    // Insert enough identities to trigger an batch to be sent to the blockchain.
-    for i in 0..insertion_batch_size {
-        test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, i).await;
-
-        next_leaf_index += 1;
-    }
-
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[0],
-        |history| {
-            history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Buffered
-        },
-        "Polling until first insertion is buffered",
-    )
-    .await?;
-
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[0],
-        |history| {
-            history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Pending
-        },
-        "Polling until first insertion is pending",
-    )
-    .await?;
-
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[0],
-        |history| {
-            history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Batched
-        },
-        "Polling until first insertion is batched",
-    )
-    .await?;
-
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[0],
-        |history| {
-            history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Bridged
-        },
-        "Polling until first insertion is mined/bridged",
-    )
-    .await?;
-
-    // Insert enough recoveries to trigger a batch
-    for i in 0..deletion_batch_size {
-        // Delete the identity at i and replace it with an identity at the back of the
-        // test identities array
-        // TODO: we should update to a much cleaner approach
-        let recovery_leaf_index = test_identities.len() - i - 1;
-
-        test_recover_identity(
-            &uri,
-            &client,
-            &mut ref_tree,
-            &identities_ref,
-            i,
-            identities_ref[recovery_leaf_index],
-            next_leaf_index,
-            false,
-        )
-        .await;
-
-        next_leaf_index += 1;
-    }
-
-    let sample_recovery_identity = test_identities.len() - 1;
-
-    tracing::info!("############ Deletion should be buffered ############");
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[0],
-        |history| {
-            history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Bridged
-                && history.history[1].kind.is_deletion()
-                && history.history[1].status >= IdentityHistoryEntryStatus::Buffered
-        },
-        "Polling until first deletion is buffered",
-    )
-    .await?;
-
-    tracing::info!("############ Recovery should be queued ############");
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[sample_recovery_identity],
-        |history| {
-            !history.history.is_empty()
-                && history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Queued
-        },
-        "Polling until first recovery is queued",
-    )
-    .await?;
-
-    // Eventually the old identity will be deleted
-    tracing::info!("############ Waiting for deletion to be mined/bridged ############");
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[0],
-        |history| {
-            history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Bridged
-                && history.history[1].kind.is_deletion()
-                && history.history[1].status >= IdentityHistoryEntryStatus::Bridged
-        },
-        "Polling until first deletion is mined/bridged",
-    )
-    .await?;
-
-    // Sleep for root expiry
-    tokio::time::sleep(Duration::from_secs(updated_root_history_expiry.as_u64())).await;
-
-    // Insert enough identities to trigger an batch to be sent to the blockchain.
-    for i in insertion_batch_size..insertion_batch_size * 2 {
-        test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, i).await;
-        next_leaf_index += 1;
-    }
-
-    tracing::info!("############ Final wait ############");
-    poll_history_until(
-        &uri,
-        &client,
-        &identities_ref[sample_recovery_identity],
-        |history| {
-            history.history[0].kind.is_insertion()
-                && history.history[0].status >= IdentityHistoryEntryStatus::Bridged
-        },
-        "Polling until recovery is mined/bridged",
-    )
-    .await?;
-
-    // Shutdown the app properly for the final time
-    shutdown();
-    app_handle.await.unwrap();
-    for (_, prover) in insertion_prover_map.into_iter() {
-        prover.stop();
-    }
-    for (_, prover) in deletion_prover_map.into_iter() {
-        prover.stop();
-    }
-    reset_shutdown();
-
-    Ok(())
-}
-
-async fn poll_history_until(
-    uri: &str,
-    client: &Client<HttpConnector>,
-    identity: &Field,
-    predicate: impl Fn(&IdentityHistoryResponse) -> bool,
-    label: &str,
-) -> anyhow::Result<()> {
-    for _ in 0..MAX_HISTORY_POLLING_ATTEMPTS {
-        if let Some(history) = fetch_identity_history(uri, client, identity, label).await? {
-            if predicate(&history) {
-                return Ok(());
-            } else {
-                tracing::warn!("Label {label} - history {history:?} does not match predicate");
-            }
-        } else {
-            tracing::warn!("No identity history for label {label}");
-        }
-
-        tokio::time::sleep(HISTORY_POLLING_SLEEP).await;
-    }
-
-    anyhow::bail!("Failed to fetch identity history within max attempts - {label}")
-}
-
-async fn fetch_identity_history(
-    uri: &str,
-    client: &Client<HttpConnector>,
-    identity: &Field,
-    label: &str,
-) -> anyhow::Result<Option<IdentityHistoryResponse>> {
-    let uri = format!("{uri}/identityHistory");
-    let body = IdentityHistoryRequest {
-        identity_commitment: *identity,
-    };
-
-    let req = Request::post(uri)
-        .header("Content-Type", "application/json")
-        .body(Body::from(serde_json::to_string(&body)?))?;
-
-    let mut response = client.request(req).await?;
-
-    match response.status() {
-        StatusCode::NOT_FOUND => return Ok(None),
-        otherwise if otherwise.is_success() => {
-            // continue
-        }
-        status => {
-            anyhow::bail!("Failed to fetch identity history - status was {status} - label {label}")
-        }
-    }
-
-    let body_bytes = hyper::body::to_bytes(response.body_mut()).await?;
-    let body_bytes = body_bytes.to_vec();
-
-    let body_string = String::from_utf8(body_bytes)?;
-
-    let response: IdentityHistoryResponse = serde_json::from_str(&body_string)?;
-
-    Ok(Some(response))
-}