Skip to content

Commit

Permalink
Lock when inserting deletions or insertions (#708)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop authored Apr 15, 2024
1 parent 8a22b6c commit f0f07fc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
6 changes: 5 additions & 1 deletion src/task_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use clap::Parser;
use once_cell::sync::Lazy;
use prometheus::{linear_buckets, register_gauge, register_histogram, Gauge, Histogram};
use tokio::sync::{broadcast, mpsc, Notify, RwLock};
use tokio::sync::{broadcast, mpsc, Mutex, Notify, RwLock};
use tokio::task::JoinHandle;
use tracing::{info, instrument, warn};

Expand Down Expand Up @@ -197,6 +197,8 @@ impl TaskMonitor {
// in the database
wake_up_notify.notify_one();

let pending_insertion_mutex = Arc::new(Mutex::new(()));

let mut handles = Vec::new();

// Finalize identities task
Expand Down Expand Up @@ -252,6 +254,7 @@ impl TaskMonitor {
self.database.clone(),
self.tree_state.get_latest_tree(),
wake_up_notify.clone(),
pending_insertion_mutex.clone(),
);

let insert_identities_handle = crate::utils::spawn_monitored_with_backoff(
Expand All @@ -269,6 +272,7 @@ impl TaskMonitor {
self.batch_deletion_timeout_seconds,
self.min_batch_deletion_size,
wake_up_notify,
pending_insertion_mutex,
);

let delete_identities_handle = crate::utils::spawn_monitored_with_backoff(
Expand Down
19 changes: 13 additions & 6 deletions src/task_monitor/tasks/delete_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ use std::collections::HashSet;
use std::sync::Arc;

use chrono::Utc;
use tokio::sync::Notify;
use tokio::sync::{Mutex, Notify};
use tracing::info;

use crate::database::types::DeletionEntry;
use crate::database::Database;
use crate::identity_tree::{Hash, Latest, TreeVersion};

pub struct DeleteIdentities {
database: Arc<Database>,
latest_tree: TreeVersion<Latest>,
deletion_time_interval: i64,
min_deletion_batch_size: usize,
wake_up_notify: Arc<Notify>,
database: Arc<Database>,
latest_tree: TreeVersion<Latest>,
deletion_time_interval: i64,
min_deletion_batch_size: usize,
wake_up_notify: Arc<Notify>,
pending_insertions_mutex: Arc<Mutex<()>>,
}

impl DeleteIdentities {
Expand All @@ -24,13 +25,15 @@ impl DeleteIdentities {
deletion_time_interval: i64,
min_deletion_batch_size: usize,
wake_up_notify: Arc<Notify>,
pending_insertions_mutex: Arc<Mutex<()>>,
) -> Arc<Self> {
Arc::new(Self {
database,
latest_tree,
deletion_time_interval,
min_deletion_batch_size,
wake_up_notify,
pending_insertions_mutex,
})
}

Expand All @@ -41,6 +44,7 @@ impl DeleteIdentities {
self.deletion_time_interval,
self.min_deletion_batch_size,
self.wake_up_notify.clone(),
&self.pending_insertions_mutex,
)
.await
}
Expand All @@ -52,6 +56,7 @@ async fn delete_identities(
deletion_time_interval: i64,
min_deletion_batch_size: usize,
wake_up_notify: Arc<Notify>,
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
info!("Starting deletion processor.");

Expand Down Expand Up @@ -89,6 +94,8 @@ async fn delete_identities(
"Length mismatch when appending identities to tree"
);

let _guard = pending_insertions_mutex.lock().await;

// Insert the new items into pending identities
let items = data.into_iter().zip(leaf_indices);
for ((root, _proof), leaf_index) in items {
Expand Down
25 changes: 19 additions & 6 deletions src/task_monitor/tasks/insert_identities.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;
use tokio::sync::{Mutex, Notify};
use tokio::time::sleep;
use tracing::instrument;

Expand All @@ -10,33 +10,43 @@ use crate::database::Database;
use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};

pub struct InsertIdentities {
database: Arc<Database>,
latest_tree: TreeVersion<Latest>,
wake_up_notify: Arc<Notify>,
database: Arc<Database>,
latest_tree: TreeVersion<Latest>,
wake_up_notify: Arc<Notify>,
pending_insertions_mutex: Arc<Mutex<()>>,
}

impl InsertIdentities {
pub fn new(
database: Arc<Database>,
latest_tree: TreeVersion<Latest>,
wake_up_notify: Arc<Notify>,
pending_insertions_mutex: Arc<Mutex<()>>,
) -> Arc<Self> {
Arc::new(Self {
database,
latest_tree,
wake_up_notify,
pending_insertions_mutex,
})
}

pub async fn run(self: Arc<Self>) -> anyhow::Result<()> {
insert_identities_loop(&self.database, &self.latest_tree, &self.wake_up_notify).await
insert_identities_loop(
&self.database,
&self.latest_tree,
&self.wake_up_notify,
&self.pending_insertions_mutex,
)
.await
}
}

async fn insert_identities_loop(
database: &Database,
latest_tree: &TreeVersion<Latest>,
wake_up_notify: &Notify,
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
loop {
// get commits from database
Expand All @@ -48,7 +58,7 @@ async fn insert_identities_loop(
continue;
}

insert_identities(database, latest_tree, unprocessed).await?;
insert_identities(database, latest_tree, unprocessed, pending_insertions_mutex).await?;
// Notify the identity processing task, that there are new identities
wake_up_notify.notify_one();
}
Expand All @@ -59,6 +69,7 @@ async fn insert_identities(
database: &Database,
latest_tree: &TreeVersion<Latest>,
identities: Vec<UnprocessedCommitment>,
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
// Filter out any identities that are already in the `identities` table
let mut filtered_identities = vec![];
Expand Down Expand Up @@ -96,6 +107,8 @@ async fn insert_identities(

let items = data.into_iter().zip(filtered_identities);

let _guard = pending_insertions_mutex.lock().await;

for ((root, _proof, leaf_index), identity) in items {
database
.insert_pending_identity(leaf_index, &identity, &root)
Expand Down

0 comments on commit f0f07fc

Please sign in to comment.