diff --git a/src/identity/processor.rs b/src/identity/processor.rs index 55123429..9f5a8cca 100644 --- a/src/identity/processor.rs +++ b/src/identity/processor.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use anyhow::anyhow; @@ -513,7 +513,7 @@ impl OnChainIdentityProcessor { } pub struct OffChainIdentityProcessor { - committed_batches: Arc>>, + committed_batches: Arc>>, database: Arc, } @@ -529,19 +529,31 @@ impl IdentityProcessor for OffChainIdentityProcessor { processed_tree: &TreeVersion, mined_tree: &TreeVersion, ) -> anyhow::Result<()> { - let batches = { - let mut committed_batches = self.committed_batches.lock().unwrap(); - let copied = committed_batches.clone(); - committed_batches.clear(); - copied - }; - - for batch in batches.iter() { + loop { + let batch = { + let mut committed_batches = self.committed_batches.lock().unwrap(); + committed_batches.pop_front() + }; + + let Some(batch) = batch else { + return Ok(()); + }; + let mut tx = self .database .begin_tx(IsolationLevel::ReadCommitted) .await?; + let root_state = tx.get_root_state(&batch.next_root).await?; + if root_state.is_none() { + // If root is not in identities table we can't mark it as processed or mined. + // It happens sometimes as we do not have atomic operation for database and tree + // insertion. + // TODO: check if this is still possible after HA being done + tx.commit().await?; + return Ok(()); + } + tx.mark_root_as_processed(&batch.next_root).await?; tx.mark_root_as_mined(&batch.next_root).await?; @@ -550,8 +562,6 @@ impl IdentityProcessor for OffChainIdentityProcessor { processed_tree.apply_updates_up_to(batch.next_root); mined_tree.apply_updates_up_to(batch.next_root); } - - Ok(()) } async fn await_clean_slate(&self) -> anyhow::Result<()> { @@ -609,6 +619,6 @@ impl OffChainIdentityProcessor { fn add_batch(&self, batch_entry: BatchEntry) { let mut committed_batches = self.committed_batches.lock().unwrap(); - committed_batches.push(batch_entry); + committed_batches.push_back(batch_entry); } }