Skip to content

Commit

Permalink
update process identities logic to process partial insertion batches …
Browse files Browse the repository at this point in the history
…if next batch is deletion
  • Loading branch information
0xKitsune committed Apr 15, 2024
1 parent 2ed20f1 commit 1146e4b
Showing 1 changed file with 47 additions and 31 deletions.
78 changes: 47 additions & 31 deletions src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,40 +168,56 @@ async fn process_identities(
// We have _at most_ one complete batch here.
let updates = batching_tree.peek_next_updates(batch_size);


// If insertion, check if we have enough identities to insert or if the insertion interval has elapsed
if insertion{
// If there are not enough identities to insert at this
// stage we can wait. The timer will ensure that the API
// clients do not wait too long for their submission to be
// completed.
if updates.len() < batch_size && !should_process_anyway {
// We do not reset the timer here as we may want to
// insert anyway soon.
tracing::trace!(
"Pending identities ({}) is less than batch size ({}). Waiting.",
updates.len(),
batch_size
);
continue;
if insertion {
// If the batch size is full or if the insertion time has elapsed, process the batch
if updates.len() == batch_size || should_process_anyway {
commit_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

// We've inserted the identities, so we want to ensure that
// we don't trigger again until either we get a full batch
// or the timer ticks.
timer.reset();
last_batch_time = Utc::now();
database.update_latest_insertion_timestamp(last_batch_time).await?;
}else{
// Check if the next batch after the current insertion batch is deletion.
// The only time that deletions are inserted is when there is a full deletion batch or the deletion time interval has elapsed.
// In this case, we should immediately process the batch.
let next_batch_is_deletion = if let Some(update) = batching_tree.peek_next_updates(batch_size + 1).last(){
update.update.element == Hash::ZERO
}else{
false
};

//If the next batch is deletion, process the current insertion batch
if next_batch_is_deletion {
commit_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

}else{
// If there are not enough identities to fill the batch, the time interval has not elapsed and the next batch is not deletion, wait for more identities
tracing::trace!(
"Pending identities ({}) is less than batch size ({}). Waiting.",
updates.len(),
batch_size
);
continue;
}
}

commit_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

// We've inserted the identities, so we want to ensure that
// we don't trigger again until either we get a full batch
// or the timer ticks.
timer.reset();
last_batch_time = Utc::now();
database.update_latest_insertion_timestamp(last_batch_time).await?;

}else{
// If the next batch is a deletion batch, process immediately
commit_identities(
database,
identity_manager,
Expand Down

0 comments on commit 1146e4b

Please sign in to comment.