From d3d29a7791ccd770c28def652021b143c638535c Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Mon, 25 Sep 2023 18:03:44 -0400 Subject: [PATCH] fetch batch size based on insertion or deletion --- src/task_monitor/tasks/delete_identities.rs | 17 ----------------- src/task_monitor/tasks/process_identities.rs | 18 ++++++++++++++---- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index 53905eeb..50b63f8d 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -9,7 +9,6 @@ use tracing::info; use crate::database::types::DeletionEntry; use crate::database::Database; use crate::identity_tree::{Hash, Latest, TreeVersion}; -use crate::prover::{ProverConfiguration, ProverType}; pub struct DeleteIdentities { database: Arc<Database>, @@ -76,22 +75,6 @@ async fn delete_identities( // Dedup deletion entries let deletions = deletions.into_iter().collect::<HashSet<DeletionEntry>>(); - // Get the max batch size for deletion provers - let max_batch_size = database - .get_provers() - .await? - .iter() - .filter(|p| p.prover_type == ProverType::Deletion) - .max_by_key(|p| p.batch_size) - .map(|p| p.batch_size) - .expect("Could not get match batch size"); - - // Trim deletions the max deletion batch size available - let deletions = deletions - .into_iter() - .take(max_batch_size) - .collect::<HashSet<DeletionEntry>>(); - let (leaf_indices, previous_commitments): (Vec<usize>, Vec<Hash>) = deletions .iter() .map(|d| (d.leaf_index, d.commitment)) diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index 9e49868e..90e96aae 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -72,9 +72,6 @@ async fn process_identities( identity_manager.await_clean_slate().await?; info!("Starting identity processor."); - let insertion_batch_size = identity_manager.max_insertion_batch_size().await; - let deletion_batch_size = identity_manager.max_deletion_batch_size().await; - let batch_size = std::cmp::max(insertion_batch_size, deletion_batch_size); // We start a timer and force it to perform one initial tick to avoid an // immediate trigger. @@ -103,9 +100,15 @@ async fn process_identities( // If the timer has fired we want to insert whatever // identities we have, even if it's not many. This ensures // a minimum quality of service for API users. - + let batch_size = if batching_tree.peek_next_updates(1)[0].update.element == Hash::ZERO{ + identity_manager.max_deletion_batch_size().await + }else{ + identity_manager.max_insertion_batch_size().await + }; let updates = batching_tree.peek_next_updates(batch_size); + + if updates.is_empty() { continue; } @@ -142,6 +145,13 @@ async fn process_identities( let should_process_anyway = timeout_secs.abs_diff(diff_secs) <= DEBOUNCE_THRESHOLD_SECS; + + let batch_size = if batching_tree.peek_next_updates(1)[0].update.element == Hash::ZERO{ + identity_manager.max_deletion_batch_size().await + }else{ + identity_manager.max_insertion_batch_size().await + }; + // We have _at most_ one complete batch here. let updates = batching_tree.peek_next_updates(batch_size); if updates.is_empty() {