diff --git a/src/task_monitor.rs b/src/task_monitor.rs index 03654287..e36d8828 100644 --- a/src/task_monitor.rs +++ b/src/task_monitor.rs @@ -4,7 +4,7 @@ use std::time::Duration; use clap::Parser; use once_cell::sync::Lazy; use prometheus::{linear_buckets, register_gauge, register_histogram, Gauge, Histogram}; -use tokio::sync::{broadcast, mpsc, Notify, RwLock}; +use tokio::sync::{broadcast, mpsc, Mutex, Notify, RwLock}; use tokio::task::JoinHandle; use tracing::{info, instrument, warn}; @@ -197,6 +197,8 @@ impl TaskMonitor { // in the database wake_up_notify.notify_one(); + let pending_insertion_mutex = Arc::new(Mutex::new(())); + let mut handles = Vec::new(); // Finalize identities task @@ -252,6 +254,7 @@ impl TaskMonitor { self.database.clone(), self.tree_state.get_latest_tree(), wake_up_notify.clone(), + pending_insertion_mutex.clone(), ); let insert_identities_handle = crate::utils::spawn_monitored_with_backoff( @@ -269,6 +272,7 @@ impl TaskMonitor { self.batch_deletion_timeout_seconds, self.min_batch_deletion_size, wake_up_notify, + pending_insertion_mutex, ); let delete_identities_handle = crate::utils::spawn_monitored_with_backoff( diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index b21779bd..02f7e343 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use std::sync::Arc; use chrono::Utc; -use tokio::sync::Notify; +use tokio::sync::{Mutex, Notify}; use tracing::info; use crate::database::types::DeletionEntry; @@ -10,11 +10,12 @@ use crate::database::Database; use crate::identity_tree::{Hash, Latest, TreeVersion}; pub struct DeleteIdentities { - database: Arc, - latest_tree: TreeVersion, - deletion_time_interval: i64, - min_deletion_batch_size: usize, - wake_up_notify: Arc, + database: Arc, + latest_tree: TreeVersion, + deletion_time_interval: i64, + min_deletion_batch_size: usize, + wake_up_notify: Arc, + pending_insertions_mutex: Arc>, } impl DeleteIdentities { @@ -24,6 +25,7 @@ impl DeleteIdentities { deletion_time_interval: i64, min_deletion_batch_size: usize, wake_up_notify: Arc, + pending_insertions_mutex: Arc>, ) -> Arc { Arc::new(Self { database, @@ -31,6 +33,7 @@ impl DeleteIdentities { deletion_time_interval, min_deletion_batch_size, wake_up_notify, + pending_insertions_mutex, }) } @@ -41,6 +44,7 @@ impl DeleteIdentities { self.deletion_time_interval, self.min_deletion_batch_size, self.wake_up_notify.clone(), + &self.pending_insertions_mutex, ) .await } @@ -52,6 +56,7 @@ async fn delete_identities( deletion_time_interval: i64, min_deletion_batch_size: usize, wake_up_notify: Arc, + pending_insertions_mutex: &Mutex<()>, ) -> anyhow::Result<()> { info!("Starting deletion processor."); @@ -89,6 +94,8 @@ async fn delete_identities( "Length mismatch when appending identities to tree" ); + let _guard = pending_insertions_mutex.lock().await; + // Insert the new items into pending identities let items = data.into_iter().zip(leaf_indices); for ((root, _proof), leaf_index) in items { diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index 10fc65f7..2d6cc89d 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::time::Duration; -use tokio::sync::Notify; +use tokio::sync::{Mutex, Notify}; use tokio::time::sleep; use tracing::instrument; @@ -10,9 +10,10 @@ use crate::database::Database; use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus}; pub struct InsertIdentities { - database: Arc, - latest_tree: TreeVersion, - wake_up_notify: Arc, + database: Arc, + latest_tree: TreeVersion, + wake_up_notify: Arc, + pending_insertions_mutex: Arc>, } impl InsertIdentities { @@ -20,16 +21,24 @@ impl InsertIdentities { database: Arc, latest_tree: TreeVersion, wake_up_notify: Arc, + pending_insertions_mutex: Arc>, ) -> Arc { Arc::new(Self { database, latest_tree, wake_up_notify, + pending_insertions_mutex, }) } pub async fn run(self: Arc) -> anyhow::Result<()> { - insert_identities_loop(&self.database, &self.latest_tree, &self.wake_up_notify).await + insert_identities_loop( + &self.database, + &self.latest_tree, + &self.wake_up_notify, + &self.pending_insertions_mutex, + ) + .await } } @@ -37,6 +46,7 @@ async fn insert_identities_loop( database: &Database, latest_tree: &TreeVersion, wake_up_notify: &Notify, + pending_insertions_mutex: &Mutex<()>, ) -> anyhow::Result<()> { loop { // get commits from database @@ -48,7 +58,7 @@ async fn insert_identities_loop( continue; } - insert_identities(database, latest_tree, unprocessed).await?; + insert_identities(database, latest_tree, unprocessed, pending_insertions_mutex).await?; // Notify the identity processing task, that there are new identities wake_up_notify.notify_one(); } @@ -59,6 +69,7 @@ async fn insert_identities( database: &Database, latest_tree: &TreeVersion, identities: Vec, + pending_insertions_mutex: &Mutex<()>, ) -> anyhow::Result<()> { // Filter out any identities that are already in the `identities` table let mut filtered_identities = vec![]; @@ -96,6 +107,8 @@ async fn insert_identities( let items = data.into_iter().zip(filtered_identities); + let _guard = pending_insertions_mutex.lock().await; + for ((root, _proof, leaf_index), identity) in items { database .insert_pending_identity(leaf_index, &identity, &root)