Skip to content

Commit

Permalink
Refactor: Config & simplified tasks (#674)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop authored Jan 22, 2024
1 parent 9a78643 commit 904af0a
Show file tree
Hide file tree
Showing 49 changed files with 1,956 additions and 2,565 deletions.
288 changes: 284 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "signup-sequencer"
version = "1.0.1"
version = "2.0.0"
authors = [
"Remco Bloemen <remco@worldcoin.org>",
"Lucas Ege <lucas@worldcoin.org>",
Expand Down Expand Up @@ -38,13 +38,18 @@ cli-batteries = { git = "https://github.com/recmo/cli-batteries", rev = "fc1186d
"otlp",
"datadog",
] }
config = "0.13.4"
ethers = { version = "2.0.10", features = ["ws", "ipc", "openssl", "abigen"] }
ethers-solc = "2.0.10"
eyre = "0.6"
futures = "0.3"
futures-util = { version = "^0.3" }
hex = "0.4.3"
hex-literal = "0.4.1"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = { version = "^0.14.17", features = ["server", "tcp", "http1", "http2"] }
indoc = "2.0.4"
once_cell = "1.8"
oz-api = { path = "crates/oz-api" }
prometheus = "0.13.3" # We need upstream PR#465 to fix #272.
Expand Down Expand Up @@ -74,6 +79,7 @@ tokio = { version = "1.17", features = [
"tracing",
"test-util",
] }
toml = "0.8.8"
tracing = "0.1"
tracing-futures = "0.2"
tx-sitter-client = { path = "crates/tx-sitter-client" }
Expand All @@ -85,14 +91,14 @@ cli-batteries = { git = "https://github.com/recmo/cli-batteries", rev = "fc1186d
"mock-shutdown",
] }
hex = "0.4.3"
hex-literal = "0.4.1"
maplit = "1.0.2"
micro-oz = { path = "crates/micro-oz" }
postgres-docker-utils = { path = "crates/postgres-docker-utils" }
regex = { version = "1.7.1", features = ["std"] }
semaphore = { git = "https://github.com/worldcoin/semaphore-rs", branch = "main", features = [
"depth_20",
] }
similar-asserts = "1.5.0"
test-case = "3.0"
tracing-subscriber = "0.3.11"
tracing-test = "0.2"
Expand Down
144 changes: 47 additions & 97 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,105 +3,61 @@ use std::sync::Arc;
use std::time::Instant;

use chrono::{Duration, Utc};
use clap::Parser;
use ruint::Uint;
use semaphore::poseidon_tree::LazyPoseidonTree;
use semaphore::protocol::verify_proof;
use tracing::{info, instrument, warn};

use crate::config::Config;
use crate::contracts::{IdentityManager, SharedIdentityManager};
use crate::database::{self, Database};
use crate::ethereum::{self, Ethereum};
use crate::database::Database;
use crate::ethereum::Ethereum;
use crate::identity_tree::{
CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState,
TreeUpdate, TreeVersionReadOps, UnprocessedStatus,
};
use crate::prover::map::initialize_prover_maps;
use crate::prover::{self, ProverConfiguration, ProverType, Provers};
use crate::prover::{ProverConfig, ProverType};
use crate::server::data::{
IdentityHistoryEntry, IdentityHistoryEntryKind, IdentityHistoryEntryStatus,
InclusionProofResponse, ListBatchSizesResponse, VerifySemaphoreProofQuery,
VerifySemaphoreProofRequest, VerifySemaphoreProofResponse,
};
use crate::server::error::Error as ServerError;
use crate::task_monitor::TaskMonitor;
use crate::utils::tree_updates::dedup_tree_updates;
use crate::{contracts, task_monitor};

#[derive(Clone, Debug, PartialEq, Parser)]
#[group(skip)]
pub struct Options {
#[clap(flatten)]
pub ethereum: ethereum::Options,

#[clap(flatten)]
pub contracts: contracts::Options,

#[clap(flatten)]
pub database: database::Options,

#[clap(flatten)]
pub batch_provers: prover::Options,

#[clap(flatten)]
pub committer: task_monitor::Options,

/// Block number to start syncing from
#[clap(long, env, default_value = "0")]
pub starting_block: u64,

/// Timeout for the tree lock (seconds).
#[clap(long, env, default_value = "120")]
pub lock_timeout: u64,

/// The depth of the tree prefix that is vectorized.
#[clap(long, env, default_value = "20")]
pub dense_tree_prefix_depth: usize,

/// The number of updates to trigger garbage collection.
#[clap(long, env, default_value = "10000")]
pub tree_gc_threshold: usize,

/// Path and file name to use for mmap file when building dense tree.
#[clap(long, env, default_value = "./dense_tree_mmap")]
pub dense_tree_mmap_file: String,

/// If set will not use cached tree state.
#[clap(long, env)]
pub force_cache_purge: bool,
}

pub struct App {
database: Arc<Database>,
identity_manager: SharedIdentityManager,
identity_committer: Arc<TaskMonitor>,
tree_state: TreeState,
snark_scalar_field: Hash,
pub database: Arc<Database>,
pub identity_manager: SharedIdentityManager,
pub tree_state: TreeState,
pub snark_scalar_field: Hash,
pub config: Config,
}

impl App {
/// # Errors
///
/// Will return `Err` if the internal Ethereum handler errors or if the
/// `options.storage_file` is not accessible.
#[instrument(name = "App::new", level = "debug")]
pub async fn new(options: Options) -> anyhow::Result<Self> {
let ethereum = Ethereum::new(options.ethereum);
let db = Database::new(options.database);
#[instrument(name = "App::new", level = "debug", skip_all)]
pub async fn new(config: Config) -> anyhow::Result<Self> {
let ethereum = Ethereum::new(&config);
let db = Database::new(&config.database);

let (ethereum, db) = tokio::try_join!(ethereum, db)?;

let database = Arc::new(db);
let mut provers: HashSet<ProverConfiguration> = database.get_provers().await?;
let mut provers: HashSet<ProverConfig> = database.get_provers().await?;

let non_inserted_provers = Self::merge_env_provers(options.batch_provers, &mut provers);
let non_inserted_provers =
Self::merge_env_provers(&config.app.provers_urls.0, &mut provers);

database.insert_provers(non_inserted_provers).await?;

let (insertion_prover_map, deletion_prover_map) = initialize_prover_maps(provers)?;

let identity_manager = IdentityManager::new(
options.contracts,
&config,
ethereum.clone(),
insertion_prover_map,
deletion_prover_map,
Expand Down Expand Up @@ -140,12 +96,12 @@ impl App {
&database,
// Poseidon tree depth is one more than the contract's tree depth
identity_manager.tree_depth(),
options.dense_tree_prefix_depth,
options.tree_gc_threshold,
config.tree.dense_tree_prefix_depth,
config.tree.tree_gc_threshold,
identity_manager.initial_leaf_value(),
initial_root_hash,
&options.dense_tree_mmap_file,
options.force_cache_purge,
&config.tree.cache_file,
config.tree.force_cache_purge,
)
.await?;
info!("Tree state initialization took: {:?}", timer.elapsed());
Expand All @@ -162,23 +118,16 @@ impl App {
&database,
// Poseidon tree depth is one more than the contract's tree depth
identity_manager.tree_depth(),
options.dense_tree_prefix_depth,
options.tree_gc_threshold,
config.tree.dense_tree_prefix_depth,
config.tree.tree_gc_threshold,
identity_manager.initial_leaf_value(),
initial_root_hash,
&options.dense_tree_mmap_file,
&config.tree.cache_file,
true,
)
.await?;
}

let identity_committer = Arc::new(TaskMonitor::new(
database.clone(),
identity_manager.clone(),
tree_state.clone(),
&options.committer,
));

// TODO Export the reduced-ness check that this is enabling from the
// `semaphore-rs` library when we bump the version.
let snark_scalar_field = Hash::from_str_radix(
Expand All @@ -187,16 +136,13 @@ impl App {
)
.expect("This should just parse.");

// Process to push new identities to Ethereum
identity_committer.start().await;

// Sync with chain on start up
let app = Self {
database,
identity_manager,
identity_committer,
tree_state,
snark_scalar_field,
config,
};

Ok(app)
Expand Down Expand Up @@ -615,12 +561,14 @@ impl App {
Ok(history)
}

fn merge_env_provers(options: prover::Options, existing_provers: &mut Provers) -> Provers {
let options_set: HashSet<ProverConfiguration> = options
.prover_urls
.0
.into_iter()
.map(|opt| ProverConfiguration {
fn merge_env_provers(
prover_urls: &[ProverConfig],
existing_provers: &mut HashSet<ProverConfig>,
) -> HashSet<ProverConfig> {
let options_set: HashSet<ProverConfig> = prover_urls
.iter()
.cloned()
.map(|opt| ProverConfig {
url: opt.url,
batch_size: opt.batch_size,
timeout_s: opt.timeout_s,
Expand Down Expand Up @@ -780,17 +728,26 @@ impl App {
let processed_root = self.tree_state.get_processed_tree().get_root();
let mined_root = self.tree_state.get_mined_tree().get_root();

tracing::info!("Validating age max_root_age: {max_root_age:?}");

let root = root_state.root;

match root_state.status {
// Pending status implies the batching or latest tree
ProcessedStatus::Pending if latest_root == root || batching_root == root => {
return Ok(())
tracing::warn!("Root is pending - skipping");
return Ok(());
}
// Processed status is hidden - this should never happen
ProcessedStatus::Processed if processed_root == root => return Ok(()),
ProcessedStatus::Processed if processed_root == root => {
tracing::warn!("Root is processed - skipping");
return Ok(());
}
// Processed status is hidden so it could be either processed or mined
ProcessedStatus::Mined if processed_root == root || mined_root == root => return Ok(()),
ProcessedStatus::Mined if processed_root == root || mined_root == root => {
tracing::warn!("Root is mined - skipping");
return Ok(());
}
_ => (),
}

Expand All @@ -808,21 +765,14 @@ impl App {
now - mined_at
};

tracing::warn!("Root age: {root_age:?}");

if root_age > max_root_age {
Err(ServerError::RootTooOld)
} else {
Ok(())
}
}

/// # Errors
///
/// Will return an Error if any of the components cannot be shut down
/// gracefully.
pub async fn shutdown(&self) -> anyhow::Result<()> {
info!("Shutting down identity committer.");
self.identity_committer.shutdown().await
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 904af0a

Please sign in to comment.