
Commit 8409c36

chop chop
Dzejkop committed Oct 21, 2024
1 parent c5cab66 commit 8409c36
Showing 11 changed files with 15 additions and 241 deletions.
16 changes: 1 addition & 15 deletions src/app.rs
@@ -164,11 +164,7 @@ impl App {
             return Err(ServerError::DuplicateCommitment);
         }
 
-<<<<<<< HEAD
-        tx.insert_new_identity(commitment, Utc::now()).await?;
-=======
-        tx.insert_unprocessed_identity(commitment, Utc::now()).await?;
->>>>>>> 428daf1 (WIP)
+        tx.insert_unprocessed_identity(commitment).await?;
 
         tx.commit().await?;
 
@@ -315,16 +311,6 @@ impl App {
             return Err(ServerError::InvalidCommitment);
         }
 
-        if let Some(error_message) = self.database.get_unprocessed_error(commitment).await? {
-            return Ok(InclusionProof {
-                root: None,
-                proof: None,
-                message: error_message
-                    .or_else(|| Some("identity exists but has not yet been processed".to_string())),
-            }
-            .into());
-        }
-
         let item = self
             .database
             .get_identity_leaf_index(commitment)
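Note: with the early return above gone, a known-but-unprocessed commitment now falls through to the leaf-index lookup. For reference, a self-contained sketch of the stub payload the deleted branch used to return — `InclusionProofSketch` is a hypothetical stand-in, not the crate's actual `InclusionProof` type, and the serialization details are assumed:

```rust
use serde::Serialize;

// Hypothetical stand-in for the crate's `InclusionProof`; field names are
// taken from the removed struct literal, serialization is assumed.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct InclusionProofSketch {
    root: Option<String>,
    proof: Option<String>,
    message: Option<String>,
}

fn main() {
    // The response shape the deleted branch produced for a commitment that
    // existed but was not yet in the tree. After this commit, such requests
    // go straight to the leaf-index lookup instead.
    let removed_response = InclusionProofSketch {
        root: None,
        proof: None,
        message: Some("identity exists but has not yet been processed".to_string()),
    };
    println!("{}", serde_json::to_string_pretty(&removed_response).unwrap());
}
```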
12 changes: 0 additions & 12 deletions src/config.rs
@@ -69,18 +69,6 @@ pub struct AppConfig {
     #[serde(default = "default::min_batch_deletion_size")]
     pub min_batch_deletion_size: usize,
 
-    /// The parameter to control the delay between mining a deletion batch and
-    /// inserting the recovery identities
-    ///
-    /// The sequencer will insert the recovery identities after
-    /// (max_epoch_duration_seconds + root_history_expiry) seconds have passed
-    ///
-    /// By default the value is set to 0, so the sequencer will only use
-    /// root_history_expiry
-    #[serde(with = "humantime_serde")]
-    #[serde(default = "default::max_epoch_duration")]
-    pub max_epoch_duration: Duration,
-
     /// The maximum number of windows to scan for finalization logs
     #[serde(default = "default::scanning_window_size")]
     pub scanning_window_size: u64,
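For context on what this removes: a minimal, runnable sketch of how the dropped knob was parsed and how it fed the recovery-eligibility delay (see the removed `update_eligible_recoveries` further down). The struct name, the JSON input, and the 3600-second expiry are illustrative assumptions, not values from the repository:

```rust
use serde::Deserialize;
use std::time::Duration;

// Hypothetical reduction of `AppConfig`; `humantime_serde` accepts values
// like "5m" or "300s". We fall back to a zero duration here, whereas the
// real crate used `default::max_epoch_duration`.
#[derive(Deserialize)]
struct AppConfigSketch {
    #[serde(with = "humantime_serde", default)]
    max_epoch_duration: Duration,
}

fn main() -> anyhow::Result<()> {
    let cfg: AppConfigSketch = serde_json::from_str(r#"{ "max_epoch_duration": "5m" }"#)?;

    // The removed processor code combined this value with the on-chain root
    // history expiry to delay recovery re-insertions; 3600s is an example.
    let root_history_expiry = chrono::Duration::seconds(3600);
    let delay = root_history_expiry + chrono::Duration::from_std(cfg.max_epoch_duration)?;
    let eligibility_timestamp = chrono::Utc::now() + delay;
    println!("recovery would have become eligible at {eligibility_timestamp}");
    Ok(())
}
```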
4 changes: 2 additions & 2 deletions src/database/methods.rs
@@ -5,13 +5,13 @@ use chrono::{DateTime, Utc};
 use ruint::aliases::U256;
 use sqlx::{Acquire, Executor, Postgres, Row};
 use tracing::instrument;
-use types::{DeletionEntry, RecoveryEntry};
+use types::DeletionEntry;
 
 use super::types::{LatestDeletionEntry, LatestInsertionEntry};
 use crate::database::types::{BatchEntry, BatchEntryData, BatchType};
 use crate::database::{types, Error};
 use crate::identity_tree::{
-    Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus,
+    Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate,
 };
 use crate::prover::identity::Identity;
 use crate::prover::{ProverConfig, ProverType};
44 changes: 6 additions & 38 deletions src/database/mod.rs
@@ -194,7 +194,7 @@ mod test {
     use crate::config::DatabaseConfig;
     use crate::database::methods::DbMethods;
     use crate::database::types::BatchType;
-    use crate::identity_tree::{Hash, ProcessedStatus, UnprocessedStatus};
+    use crate::identity_tree::{Hash, ProcessedStatus};
     use crate::prover::identity::Identity;
    use crate::prover::{ProverConfig, ProverType};
     use crate::utils::secret::SecretUrl;
@@ -302,8 +302,6 @@
         let identities = mock_identities(10);
         let roots = mock_roots(11);
 
-        let eligibility_timestamp = Utc::now();
-
         for identity in &identities {
             db.insert_unprocessed_identity(*identity).await?;
         }
Expand Down Expand Up @@ -466,52 +464,25 @@ mod test {
Ok(())
}

#[tokio::test]
async fn get_eligible_unprocessed_commitments() -> anyhow::Result<()> {
let docker = Cli::default();
let (db, _db_container) = setup_db(&docker).await?;
let commitment_0: Uint<256, 4> = Uint::from(1);
let eligibility_timestamp_0 = Utc::now();

db.insert_unprocessed_identity(commitment_0).await?;

let commitment_1: Uint<256, 4> = Uint::from(2);
let eligibility_timestamp_1 = Utc::now()
.checked_add_days(Days::new(7))
.expect("Could not create eligibility timestamp");

db.insert_unprocessed_identity(commitment_1).await?;

let unprocessed_commitments = db.get_unprocessed_commitments().await?;

assert_eq!(unprocessed_commitments.len(), 1);
assert_eq!(unprocessed_commitments[0], commitment_0);

Ok(())
}

#[tokio::test]
async fn get_unprocessed_commitments() -> anyhow::Result<()> {
let docker = Cli::default();
let (db, _db_container) = setup_db(&docker).await?;

// Insert new identity with a valid eligibility timestamp
// Insert new identity
let commitment_0: Uint<256, 4> = Uint::from(1);
let eligibility_timestamp_0 = Utc::now();
db.insert_unprocessed_identity(commitment_0).await?;

// Insert new identity with eligibility timestamp in the future
// Insert new identity
let commitment_1: Uint<256, 4> = Uint::from(2);
let eligibility_timestamp_1 = Utc::now()
.checked_add_days(Days::new(7))
.expect("Could not create eligibility timestamp");
db.insert_unprocessed_identity(commitment_1).await?;

let unprocessed_commitments = db.get_unprocessed_commitments().await?;

// Assert unprocessed commitments against expected values
assert_eq!(unprocessed_commitments.len(), 1);
assert_eq!(unprocessed_commitments.len(), 2);
assert_eq!(unprocessed_commitments[0], commitment_0);
assert_eq!(unprocessed_commitments[1], commitment_1);

Ok(())
}
@@ -1027,10 +998,7 @@ mod test {
         // When there's no identity
         assert!(!db.identity_exists(identities[0]).await?);
 
-        // When there's only unprocessed identity
-        let eligibility_timestamp = Utc::now();
-
-        db.insert_unprocessed_identity(identities[0], eligibility_timestamp)
+        db.insert_unprocessed_identity(identities[0])
             .await
             .context("Inserting new identity")?;
         assert!(db.identity_exists(identities[0]).await?);
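The updated call sites in these tests imply that the insert method dropped its eligibility-timestamp parameter. A before/after sketch — the trait names and stub types below are hypothetical, not the crate's actual `DbMethods` definition:

```rust
#![allow(dead_code, async_fn_in_trait)] // async fn in traits needs Rust 1.75+

use chrono::{DateTime, Utc};

type Hash = [u8; 32]; // stand-in for the crate's Hash type
struct Error; // stand-in for the crate's database error type

// Before this commit: callers had to supply an eligibility timestamp.
trait DbMethodsBefore {
    async fn insert_unprocessed_identity(
        &mut self,
        commitment: Hash,
        eligibility: DateTime<Utc>,
    ) -> Result<(), Error>;
}

// After this commit: only the commitment is passed.
trait DbMethodsAfter {
    async fn insert_unprocessed_identity(&mut self, commitment: Hash) -> Result<(), Error>;
}

fn main() {}
```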
10 changes: 0 additions & 10 deletions src/database/types.rs
@@ -8,16 +8,6 @@ use sqlx::{Database, Decode, Encode, Postgres, Type};
 use crate::identity_tree::Hash;
 use crate::prover::identity::Identity;
 
-#[derive(FromRow)]
-pub struct RecoveryEntry {
-    // existing commitment is used in tests only, but recoveries in general
-    // are used in production code via the FromRow trait
-    // so removing this field would break the production code
-    #[allow(unused)]
-    pub existing_commitment: Hash,
-    pub new_commitment: Hash,
-}
-
 pub struct LatestInsertionEntry {
     pub timestamp: DateTime<Utc>,
 }
93 changes: 0 additions & 93 deletions src/identity/processor.rs
@@ -97,7 +97,6 @@ impl IdentityProcessor for OnChainIdentityProcessor {
         self.finalize_mainnet_roots(
             processed_tree,
             &mainnet_logs,
-            self.config.app.max_epoch_duration,
         )
         .await?;
 
@@ -407,7 +406,6 @@ impl OnChainIdentityProcessor {
         &self,
         processed_tree: &TreeVersion<Canonical>,
         logs: &[Log],
-        max_epoch_duration: Duration,
     ) -> Result<(), anyhow::Error> {
         for log in logs {
             let Some(event) = Self::raw_log_to_tree_changed(log) else {
@@ -434,14 +432,6 @@
 
             info!(?pre_root, ?post_root, ?kind, "Batch mined");
 
-            if kind == TreeChangeKind::Deletion {
-                // NOTE: We must do this before updating the tree
-                // because we fetch commitments from the processed tree
-                // before they are deleted
-                self.update_eligible_recoveries(log, max_epoch_duration)
-                    .await?;
-            }
-
             let updates_count = processed_tree.apply_updates_up_to(post_root.into());
 
             info!(updates_count, ?pre_root, ?post_root, "Mined tree updated");
@@ -509,59 +499,6 @@
 
         roots
     }
-
-    async fn update_eligible_recoveries(
-        &self,
-        log: &Log,
-        max_epoch_duration: Duration,
-    ) -> anyhow::Result<()> {
-        let tx_hash = log.transaction_hash.context("Missing tx hash")?;
-        let commitments = self
-            .identity_manager
-            .fetch_deletion_indices_from_tx(tx_hash)
-            .await
-            .context("Could not fetch deletion indices from tx")?;
-
-        let commitments = self
-            .database
-            .get_non_zero_commitments_by_leaf_indexes(commitments.iter().copied())
-            .await?;
-        let commitments: Vec<U256> = commitments.into_iter().map(Into::into).collect();
-
-        // Fetch the root history expiry time on chain
-        let root_history_expiry = self.identity_manager.root_history_expiry().await?;
-
-        // Use the root history expiry to calculate the eligibility timestamp for the
-        // new insertion
-        let root_history_expiry_duration =
-            chrono::Duration::seconds(root_history_expiry.as_u64() as i64);
-        let max_epoch_duration = chrono::Duration::from_std(max_epoch_duration)?;
-
-        let delay = root_history_expiry_duration + max_epoch_duration;
-
-        let eligibility_timestamp = Utc::now() + delay;
-
-        let mut tx = self
-            .database
-            .begin_tx(IsolationLevel::ReadCommitted)
-            .await?;
-
-        // Check if any deleted commitments correspond with entries in the
-        // recoveries table and insert the new commitment into the unprocessed
-        // identities table with the proper eligibility timestamp
-        let deleted_recoveries = tx.delete_recoveries(commitments).await?;
-
-        // For each deletion, if there is a corresponding recovery, insert a new
-        // identity with the specified eligibility timestamp
-        for recovery in deleted_recoveries {
-            tx.insert_unprocessed_identity(recovery.new_commitment, eligibility_timestamp)
-                .await?;
-        }
-
-        tx.commit().await?;
-
-        Ok(())
-    }
 }
 
 pub struct OffChainIdentityProcessor {
@@ -588,10 +525,6 @@ impl IdentityProcessor for OffChainIdentityProcessor {
         };
 
         for batch in batches.iter() {
-            if batch.batch_type == BatchType::Deletion {
-                self.update_eligible_recoveries(batch).await?;
-            }
-
             let mut tx = self
                 .database
                 .begin_tx(IsolationLevel::ReadCommitted)
@@ -638,30 +571,4 @@
 
         committed_batches.push(batch_entry);
     }
-
-    async fn update_eligible_recoveries(&self, batch: &BatchEntry) -> anyhow::Result<()> {
-        let commitments: Vec<U256> = batch.data.identities.iter().map(|v| v.commitment).collect();
-        let eligibility_timestamp = Utc::now();
-
-        let mut tx = self
-            .database
-            .begin_tx(IsolationLevel::ReadCommitted)
-            .await?;
-
-        // Check if any deleted commitments correspond with entries in the
-        // recoveries table and insert the new commitment into the unprocessed
-        // identities table with the proper eligibility timestamp
-        let deleted_recoveries = tx.delete_recoveries(commitments).await?;
-
-        // For each deletion, if there is a corresponding recovery, insert a new
-        // identity with the specified eligibility timestamp
-        for recovery in deleted_recoveries {
-            tx.insert_unprocessed_identity(recovery.new_commitment, eligibility_timestamp)
-                .await?;
-        }
-
-        tx.commit().await?;
-
-        Ok(())
-    }
 }
2 changes: 1 addition & 1 deletion src/identity_tree/mod.rs
@@ -16,7 +16,7 @@ mod status;
 pub type PoseidonTree<Version> = LazyMerkleTree<PoseidonHash, Version>;
 pub type Hash = <PoseidonHash as Hasher>::Hash;
 
-pub use self::status::{ProcessedStatus, Status, UnknownStatus, UnprocessedStatus};
+pub use self::status::{ProcessedStatus, Status, UnknownStatus};
 
 #[derive(Clone, Eq, PartialEq, Hash, Debug, FromRow)]
 pub struct TreeUpdate {
43 changes: 2 additions & 41 deletions src/identity_tree/status.rs
@@ -27,22 +27,12 @@ pub enum ProcessedStatus {
     Mined,
 }
 
-/// Status of identity commitments which have not yet been included in the tree
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
-#[serde(rename_all = "camelCase")]
-pub enum UnprocessedStatus {
-    /// Root is unprocessed - i.e. not included in sequencer's
-    /// in-memory tree.
-    New,
-}
-
 /// A status type visible on the API level - contains both the processed and
 /// unprocessed statuses
 #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
 #[serde(rename_all = "camelCase")]
 #[serde(untagged)]
-pub enum Status {
-    Unprocessed(UnprocessedStatus),
+pub enum Status { // TODO: Remove
     Processed(ProcessedStatus),
 }
 
@@ -85,41 +75,14 @@ impl FromStr for Status {
     type Err = UnknownStatus;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        if let Ok(s) = UnprocessedStatus::from_str(s) {
-            Ok(Self::Unprocessed(s))
-        } else if let Ok(s) = ProcessedStatus::from_str(s) {
+        if let Ok(s) = ProcessedStatus::from_str(s) {
             Ok(Self::Processed(s))
         } else {
             Err(UnknownStatus)
         }
     }
 }
 
-impl FromStr for UnprocessedStatus {
-    type Err = UnknownStatus;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "new" => Ok(Self::New),
-            _ => Err(UnknownStatus),
-        }
-    }
-}
-
-impl From<UnprocessedStatus> for &str {
-    fn from(scope: UnprocessedStatus) -> Self {
-        match scope {
-            UnprocessedStatus::New => "new",
-        }
-    }
-}
-
-impl From<UnprocessedStatus> for Status {
-    fn from(status: UnprocessedStatus) -> Self {
-        Self::Unprocessed(status)
-    }
-}
-
 impl From<ProcessedStatus> for Status {
     fn from(status: ProcessedStatus) -> Self {
         Self::Processed(status)
@@ -134,7 +97,6 @@ mod tests {
 
     #[test_case(Status::Processed(ProcessedStatus::Pending) => "pending")]
     #[test_case(Status::Processed(ProcessedStatus::Mined) => "mined")]
-    #[test_case(Status::Unprocessed(UnprocessedStatus::New) => "new")]
     fn serialize_status(api_status: Status) -> &'static str {
         let s = serde_json::to_string(&api_status).unwrap();
 
@@ -146,7 +108,6 @@ mod tests {
 
     #[test_case("pending" => Status::Processed(ProcessedStatus::Pending))]
     #[test_case("mined" => Status::Processed(ProcessedStatus::Mined))]
-    #[test_case("new" => Status::Unprocessed(UnprocessedStatus::New))]
     fn deserialize_status(s: &str) -> Status {
         // Wrapped because JSON expected `"something"` and not `something`
         let wrapped = format!("\"{s}\"");
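With `UnprocessedStatus` gone, `Status` keeps its `#[serde(untagged)]` representation, so the remaining variants still serialize as bare strings. A self-contained reproduction of that behavior (variant list abbreviated to the two the tests exercise):

```rust
use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
enum ProcessedStatus {
    Pending,
    Mined,
}

// `untagged` makes the enum wrapper invisible on the wire: values round-trip
// as "pending" / "mined", exactly as the tests above expect.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
enum Status {
    Processed(ProcessedStatus),
}

fn main() {
    let s = serde_json::to_string(&Status::Processed(ProcessedStatus::Mined)).unwrap();
    assert_eq!(s, "\"mined\"");

    let back: Status = serde_json::from_str("\"pending\"").unwrap();
    assert_eq!(back, Status::Processed(ProcessedStatus::Pending));
}
```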
