diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index e3a5351c..6c5c8bcb 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -62,6 +62,7 @@ pub async fn process_identities( _ = timer.tick() => { tracing::info!("Identity batch insertion woken due to timeout"); } + () = wake_up_notify.notified() => { tracing::trace!("Identity batch insertion woken due to request"); }, @@ -82,40 +83,83 @@ pub async fn process_identities( .batching_tree() .peek_next_updates(batch_size); - let current_time = Utc::now(); - let batch_insertion_timeout = - chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?; - - let timeout_batch_time = last_batch_time - + batch_insertion_timeout - + chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS); - - let can_skip_batch = current_time < timeout_batch_time; - - if updates.len() < batch_size && can_skip_batch { - tracing::trace!( - num_updates = updates.len(), - batch_size, - ?last_batch_time, - "Pending identities is less than batch size, skipping batch", - ); - - continue; - } - - commit_identities( - &app.identity_manager, - app.tree_state()?.batching_tree(), - &monitored_txs_sender, - &updates, - ) - .await?; - - timer.reset(); - last_batch_time = Utc::now(); - app.database - .update_latest_insertion_timestamp(last_batch_time) + // If the batch is a deletion, process immediately without resetting the timer + if batch_type.is_deletion() { + commit_identities( + &app.identity_manager, + app.tree_state()?.batching_tree(), + &monitored_txs_sender, + &updates, + ) .await?; + } else { + let current_time = Utc::now(); + let batch_insertion_timeout = + chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?; + + let timeout_batch_time = last_batch_time + + batch_insertion_timeout + + chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS); + + let batch_time_elapsed = current_time >= timeout_batch_time; + + // If the batch size is full or if the insertion time has elapsed + // process the batch + if updates.len() >= batch_size || batch_time_elapsed { + commit_identities( + &app.identity_manager, + app.tree_state()?.batching_tree(), + &monitored_txs_sender, + &updates, + ) + .await?; + + // We've inserted the identities, so we want to ensure that + // we don't trigger again until either we get a full batch + // or the timer ticks. + timer.reset(); + last_batch_time = Utc::now(); + app.database + .update_latest_insertion_timestamp(last_batch_time) + .await?; + } else { + // Check if the next batch after the current insertion batch is + // deletion. The only time that deletions are + // inserted is when there is a full deletion batch or the + // deletion time interval has elapsed. + // In this case, we should immediately process the batch. + let next_batch_is_deletion = if let Some(update) = app + .tree_state()? + .batching_tree() + .peek_next_updates(batch_size + 1) + .last() + { + update.update.element == Hash::ZERO + } else { + false + }; + + // If the next batch is deletion, process the current insertion batch + if next_batch_is_deletion { + commit_identities( + &app.identity_manager, + app.tree_state()?.batching_tree(), + &monitored_txs_sender, + &updates, + ) + .await?; + } else { + // If there are not enough identities to fill the batch, the time interval has + // not elapsed and the next batch is not deletion, wait for more identities + tracing::trace!( + "Pending identities ({}) is less than batch size ({}). Waiting.", + updates.len(), + batch_size + ); + continue; + } + } + } // We want to check if there's a full batch available immediately wake_up_notify.notify_one();