Skip to content

Commit

Permalink
Merge pull request #831 from worldcoin/piohei/fix_finalizing_roots_of…
Browse files Browse the repository at this point in the history
…fchain

Fix finalization for offchain mode.
  • Loading branch information
piohei authored Dec 2, 2024
2 parents 51d7aa1 + c0ca8d4 commit a174239
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/identity/processor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
Expand Down Expand Up @@ -513,7 +513,7 @@ impl OnChainIdentityProcessor {
}

pub struct OffChainIdentityProcessor {
committed_batches: Arc<Mutex<Vec<BatchEntry>>>,
committed_batches: Arc<Mutex<VecDeque<BatchEntry>>>,
database: Arc<Database>,
}

Expand All @@ -529,19 +529,31 @@ impl IdentityProcessor for OffChainIdentityProcessor {
processed_tree: &TreeVersion<Intermediate>,
mined_tree: &TreeVersion<Canonical>,
) -> anyhow::Result<()> {
let batches = {
let mut committed_batches = self.committed_batches.lock().unwrap();
let copied = committed_batches.clone();
committed_batches.clear();
copied
};

for batch in batches.iter() {
loop {
let batch = {
let mut committed_batches = self.committed_batches.lock().unwrap();
committed_batches.pop_front()
};

let Some(batch) = batch else {
return Ok(());
};

let mut tx = self
.database
.begin_tx(IsolationLevel::ReadCommitted)
.await?;

let root_state = tx.get_root_state(&batch.next_root).await?;
if root_state.is_none() {
// If root is not in identities table we can't mark it as processed or mined.
// It happens sometimes as we do not have atomic operation for database and tree
// insertion.
// TODO: check if this is still possible after HA being done
tx.commit().await?;
return Ok(());
}

tx.mark_root_as_processed(&batch.next_root).await?;
tx.mark_root_as_mined(&batch.next_root).await?;

Expand All @@ -550,8 +562,6 @@ impl IdentityProcessor for OffChainIdentityProcessor {
processed_tree.apply_updates_up_to(batch.next_root);
mined_tree.apply_updates_up_to(batch.next_root);
}

Ok(())
}

async fn await_clean_slate(&self) -> anyhow::Result<()> {
Expand Down Expand Up @@ -609,6 +619,6 @@ impl OffChainIdentityProcessor {
fn add_batch(&self, batch_entry: BatchEntry) {
let mut committed_batches = self.committed_batches.lock().unwrap();

committed_batches.push(batch_entry);
committed_batches.push_back(batch_entry);
}
}

0 comments on commit a174239

Please sign in to comment.