From 1616e6cffbd6ad977902a7a930222caddc13da22 Mon Sep 17 00:00:00 2001
From: 0xKitsune <0xKitsune@protonmail.com>
Date: Mon, 25 Sep 2023 17:14:18 -0400
Subject: [PATCH 1/4] added logic to trim deletions to max deletion batch size, added logic to configure batch size during process identities

---
 src/contracts/mod.rs                         |  6 +++++-
 src/task_monitor/tasks/delete_identities.rs  | 20 +++++++++++++++++---
 src/task_monitor/tasks/process_identities.rs |  7 ++++++-
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/src/contracts/mod.rs b/src/contracts/mod.rs
index 84737307..c451eade 100644
--- a/src/contracts/mod.rs
+++ b/src/contracts/mod.rs
@@ -146,10 +146,14 @@ impl IdentityManager {
         self.tree_depth
     }
 
-    pub async fn max_batch_size(&self) -> usize {
+    pub async fn max_insertion_batch_size(&self) -> usize {
         self.insertion_prover_map.read().await.max_batch_size()
     }
 
+    pub async fn max_deletion_batch_size(&self) -> usize {
+        self.deletion_prover_map.read().await.max_batch_size()
+    }
+
     #[must_use]
     pub const fn initial_leaf_value(&self) -> Field {
         self.initial_leaf_value
diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs
index 83b8a984..53905eeb 100644
--- a/src/task_monitor/tasks/delete_identities.rs
+++ b/src/task_monitor/tasks/delete_identities.rs
@@ -9,6 +9,7 @@ use tracing::info;
 use crate::database::types::DeletionEntry;
 use crate::database::Database;
 use crate::identity_tree::{Hash, Latest, TreeVersion};
+use crate::prover::{ProverConfiguration, ProverType};
 
 pub struct DeleteIdentities {
     database: Arc<Database>,
@@ -61,9 +62,6 @@ async fn delete_identities(
     loop {
         let deletions = database.get_deletions().await?;
         if deletions.is_empty() {
-            // Sleep for one hour
-            // TODO: should we make this dynamic? This causes an issue with tests so its set
-            // to 1 sec atm
            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
            continue;
        }
@@ -78,6 +76,22 @@ async fn delete_identities(
     // Dedup deletion entries
     let deletions = deletions.into_iter().collect::<HashSet<DeletionEntry>>();
 
+    // Get the max batch size for deletion provers
+    let max_batch_size = database
+        .get_provers()
+        .await?
+        .iter()
+        .filter(|p| p.prover_type == ProverType::Deletion)
+        .max_by_key(|p| p.batch_size)
+        .map(|p| p.batch_size)
+        .expect("Could not get max batch size");
+
+    // Trim deletions to the max deletion batch size available
+    let deletions = deletions
+        .into_iter()
+        .take(max_batch_size)
+        .collect::<Vec<_>>();
+
     let (leaf_indices, previous_commitments): (Vec<usize>, Vec<Hash>) = deletions
         .iter()
         .map(|d| (d.leaf_index, d.commitment))
diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs
index f9e57460..85834477 100644
--- a/src/task_monitor/tasks/process_identities.rs
+++ b/src/task_monitor/tasks/process_identities.rs
@@ -72,7 +72,9 @@ async fn process_identities(
     identity_manager.await_clean_slate().await?;
 
     info!("Starting identity processor.");
-    let batch_size = identity_manager.max_batch_size().await;
+    let insertion_batch_size = identity_manager.max_insertion_batch_size().await;
+    let deletion_batch_size = identity_manager.max_deletion_batch_size().await;
+    let batch_size = std::cmp::max(insertion_batch_size, deletion_batch_size);
 
     // We start a timer and force it to perform one initial tick to avoid an
     // immediate trigger.
@@ -101,6 +103,9 @@ async fn process_identities(
             // If the timer has fired we want to insert whatever
             // identities we have, even if it's not many. This ensures
             // a minimum quality of service for API users.
+
+            batching_tree. 
+
             let updates = batching_tree.peek_next_updates(batch_size);
             if updates.is_empty() {
                 continue;
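
The trimming step added to delete_identities.rs above looks up every registered deletion prover in the database, takes the largest batch size among them, and truncates the pending deletions to that many entries. Read in isolation, and with simplified stand-in types and an illustrative function name instead of the crate's own `ProverConfiguration`/`ProverType` definitions, the idea is roughly:

    // Simplified stand-ins for illustration; the real definitions live in crate::prover.
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum ProverType {
        Insertion,
        Deletion,
    }

    struct ProverConfig {
        prover_type: ProverType,
        batch_size: usize,
    }

    /// Keep at most as many deletions as the largest registered deletion
    /// prover can handle in a single batch. Returns the input unchanged if
    /// no deletion prover exists (the patch instead aborts via `expect`).
    fn trim_to_max_deletion_batch<T>(deletions: Vec<T>, provers: &[ProverConfig]) -> Vec<T> {
        let max_batch_size = provers
            .iter()
            .filter(|p| p.prover_type == ProverType::Deletion)
            .map(|p| p.batch_size)
            .max();

        match max_batch_size {
            Some(n) => deletions.into_iter().take(n).collect(),
            None => deletions,
        }
    }

Patch 3 below removes this database lookup again and picks the batch size inside process_identities instead, so the sketch only reflects the intermediate state after this patch.
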
From 065884ad6ce6618ec2fd6740ba48e28fcbdd2925 Mon Sep 17 00:00:00 2001
From: 0xKitsune <0xKitsune@protonmail.com>
Date: Mon, 25 Sep 2023 17:19:41 -0400
Subject: [PATCH 2/4] fix: compilation error

---
 src/task_monitor/tasks/process_identities.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs
index 85834477..9e49868e 100644
--- a/src/task_monitor/tasks/process_identities.rs
+++ b/src/task_monitor/tasks/process_identities.rs
@@ -104,7 +104,6 @@ async fn process_identities(
             // identities we have, even if it's not many. This ensures
             // a minimum quality of service for API users.
 
-            batching_tree.
 
             let updates = batching_tree.peek_next_updates(batch_size);
             if updates.is_empty() {
                 continue;

From d3d29a7791ccd770c28def652021b143c638535c Mon Sep 17 00:00:00 2001
From: 0xKitsune <0xKitsune@protonmail.com>
Date: Mon, 25 Sep 2023 18:03:44 -0400
Subject: [PATCH 3/4] fetch batch size based on insertion or deletion

---
 src/task_monitor/tasks/delete_identities.rs  | 17 -----------------
 src/task_monitor/tasks/process_identities.rs | 18 ++++++++++++++----
 2 files changed, 14 insertions(+), 21 deletions(-)

diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs
index 53905eeb..50b63f8d 100644
--- a/src/task_monitor/tasks/delete_identities.rs
+++ b/src/task_monitor/tasks/delete_identities.rs
@@ -9,7 +9,6 @@ use tracing::info;
 use crate::database::types::DeletionEntry;
 use crate::database::Database;
 use crate::identity_tree::{Hash, Latest, TreeVersion};
-use crate::prover::{ProverConfiguration, ProverType};
 
 pub struct DeleteIdentities {
     database: Arc<Database>,
@@ -76,22 +75,6 @@ async fn delete_identities(
     // Dedup deletion entries
     let deletions = deletions.into_iter().collect::<HashSet<DeletionEntry>>();
 
-    // Get the max batch size for deletion provers
-    let max_batch_size = database
-        .get_provers()
-        .await?
-        .iter()
-        .filter(|p| p.prover_type == ProverType::Deletion)
-        .max_by_key(|p| p.batch_size)
-        .map(|p| p.batch_size)
-        .expect("Could not get max batch size");
-
-    // Trim deletions to the max deletion batch size available
-    let deletions = deletions
-        .into_iter()
-        .take(max_batch_size)
-        .collect::<Vec<_>>();
-
     let (leaf_indices, previous_commitments): (Vec<usize>, Vec<Hash>) = deletions
         .iter()
         .map(|d| (d.leaf_index, d.commitment))
diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs
index 9e49868e..90e96aae 100644
--- a/src/task_monitor/tasks/process_identities.rs
+++ b/src/task_monitor/tasks/process_identities.rs
@@ -72,9 +72,6 @@ async fn process_identities(
     identity_manager.await_clean_slate().await?;
 
     info!("Starting identity processor.");
-    let insertion_batch_size = identity_manager.max_insertion_batch_size().await;
-    let deletion_batch_size = identity_manager.max_deletion_batch_size().await;
-    let batch_size = std::cmp::max(insertion_batch_size, deletion_batch_size);
 
     // We start a timer and force it to perform one initial tick to avoid an
     // immediate trigger.
@@ -103,9 +100,15 @@ async fn process_identities(
             // If the timer has fired we want to insert whatever
             // identities we have, even if it's not many. This ensures
             // a minimum quality of service for API users.
-
+            let batch_size = if batching_tree.peek_next_updates(1)[0].update.element == Hash::ZERO{
+                identity_manager.max_deletion_batch_size().await
+            }else{
+                identity_manager.max_insertion_batch_size().await
+            };
 
             let updates = batching_tree.peek_next_updates(batch_size);
+
+
             if updates.is_empty() {
                 continue;
             }
@@ -142,6 +145,13 @@ async fn process_identities(
                 let should_process_anyway =
                     timeout_secs.abs_diff(diff_secs) <= DEBOUNCE_THRESHOLD_SECS;
 
+
+                let batch_size = if batching_tree.peek_next_updates(1)[0].update.element == Hash::ZERO{
+                    identity_manager.max_deletion_batch_size().await
+                }else{
+                    identity_manager.max_insertion_batch_size().await
+                };
+
                 // We have _at most_ one complete batch here.
                 let updates = batching_tree.peek_next_updates(batch_size);
                 if updates.is_empty() {

From 8712debdda57e5f435b34f92f7e889ac874d440e Mon Sep 17 00:00:00 2001
From: 0xKitsune <0xKitsune@protonmail.com>
Date: Mon, 25 Sep 2023 18:04:52 -0400
Subject: [PATCH 4/4] formatting

---
 src/task_monitor/tasks/process_identities.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs
index 90e96aae..aecef1ee 100644
--- a/src/task_monitor/tasks/process_identities.rs
+++ b/src/task_monitor/tasks/process_identities.rs
@@ -107,8 +107,6 @@ async fn process_identities(
             };
 
             let updates = batching_tree.peek_next_updates(batch_size);
-
-
             if updates.is_empty() {
                 continue;
             }
@@ -145,7 +143,6 @@ async fn process_identities(
                 let should_process_anyway =
                     timeout_secs.abs_diff(diff_secs) <= DEBOUNCE_THRESHOLD_SECS;
 
-
                 let batch_size = if batching_tree.peek_next_updates(1)[0].update.element == Hash::ZERO{
                     identity_manager.max_deletion_batch_size().await
                 }else{
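
After patch 3, the batch size is chosen per batch inside the processing loop: the first pending update is peeked, and an element equal to `Hash::ZERO` marks a deletion batch, so the deletion prover's maximum batch size is used; otherwise the insertion prover's. A simplified, self-contained sketch of that selection, using stand-in types in place of the crate's `Hash`, tree and identity-manager types (names here are illustrative), could look like:

    // Stand-in types for illustration only.
    type Hash = [u8; 32];
    const ZERO_HASH: Hash = [0u8; 32];

    struct TreeUpdate {
        element: Hash,
    }

    /// Pick the batch size for the next batch from the first pending update:
    /// a zero element signals a deletion batch, anything else an insertion
    /// batch. Returns None when nothing is pending; the patch itself indexes
    /// `peek_next_updates(1)[0]` directly, which would panic on an empty queue.
    fn next_batch_size(
        pending: &[TreeUpdate],
        max_insertion_batch_size: usize,
        max_deletion_batch_size: usize,
    ) -> Option<usize> {
        let first = pending.first()?;
        Some(if first.element == ZERO_HASH {
            max_deletion_batch_size
        } else {
            max_insertion_batch_size
        })
    }

Note that only the first pending update is inspected, so the subsequent `peek_next_updates(batch_size)` call is implicitly assumed to return updates of a single kind (all insertions or all deletions).
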