Skip to content

Commit

Permalink
updated process identities to only use last insertion timestamp for i…
Browse files Browse the repository at this point in the history
…nsertions
  • Loading branch information
0xKitsune committed Apr 13, 2024
1 parent 3763145 commit 8a22b6c
Showing 1 changed file with 43 additions and 30 deletions.
73 changes: 43 additions & 30 deletions src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,44 +159,57 @@ async fn process_identities(
continue;
}

let batch_size = if next_update[0].update.element == Hash::ZERO {
identity_manager.max_deletion_batch_size().await
let (batch_size, insertion) = if next_update[0].update.element == Hash::ZERO {
(identity_manager.max_deletion_batch_size().await, false)
}else{
identity_manager.max_insertion_batch_size().await
(identity_manager.max_insertion_batch_size().await, true)
};

// We have _at most_ one complete batch here.
let updates = batching_tree.peek_next_updates(batch_size);

// If there are not enough identities to insert at this
// stage we can wait. The timer will ensure that the API
// clients do not wait too long for their submission to be
// completed.
if updates.len() < batch_size && !should_process_anyway {
// We do not reset the timer here as we may want to
// insert anyway soon.
tracing::trace!(
"Pending identities ({}) is less than batch size ({}). Waiting.",
updates.len(),
batch_size
);
continue;
}

commit_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;
// If insertion, check if we have enough identities to insert or if the insertion interval has elapsed
if insertion{
// If there are not enough identities to insert at this
// stage we can wait. The timer will ensure that the API
// clients do not wait too long for their submission to be
// completed.
if updates.len() < batch_size && !should_process_anyway {
// We do not reset the timer here as we may want to
// insert anyway soon.
tracing::trace!(
"Pending identities ({}) is less than batch size ({}). Waiting.",
updates.len(),
batch_size
);
continue;
}

commit_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

// We've inserted the identities, so we want to ensure that
// we don't trigger again until either we get a full batch
// or the timer ticks.
timer.reset();
last_batch_time = Utc::now();
database.update_latest_insertion_timestamp(last_batch_time).await?;

// We've inserted the identities, so we want to ensure that
// we don't trigger again until either we get a full batch
// or the timer ticks.
timer.reset();
last_batch_time = Utc::now();
database.update_latest_insertion_timestamp(last_batch_time).await?;
}else{
commit_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;
}

// We want to check if there's a full batch available immediately
wake_up_notify.notify_one();
Expand Down

0 comments on commit 8a22b6c

Please sign in to comment.