diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index a2ca21a3..28642efe 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -168,40 +168,56 @@ async fn process_identities( // We have _at most_ one complete batch here. let updates = batching_tree.peek_next_updates(batch_size); - // If insertion, check if we have enough identities to insert or if the insertion interval has elapsed - if insertion{ - // If there are not enough identities to insert at this - // stage we can wait. The timer will ensure that the API - // clients do not wait too long for their submission to be - // completed. - if updates.len() < batch_size && !should_process_anyway { - // We do not reset the timer here as we may want to - // insert anyway soon. - tracing::trace!( - "Pending identities ({}) is less than batch size ({}). Waiting.", - updates.len(), - batch_size - ); - continue; + if insertion { + // If the batch size is full or if the insertion time has elapsed, process the batch + if updates.len() == batch_size || should_process_anyway { + commit_identities( + database, + identity_manager, + batching_tree, + monitored_txs_sender, + &updates, + ).await?; + + // We've inserted the identities, so we want to ensure that + // we don't trigger again until either we get a full batch + // or the timer ticks. + timer.reset(); + last_batch_time = Utc::now(); + database.update_latest_insertion_timestamp(last_batch_time).await?; + }else{ + // Check if the next batch after the current insertion batch is deletion. + // The only time that deletions are inserted is when there is a full deletion batch or the deletion time interval has elapsed. + // In this case, we should immediately process the batch. + let next_batch_is_deletion = if let Some(update) = batching_tree.peek_next_updates(batch_size + 1).last(){ + update.update.element == Hash::ZERO + }else{ + false + }; + + //If the next batch is deletion, process the current insertion batch + if next_batch_is_deletion { + commit_identities( + database, + identity_manager, + batching_tree, + monitored_txs_sender, + &updates, + ).await?; + + }else{ + // If there are not enough identities to fill the batch, the time interval has not elapsed and the next batch is not deletion, wait for more identities + tracing::trace!( + "Pending identities ({}) is less than batch size ({}). Waiting.", + updates.len(), + batch_size + ); + continue; + } } - - commit_identities( - database, - identity_manager, - batching_tree, - monitored_txs_sender, - &updates, - ).await?; - - // We've inserted the identities, so we want to ensure that - // we don't trigger again until either we get a full batch - // or the timer ticks. - timer.reset(); - last_batch_time = Utc::now(); - database.update_latest_insertion_timestamp(last_batch_time).await?; - }else{ + // If the next batch is a deletion batch, process immediately commit_identities( database, identity_manager,