Skip to content

Commit

Permalink
add insert pending identity mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Apr 22, 2024
1 parent 3f7529b commit 8dec2b1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
18 changes: 16 additions & 2 deletions src/task_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl TaskMonitor {
// Process identities
let app = self.app.clone();
let wake_up_notify = base_wake_up_notify.clone();

let process_identities = move || {
tasks::process_identities::process_identities(
app.clone(),
Expand All @@ -158,12 +159,20 @@ impl TaskMonitor {
);
handles.push(monitor_txs_handle);

let pending_insertion_mutex = Arc::new(Mutex::new(()));

// Insert identities
let app = self.app.clone();
let wake_up_notify = base_wake_up_notify.clone();
let insertion_mutex = pending_insertion_mutex.clone();
let insert_identities = move || {
self::tasks::insert_identities::insert_identities(app.clone(), wake_up_notify.clone())
self::tasks::insert_identities::insert_identities(
app.clone(),
insertion_mutex.clone(),
wake_up_notify.clone(),
)
};

let insert_identities_handle = crate::utils::spawn_monitored_with_backoff(
insert_identities,
shutdown_sender.clone(),
Expand All @@ -175,8 +184,13 @@ impl TaskMonitor {
let app = self.app.clone();
let wake_up_notify = base_wake_up_notify.clone();
let delete_identities = move || {
self::tasks::delete_identities::delete_identities(app.clone(), wake_up_notify.clone())
self::tasks::delete_identities::delete_identities(
app.clone(),
pending_insertion_mutex.clone(),
wake_up_notify.clone(),
)
};

let delete_identities_handle = crate::utils::spawn_monitored_with_backoff(
delete_identities,
shutdown_sender.clone(),
Expand Down
10 changes: 8 additions & 2 deletions src/task_monitor/tasks/delete_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ use std::sync::Arc;

use anyhow::Context;
use chrono::Utc;
use tokio::sync::Notify;
use tokio::sync::{Mutex, Notify};
use tracing::info;

use crate::app::App;
use crate::database::types::DeletionEntry;
use crate::database::DatabaseExt;
use crate::identity_tree::Hash;

pub async fn delete_identities(app: Arc<App>, wake_up_notify: Arc<Notify>) -> anyhow::Result<()> {
pub async fn delete_identities(
app: Arc<App>,
pending_insertions_mutex: Arc<Mutex<()>>,
wake_up_notify: Arc<Notify>,
) -> anyhow::Result<()> {
info!("Starting deletion processor.");

let batch_deletion_timeout = chrono::Duration::from_std(app.config.app.batch_deletion_timeout)
Expand Down Expand Up @@ -39,6 +43,8 @@ pub async fn delete_identities(app: Arc<App>, wake_up_notify: Arc<Notify>) -> an
.map(|d| (d.leaf_index, d.commitment))
.unzip();

let _guard = pending_insertions_mutex.lock().await;

// Delete the commitments at the target leaf indices in the latest tree,
// generating the proof for each update
let data = app.tree_state()?.latest_tree().delete_many(&leaf_indices);
Expand Down
20 changes: 16 additions & 4 deletions src/task_monitor/tasks/insert_identities.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;
use tokio::sync::{Mutex, Notify};
use tokio::time::sleep;
use tracing::instrument;

Expand All @@ -10,7 +10,11 @@ use crate::database::types::UnprocessedCommitment;
use crate::database::{Database, DatabaseExt};
use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};

pub async fn insert_identities(app: Arc<App>, wake_up_notify: Arc<Notify>) -> anyhow::Result<()> {
pub async fn insert_identities(
app: Arc<App>,
pending_insertions_mutex: Arc<Mutex<()>>,
wake_up_notify: Arc<Notify>,
) -> anyhow::Result<()> {
loop {
// get commits from database
let unprocessed = app
Expand All @@ -22,8 +26,13 @@ pub async fn insert_identities(app: Arc<App>, wake_up_notify: Arc<Notify>) -> an
continue;
}

insert_identities_batch(&app.database, app.tree_state()?.latest_tree(), unprocessed)
.await?;
insert_identities_batch(
&app.database,
app.tree_state()?.latest_tree(),
unprocessed,
&pending_insertions_mutex,
)
.await?;
// Notify the identity processing task, that there are new identities
wake_up_notify.notify_one();
}
Expand All @@ -34,6 +43,7 @@ async fn insert_identities_batch(
database: &Database,
latest_tree: &TreeVersion<Latest>,
identities: Vec<UnprocessedCommitment>,
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
// Filter out any identities that are already in the `identities` table
let mut filtered_identities = vec![];
Expand All @@ -52,6 +62,8 @@ async fn insert_identities_batch(
}
}

let _guard = pending_insertions_mutex.lock().await;

let next_db_index = database.get_next_leaf_index().await?;
let next_leaf = latest_tree.next_leaf();

Expand Down

0 comments on commit 8dec2b1

Please sign in to comment.