Skip to content

Commit

Permalink
fetch batch size based on insertion or deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Sep 25, 2023
1 parent 065884a commit d3d29a7
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 21 deletions.
17 changes: 0 additions & 17 deletions src/task_monitor/tasks/delete_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tracing::info;
use crate::database::types::DeletionEntry;
use crate::database::Database;
use crate::identity_tree::{Hash, Latest, TreeVersion};
use crate::prover::{ProverConfiguration, ProverType};

pub struct DeleteIdentities {
database: Arc<Database>,
Expand Down Expand Up @@ -76,22 +75,6 @@ async fn delete_identities(
// Dedup deletion entries
let deletions = deletions.into_iter().collect::<HashSet<DeletionEntry>>();

// Get the max batch size for deletion provers
let max_batch_size = database
.get_provers()
.await?
.iter()
.filter(|p| p.prover_type == ProverType::Deletion)
.max_by_key(|p| p.batch_size)
.map(|p| p.batch_size)
.expect("Could not get match batch size");

// Trim deletions the max deletion batch size available
let deletions = deletions
.into_iter()
.take(max_batch_size)
.collect::<HashSet<DeletionEntry>>();

let (leaf_indices, previous_commitments): (Vec<usize>, Vec<Hash>) = deletions
.iter()
.map(|d| (d.leaf_index, d.commitment))
Expand Down
18 changes: 14 additions & 4 deletions src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ async fn process_identities(
identity_manager.await_clean_slate().await?;

info!("Starting identity processor.");
let insertion_batch_size = identity_manager.max_insertion_batch_size().await;
let deletion_batch_size = identity_manager.max_deletion_batch_size().await;
let batch_size = std::cmp::max(insertion_batch_size, deletion_batch_size);

// We start a timer and force it to perform one initial tick to avoid an
// immediate trigger.
Expand Down Expand Up @@ -103,9 +100,15 @@ async fn process_identities(
// If the timer has fired we want to insert whatever
// identities we have, even if it's not many. This ensures
// a minimum quality of service for API users.

let batch_size = if batching_tree.peek_next_updates(1)[0].update.element == Hash::ZERO{
identity_manager.max_deletion_batch_size().await
}else{
identity_manager.max_insertion_batch_size().await
};

let updates = batching_tree.peek_next_updates(batch_size);


if updates.is_empty() {
continue;
}
Expand Down Expand Up @@ -142,6 +145,13 @@ async fn process_identities(
let should_process_anyway =
timeout_secs.abs_diff(diff_secs) <= DEBOUNCE_THRESHOLD_SECS;


let batch_size = if batching_tree.peek_next_updates(1)[0].update.element == Hash::ZERO{
identity_manager.max_deletion_batch_size().await
}else{
identity_manager.max_insertion_batch_size().await
};

// We have _at most_ one complete batch here.
let updates = batching_tree.peek_next_updates(batch_size);
if updates.is_empty() {
Expand Down

0 comments on commit d3d29a7

Please sign in to comment.