Skip to content

Commit

Permalink
process deletions immediately, check if next batch is deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Apr 22, 2024
1 parent 8dec2b1 commit b9c4461
Showing 1 changed file with 77 additions and 33 deletions.
110 changes: 77 additions & 33 deletions src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub async fn process_identities(
_ = timer.tick() => {
tracing::info!("Identity batch insertion woken due to timeout");
}

() = wake_up_notify.notified() => {
tracing::trace!("Identity batch insertion woken due to request");
},
Expand All @@ -82,40 +83,83 @@ pub async fn process_identities(
.batching_tree()
.peek_next_updates(batch_size);

let current_time = Utc::now();
let batch_insertion_timeout =
chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?;

let timeout_batch_time = last_batch_time
+ batch_insertion_timeout
+ chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS);

let can_skip_batch = current_time < timeout_batch_time;

if updates.len() < batch_size && can_skip_batch {
tracing::trace!(
num_updates = updates.len(),
batch_size,
?last_batch_time,
"Pending identities is less than batch size, skipping batch",
);

continue;
}

commit_identities(
&app.identity_manager,
app.tree_state()?.batching_tree(),
&monitored_txs_sender,
&updates,
)
.await?;

timer.reset();
last_batch_time = Utc::now();
app.database
.update_latest_insertion_timestamp(last_batch_time)
// If the batch is a deletion, process immediately without resetting the timer
if batch_type.is_deletion() {
commit_identities(
&app.identity_manager,
app.tree_state()?.batching_tree(),
&monitored_txs_sender,
&updates,
)
.await?;
} else {
let current_time = Utc::now();
let batch_insertion_timeout =
chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?;

let timeout_batch_time = last_batch_time
+ batch_insertion_timeout
+ chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS);

let batch_time_elapsed = current_time >= timeout_batch_time;

// If the batch size is full or if the insertion time has elapsed
// process the batch
if updates.len() >= batch_size || batch_time_elapsed {
commit_identities(
&app.identity_manager,
app.tree_state()?.batching_tree(),
&monitored_txs_sender,
&updates,
)
.await?;

// We've inserted the identities, so we want to ensure that
// we don't trigger again until either we get a full batch
// or the timer ticks.
timer.reset();
last_batch_time = Utc::now();
app.database
.update_latest_insertion_timestamp(last_batch_time)
.await?;
} else {
// Check if the next batch after the current insertion batch is
// deletion. The only time that deletions are
// inserted is when there is a full deletion batch or the
// deletion time interval has elapsed.
// In this case, we should immediately process the batch.
let next_batch_is_deletion = if let Some(update) = app
.tree_state()?
.batching_tree()
.peek_next_updates(batch_size + 1)
.last()
{
update.update.element == Hash::ZERO
} else {
false
};

// If the next batch is deletion, process the current insertion batch
if next_batch_is_deletion {
commit_identities(
&app.identity_manager,
app.tree_state()?.batching_tree(),
&monitored_txs_sender,
&updates,
)
.await?;
} else {
// If there are not enough identities to fill the batch, the time interval has
// not elapsed and the next batch is not deletion, wait for more identities
tracing::trace!(
"Pending identities ({}) is less than batch size ({}). Waiting.",
updates.len(),
batch_size
);
continue;
}
}
}

// We want to check if there's a full batch available immediately
wake_up_notify.notify_one();
Expand Down

0 comments on commit b9c4461

Please sign in to comment.