From b9c4461b2ef4d7828f535b0a82c545e0645e7a53 Mon Sep 17 00:00:00 2001
From: 0xKitsune <0xKitsune@protonmail.com>
Date: Mon, 22 Apr 2024 12:27:40 -0400
Subject: [PATCH] process deletions immediately, check if next batch is
 deletion

---
 src/task_monitor/tasks/process_identities.rs | 110 +++++++++++++------
 1 file changed, 77 insertions(+), 33 deletions(-)

diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs
index e3a5351c..6c5c8bcb 100644
--- a/src/task_monitor/tasks/process_identities.rs
+++ b/src/task_monitor/tasks/process_identities.rs
@@ -62,6 +62,7 @@ pub async fn process_identities(
             _ = timer.tick() => {
                 tracing::info!("Identity batch insertion woken due to timeout");
             }
+
             () = wake_up_notify.notified() => {
                 tracing::trace!("Identity batch insertion woken due to request");
             },
@@ -82,40 +83,83 @@ pub async fn process_identities(
             .batching_tree()
             .peek_next_updates(batch_size);
 
-        let current_time = Utc::now();
-        let batch_insertion_timeout =
-            chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?;
-
-        let timeout_batch_time = last_batch_time
-            + batch_insertion_timeout
-            + chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS);
-
-        let can_skip_batch = current_time < timeout_batch_time;
-
-        if updates.len() < batch_size && can_skip_batch {
-            tracing::trace!(
-                num_updates = updates.len(),
-                batch_size,
-                ?last_batch_time,
-                "Pending identities is less than batch size, skipping batch",
-            );
-
-            continue;
-        }
-
-        commit_identities(
-            &app.identity_manager,
-            app.tree_state()?.batching_tree(),
-            &monitored_txs_sender,
-            &updates,
-        )
-        .await?;
-
-        timer.reset();
-        last_batch_time = Utc::now();
-        app.database
-            .update_latest_insertion_timestamp(last_batch_time)
+        // If the batch is a deletion, process immediately without resetting the timer
+        if batch_type.is_deletion() {
+            commit_identities(
+                &app.identity_manager,
+                app.tree_state()?.batching_tree(),
+                &monitored_txs_sender,
+                &updates,
+            )
             .await?;
+        } else {
+            let current_time = Utc::now();
+            let batch_insertion_timeout =
+                chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?;
+
+            let timeout_batch_time = last_batch_time
+                + batch_insertion_timeout
+                + chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS);
+
+            let batch_time_elapsed = current_time >= timeout_batch_time;
+
+            // If the batch size is full or if the insertion time has elapsed
+            // process the batch
+            if updates.len() >= batch_size || batch_time_elapsed {
+                commit_identities(
+                    &app.identity_manager,
+                    app.tree_state()?.batching_tree(),
+                    &monitored_txs_sender,
+                    &updates,
+                )
+                .await?;
+
+                // We've inserted the identities, so we want to ensure that
+                // we don't trigger again until either we get a full batch
+                // or the timer ticks.
+                timer.reset();
+                last_batch_time = Utc::now();
+                app.database
+                    .update_latest_insertion_timestamp(last_batch_time)
+                    .await?;
+            } else {
+                // Check if the next batch after the current insertion batch is
+                // deletion. The only time that deletions are
+                // inserted is when there is a full deletion batch or the
+                // deletion time interval has elapsed.
+                // In this case, we should immediately process the batch.
+                let next_batch_is_deletion = if let Some(update) = app
+                    .tree_state()?
+                    .batching_tree()
+                    .peek_next_updates(batch_size + 1)
+                    .last()
+                {
+                    update.update.element == Hash::ZERO
+                } else {
+                    false
+                };
+
+                // If the next batch is deletion, process the current insertion batch
+                if next_batch_is_deletion {
+                    commit_identities(
+                        &app.identity_manager,
+                        app.tree_state()?.batching_tree(),
+                        &monitored_txs_sender,
+                        &updates,
+                    )
+                    .await?;
+                } else {
+                    // If there are not enough identities to fill the batch, the time interval has
+                    // not elapsed and the next batch is not deletion, wait for more identities
+                    tracing::trace!(
+                        "Pending identities ({}) is less than batch size ({}). Waiting.",
+                        updates.len(),
+                        batch_size
+                    );
+                    continue;
+                }
+            }
+        }
 
         // We want to check if there's a full batch available immediately
         wake_up_notify.notify_one();