Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Oct 18, 2024
1 parent 88f8a48 commit 28b34c3
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::identity::processor::{
};
use crate::identity::validator::IdentityValidator;
use crate::identity_tree::initializer::TreeInitializer;
use crate::identity_tree::{Hash, InclusionProof, RootItem, TreeState, TreeVersionReadOps};
use crate::identity_tree::{Hash, InclusionProof, RootItem, TreeState, TreeVersionOps};
use crate::prover::map::initialize_prover_maps;
use crate::prover::repository::ProverRepository;
use crate::prover::{ProverConfig, ProverType};
Expand Down
18 changes: 4 additions & 14 deletions src/database/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,12 +689,12 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
async fn get_eligible_unprocessed_commitments(
self,
status: UnprocessedStatus,
) -> Result<Vec<types::UnprocessedCommitment>, Error> {
) -> Result<Vec<Hash>, Error> {
let mut conn = self.acquire().await?;

let result = sqlx::query(
let result: Vec<(Hash,)> = sqlx::query_as(
r#"
SELECT * FROM unprocessed_identities
SELECT commitment FROM unprocessed_identities
WHERE status = $1 AND CURRENT_TIMESTAMP > eligibility
LIMIT $2
"#,
Expand All @@ -704,17 +704,7 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
.fetch_all(&mut *conn)
.await?;

Ok(result
.into_iter()
.map(|row| types::UnprocessedCommitment {
commitment: row.get::<Hash, _>(0),
status,
created_at: row.get::<_, _>(2),
processed_at: row.get::<_, _>(3),
error_message: row.get::<_, _>(4),
eligibility_timestamp: row.get::<_, _>(5),
})
.collect::<Vec<_>>())
Ok(result.into_iter().map(|(commitment,)| commitment).collect())
}

/// Returns the error message from the unprocessed identities table
Expand Down
13 changes: 2 additions & 11 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,12 +519,7 @@ mod test {
.await?;

assert_eq!(unprocessed_commitments.len(), 1);
assert_eq!(unprocessed_commitments[0].commitment, commitment_0);
assert!(
unprocessed_commitments[0].eligibility_timestamp.timestamp()
- eligibility_timestamp_0.timestamp()
<= 1
);
assert_eq!(unprocessed_commitments[0], commitment_0);

Ok(())
}
Expand Down Expand Up @@ -554,11 +549,7 @@ mod test {

// Assert unprocessed commitments against expected values
assert_eq!(unprocessed_commitments.len(), 1);
assert_eq!(unprocessed_commitments[0].commitment, commitment_0);
assert_eq!(
unprocessed_commitments[0].eligibility_timestamp.timestamp(),
eligibility_timestamp_0.timestamp()
);
assert_eq!(unprocessed_commitments[0], commitment_0);

Ok(())
}
Expand Down
11 changes: 1 addition & 10 deletions src/database/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,9 @@ use sqlx::error::BoxDynError;
use sqlx::prelude::FromRow;
use sqlx::{Database, Decode, Encode, Postgres, Type};

use crate::identity_tree::{Hash, UnprocessedStatus};
use crate::identity_tree::Hash;
use crate::prover::identity::Identity;

pub struct UnprocessedCommitment {
pub commitment: Hash,
pub status: UnprocessedStatus,
pub created_at: DateTime<Utc>,
pub processed_at: Option<DateTime<Utc>>,
pub error_message: Option<String>,
pub eligibility_timestamp: DateTime<Utc>,
}

#[derive(FromRow)]
pub struct RecoveryEntry {
// existing commitment is used in tests only, but recoveries in general
Expand Down
2 changes: 1 addition & 1 deletion src/identity_tree/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::database::methods::DbMethods;
use crate::database::Database;
use crate::identity::processor::IdentityProcessor;
use crate::identity_tree::{
CanonicalTreeBuilder, Hash, ProcessedStatus, TreeState, TreeUpdate, TreeVersionReadOps,
CanonicalTreeBuilder, Hash, ProcessedStatus, TreeState, TreeUpdate, TreeVersionOps,
TreeWithNextVersion,
};
use crate::utils::tree_updates::dedup_tree_updates;
Expand Down
12 changes: 9 additions & 3 deletions src/identity_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,9 @@ impl<V: Version<TreeVersion = lazy_merkle_tree::Derived>> TreeVersion<V> {
/// The public-facing API for reading from a tree version. It is implemented for
/// all versions. This being a trait allows us to hide some of the
/// implementation details.
pub trait TreeVersionReadOps {
pub trait TreeVersionOps {
fn update(&self, leaf_index: usize, element: Hash);

/// Returns the current tree root.
fn get_root(&self) -> Hash;
/// Returns the next free leaf.
Expand All @@ -411,10 +413,14 @@ pub trait TreeVersionReadOps {
fn get_leaf(&self, leaf: usize) -> Hash;
}

impl<V: Version> TreeVersionReadOps for TreeVersion<V>
impl<V: Version> TreeVersionOps for TreeVersion<V>
where
TreeVersionData<V::TreeVersion>: BasicTreeOps,
{
fn update(&self, leaf_index: usize, element: Hash) {
self.get_data().update(leaf_index, element);
}

fn get_root(&self) -> Hash {
self.get_data().get_root()
}
Expand Down Expand Up @@ -444,7 +450,7 @@ where
}

impl<V: Version> TreeVersion<V> {
pub fn get_data(&self) -> MutexGuard<TreeVersionData<V::TreeVersion>> {
fn get_data(&self) -> MutexGuard<TreeVersionData<V::TreeVersion>> {
self.0.lock().expect("no lock poisoning")
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/task_monitor/tasks/create_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::database;
use crate::database::methods::DbMethods as _;
use crate::database::Database;
use crate::identity_tree::{
AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion,
AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionOps, TreeWithNextVersion,
};
use crate::prover::identity::Identity;
use crate::prover::repository::ProverRepository;
Expand Down
2 changes: 1 addition & 1 deletion src/task_monitor/tasks/delete_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::info;
use crate::app::App;
use crate::database::methods::DbMethods;
use crate::database::types::DeletionEntry;
use crate::identity_tree::{Hash, TreeVersionReadOps};
use crate::identity_tree::{Hash, TreeVersionOps};

// Deletion here differs from insert_identites task. This is because two
// different flows are created for both tasks. Due to how our prover works
Expand Down
25 changes: 12 additions & 13 deletions src/task_monitor/tasks/insert_identities.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::sync::Arc;
use std::time::Duration;

use tokio::{pin, sync::{Mutex, Notify}};
use tokio::sync::{Mutex, Notify};
use tokio::time;
use tracing::info;

use crate::app::App;
use crate::database::methods::DbMethods as _;
use crate::database::IsolationLevel;
use crate::identity_tree::{BasicTreeOps, UnprocessedStatus};
use crate::identity_tree::{TreeVersionOps, UnprocessedStatus};

// Insertion here differs from delete_identities task. This is because two
// different flows are created for both tasks. We need to insert identities as
Expand Down Expand Up @@ -42,13 +42,9 @@ pub async fn insert_identities(

let mut tx = app.database.begin_tx(IsolationLevel::ReadCommitted).await?;

// Locks the tree for the duration of the operation
let mut tree = latest_tree.get_data();
pin!(tree);
let next_leaf = latest_tree.next_leaf();
let mut pre_root = latest_tree.get_root();

let mut pre_root = tree.root();

let next_leaf = tree.next_leaf();
let next_db_index = tx.get_next_leaf_index().await?;
assert_eq!(
next_leaf, next_db_index,
Expand All @@ -57,18 +53,21 @@ pub async fn insert_identities(

for (idx, identity) in unprocessed.iter().enumerate() {
let leaf_idx = next_leaf + idx;
tree.update(leaf_idx, identity.commitment);
let root = tree.root();
latest_tree.update(leaf_idx, *identity);
let root = latest_tree.get_root();

tx.insert_pending_identity(leaf_idx, &identity.commitment, &root, &pre_root)
.await?;
tx.insert_pending_identity(leaf_idx, identity, &root, &pre_root)
.await
.expect("Failed to insert identity - tree will be out of sync");

pre_root = root;
}

tx.trim_unprocessed().await?;

tx.commit().await?;
tx.commit()
.await
.expect("Committing insert failed - tree will be out of sync");

// Notify the identity processing task, that there are new identities
wake_up_notify.notify_one();
Expand Down
4 changes: 2 additions & 2 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub mod prelude {
AppConfig, Config, DatabaseConfig, OzDefenderConfig, ProvidersConfig, RelayerConfig,
ServerConfig, TreeConfig, TxSitterConfig,
};
pub use signup_sequencer::identity_tree::{Hash, TreeVersionReadOps};
pub use signup_sequencer::identity_tree::{Hash, TreeVersionOps};
pub use signup_sequencer::prover::ProverType;
pub use signup_sequencer::server;
pub use signup_sequencer::shutdown::Shutdown;
Expand Down Expand Up @@ -76,7 +76,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use reqwest::{Body, Client, Method, Request, RequestBuilder, StatusCode};
use semaphore::poseidon_tree::Proof;
use signup_sequencer::identity_tree::{InclusionProof, TreeState, TreeVersionReadOps};
use signup_sequencer::identity_tree::{InclusionProof, TreeState, TreeVersionOps};
use signup_sequencer::server::data::{
AddBatchSizeRequest, DeletionRequest, InclusionProofRequest, InclusionProofResponse,
InsertCommitmentRequest, RecoveryRequest, RemoveBatchSizeRequest, VerifySemaphoreProofRequest,
Expand Down

0 comments on commit 28b34c3

Please sign in to comment.