From 47d8ebeac91ab08a1ec22a420b827fa9164226f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Tr=C4=85d?= Date: Tue, 7 Nov 2023 14:38:17 +0100 Subject: [PATCH] Empty root exception & purge cache if necessary. (#645) * Add db method * Purge cache if necessary * Change to processed tree --- src/app.rs | 36 +++++++++++++++++++++++++++++----- src/database/mod.rs | 48 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/src/app.rs b/src/app.rs index 775ee4ac..37e49fa8 100644 --- a/src/app.rs +++ b/src/app.rs @@ -203,10 +203,14 @@ impl App { // Note that we don't have a way of queuing a root here for finalization. // so it's going to stay as "processed" until the next root is mined. database.mark_root_as_processed(&root_hash).await?; + } else { + // Db is either empty or we're restarting with a new contract/chain + // so we should mark everything as pending + database.mark_all_as_pending().await?; } let timer = Instant::now(); - let tree_state = Self::restore_or_initialize_tree( + let mut tree_state = Self::restore_or_initialize_tree( &database, // Poseidon tree depth is one more than the contract's tree depth identity_manager.tree_depth(), @@ -214,12 +218,34 @@ impl App { options.tree_gc_threshold, identity_manager.initial_leaf_value(), initial_root_hash, - options.dense_tree_mmap_file, + &options.dense_tree_mmap_file, options.force_cache_purge, ) .await?; info!("Tree state initialization took: {:?}", timer.elapsed()); + let tree_root = tree_state.get_processed_tree().get_root(); + + if tree_root != root_hash { + warn!( + "Cached tree root is different from the contract root. Purging cache and \ + reinitializing." + ); + + tree_state = Self::restore_or_initialize_tree( + &database, + // Poseidon tree depth is one more than the contract's tree depth + identity_manager.tree_depth(), + options.dense_tree_prefix_depth, + options.tree_gc_threshold, + identity_manager.initial_leaf_value(), + initial_root_hash, + &options.dense_tree_mmap_file, + true, + ) + .await?; + } + let identity_committer = Arc::new(TaskMonitor::new( database.clone(), identity_manager.clone(), @@ -257,7 +283,7 @@ impl App { gc_threshold: usize, initial_leaf_value: Hash, initial_root_hash: Hash, - mmap_file_path: String, + mmap_file_path: &str, force_cache_purge: bool, ) -> AnyhowResult { let mut mined_items = database @@ -408,7 +434,7 @@ impl App { gc_threshold: usize, initial_leaf_value: Hash, mined_items: Vec, - mmap_file_path: String, + mmap_file_path: &str, ) -> AnyhowResult { // Flatten the updates for initial leaves let mined_items = dedup_tree_updates(mined_items); @@ -432,7 +458,7 @@ impl App { gc_threshold, initial_leaf_value, &initial_leaves, - &mmap_file_path, + mmap_file_path, ); let (mined, mut processed_builder) = mined_builder.seal(); diff --git a/src/database/mod.rs b/src/database/mod.rs index 98a27748..d5835e04 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -228,6 +228,25 @@ impl Database { Ok(()) } + /// Marks all the identities in the db as + #[instrument(skip(self), level = "debug")] + pub async fn mark_all_as_pending(&self) -> Result<(), Error> { + let pending_status = ProcessedStatus::Pending; + + let update_all_identities = sqlx::query( + r#" + UPDATE identities + SET status = $1, mined_at = NULL + WHERE status <> $1 + "#, + ) + .bind(<&str>::from(pending_status)); + + self.pool.execute(update_all_identities).await?; + + Ok(()) + } + /// Marks the identities and roots from before a given root hash as /// finalized #[instrument(skip(self), level = "debug")] @@ -1269,6 +1288,35 @@ mod test { Ok(()) } + #[tokio::test] + async fn mark_all_as_pending_marks_all() -> anyhow::Result<()> { + let (db, _db_container) = setup_db().await?; + + let identities = mock_identities(5); + let roots = mock_roots(5); + + for i in 0..5 { + db.insert_pending_identity(i, &identities[i], &roots[i]) + .await + .context("Inserting identity")?; + } + + db.mark_root_as_processed(&roots[2]).await?; + + db.mark_all_as_pending().await?; + + for root in roots.iter() { + let root = db + .get_root_state(root) + .await? + .context("Fetching root state")?; + + assert_eq!(root.status, ProcessedStatus::Pending); + } + + Ok(()) + } + #[tokio::test] async fn mark_root_as_processed_marks_previous_roots() -> anyhow::Result<()> { let (db, _db_container) = setup_db().await?;