Skip to content

Commit

Permalink
added logic to trim deletions to max deletion batch size, added logic…
Browse files Browse the repository at this point in the history
… to configure batch size during process identities
  • Loading branch information
0xKitsune committed Sep 25, 2023
1 parent c8159a9 commit 1616e6c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
6 changes: 5 additions & 1 deletion src/contracts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,14 @@ impl IdentityManager {
self.tree_depth
}

pub async fn max_batch_size(&self) -> usize {
pub async fn max_insertion_batch_size(&self) -> usize {
self.insertion_prover_map.read().await.max_batch_size()
}

pub async fn max_deletion_batch_size(&self) -> usize {
self.deletion_prover_map.read().await.max_batch_size()
}

#[must_use]
pub const fn initial_leaf_value(&self) -> Field {
self.initial_leaf_value
Expand Down
20 changes: 17 additions & 3 deletions src/task_monitor/tasks/delete_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tracing::info;
use crate::database::types::DeletionEntry;
use crate::database::Database;
use crate::identity_tree::{Hash, Latest, TreeVersion};
use crate::prover::{ProverConfiguration, ProverType};

Check warning on line 12 in src/task_monitor/tasks/delete_identities.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `ProverConfiguration`

warning: unused import: `ProverConfiguration` --> src/task_monitor/tasks/delete_identities.rs:12:21 | 12 | use crate::prover::{ProverConfiguration, ProverType}; | ^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default

Check warning on line 12 in src/task_monitor/tasks/delete_identities.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `ProverConfiguration`

warning: unused import: `ProverConfiguration` --> src/task_monitor/tasks/delete_identities.rs:12:21 | 12 | use crate::prover::{ProverConfiguration, ProverType}; | ^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default

pub struct DeleteIdentities {
database: Arc<Database>,
Expand Down Expand Up @@ -61,9 +62,6 @@ async fn delete_identities(
loop {
let deletions = database.get_deletions().await?;
if deletions.is_empty() {
// Sleep for one hour
// TODO: should we make this dynamic? This causes an issue with tests so its set
// to 1 sec atm
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
continue;
}
Expand All @@ -78,6 +76,22 @@ async fn delete_identities(
// Dedup deletion entries
let deletions = deletions.into_iter().collect::<HashSet<DeletionEntry>>();

// Get the max batch size for deletion provers
let max_batch_size = database
.get_provers()
.await?
.iter()
.filter(|p| p.prover_type == ProverType::Deletion)
.max_by_key(|p| p.batch_size)
.map(|p| p.batch_size)
.expect("Could not get match batch size");

// Trim deletions the max deletion batch size available
let deletions = deletions
.into_iter()
.take(max_batch_size)
.collect::<HashSet<DeletionEntry>>();

let (leaf_indices, previous_commitments): (Vec<usize>, Vec<Hash>) = deletions
.iter()
.map(|d| (d.leaf_index, d.commitment))
Expand Down
7 changes: 6 additions & 1 deletion src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ async fn process_identities(
identity_manager.await_clean_slate().await?;

info!("Starting identity processor.");
let batch_size = identity_manager.max_batch_size().await;
let insertion_batch_size = identity_manager.max_insertion_batch_size().await;
let deletion_batch_size = identity_manager.max_deletion_batch_size().await;
let batch_size = std::cmp::max(insertion_batch_size, deletion_batch_size);

// We start a timer and force it to perform one initial tick to avoid an
// immediate trigger.
Expand Down Expand Up @@ -101,6 +103,9 @@ async fn process_identities(
// If the timer has fired we want to insert whatever
// identities we have, even if it's not many. This ensures
// a minimum quality of service for API users.

batching_tree.

let updates = batching_tree.peek_next_updates(batch_size);
if updates.is_empty() {
continue;
Expand Down

0 comments on commit 1616e6c

Please sign in to comment.