diff --git a/Dockerfile b/Dockerfile index 206a97e8..d50d28c2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,5 @@ FROM debian:12 as build-env +ARG BUILD_DEV WORKDIR /src @@ -20,11 +21,16 @@ ENV PATH="/root/.cargo/bin:${PATH}" ENV RUSTUP_HOME="/root/.rustup" ENV CARGO_HOME="/root/.cargo" +ENV CARGO_ARGS=${BUILD_DEV:+} +ENV CARGO_ARGS=${CARGO_ARGS:---release} + +RUN echo "CARGO_ARGS: ${CARGO_ARGS}" + # Install the toolchain RUN rustup component add cargo # Build the sequencer -RUN cargo build --release +RUN cargo build ${CARGO_ARGS} # cc variant because we need libgcc and others FROM gcr.io/distroless/cc-debian12:nonroot @@ -36,7 +42,10 @@ LABEL prometheus.io/scrape="true" LABEL prometheus.io/port="9998" LABEL prometheus.io/path="/metrics" +ENV BIN_PATH=${BUILD_DEV:+/src/target/debug/signup-sequencer} +ENV BIN_PATH=${BIN_PATH:-/src/target/release/signup-sequencer} + # Copy the sequencer binary -COPY --from=build-env --chown=0:10001 --chmod=454 /src/target/release/signup-sequencer /bin/signup-sequencer +COPY --from=build-env --chown=0:10001 --chmod=454 ${BIN_PATH} /bin/signup-sequencer ENTRYPOINT [ "/bin/signup-sequencer" ] diff --git a/e2e_tests/compose.yml b/e2e_tests/compose.yml new file mode 100644 index 00000000..89d1f3ee --- /dev/null +++ b/e2e_tests/compose.yml @@ -0,0 +1,206 @@ +version: "3" +services: + chain: + container_name: chain + image: ghcr.io/foundry-rs/foundry + platform: linux/amd64 + ports: + - "8545:8545" + command: [ "anvil --host 0.0.0.0 --chain-id 31337 --block-time 30 --base-fee 0 --gas-limit 0 --gas-price 0 --fork-url https://eth-sepolia.g.alchemy.com/v2/Hkj3vTy6ee49NbI4Imhe6r5mRM1vkR10@5091094" ] + tx-sitter-db: + container_name: tx-sitter-db + image: postgres:latest + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=tx-sitter + ports: + - "5460:5432" + volumes: + - tx_sitter_db_data:/var/lib/postgresql/data + sequencer-db: + container_name: sequencer-db + image: postgres:latest + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=sequencer + ports: + - "5461:5432" + volumes: + - sequencer_db_data:/var/lib/postgresql/data + tx-sitter: + container_name: tx-sitter + image: tx-sitter-monolith + depends_on: + - tx-sitter-db + - chain + restart: always + ports: + - "3000:3000" + environment: + - TX_SITTER__SERVICE__ESCALATION_INTERVAL=3s + - TX_SITTER__DATABASE__KIND=connection_string + - TX_SITTER__DATABASE__CONNECTION_STRING=postgres://postgres:postgres@tx-sitter-db:5432/tx-sitter?sslmode=disable + - TX_SITTER__KEYS__KIND=local + - TX_SITTER__SERVICE__PREDEFINED__NETWORK__CHAIN_ID=31337 + - TX_SITTER__SERVICE__PREDEFINED__NETWORK__NAME=Anvil + - TX_SITTER__SERVICE__PREDEFINED__NETWORK__HTTP_RPC=http://chain:8545 + - TX_SITTER__SERVICE__PREDEFINED__NETWORK__WS_RPC=ws://chain:8545 + - TX_SITTER__SERVICE__PREDEFINED__RELAYER__ID=1b908a34-5dc1-4d2d-a146-5eb46e975830 + - TX_SITTER__SERVICE__PREDEFINED__RELAYER__NAME=Relayer + - TX_SITTER__SERVICE__PREDEFINED__RELAYER__CHAIN_ID=31337 + - TX_SITTER__SERVICE__PREDEFINED__RELAYER__KEY_ID=d10607662a85424f02a33fb1e6d095bd0ac7154396ff09762e41f82ff2233aaa + - TX_SITTER__SERVICE__PREDEFINED__RELAYER__API_KEY=G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsnllwU9D3Z4D8gOt04M0QNH + - TX_SITTER__SERVER__HOST=0.0.0.0:3000 + - TX_SITTER__SERVER__DISABLE_AUTH=true + - RUST_LOG=info + semaphore-insertion: + container_name: semaphore-insertion + image: semaphore-mtb + hostname: semaphore-insertion + restart: always + command: [ "start", "--keys-file", "/mtb/keys", 
"--prover-address", "0.0.0.0:3001", "--mode", "insertion" ] + volumes: + - ./keys/insertion_b10t30.ps:/mtb/keys + environment: + BATCH_TIMEOUT_SECONDS: 1 + semaphore-deletion: + container_name: semaphore-deletion + image: semaphore-mtb + hostname: semaphore-deletion + restart: always + command: [ "start", "--keys-file", "/mtb/keys", "--prover-address", "0.0.0.0:3001", "--mode", "deletion" ] + volumes: + - ./keys/deletion_b10t30.ps:/mtb/keys + environment: + BATCH_DELETION_TIMEOUT_SECONDS: 1 + signup-sequencer-0: + container_name: signup-sequencer-0 + image: signup-sequencer + build: + context: ./../ + args: + BUILD_DEV: 1 + depends_on: + - sequencer-db + - chain + - semaphore-insertion + - semaphore-deletion + restart: always + ports: + - "9080:8080" + environment: + - SEQ__TREE__TREE_DEPTH=30 + - SEQ__TREE__DENSE_TREE_PREFIX_DEPTH=22 + - SEQ__TREE__TREE_GC_THRESHOLD=10000000 + - SEQ__TREE__CACHE_FILE=./cache_file + - SEQ__SERVER__ADDRESS=0.0.0.0:8080 + - SEQ__NETWORK__IDENTITY_MANAGER_ADDRESS=0x48483748eb0446A16cAE79141D0688e3F624Cb73 + - SEQ__RELAYER__KIND=tx_sitter + - SEQ__RELAYER__TX_SITTER_URL=http://tx-sitter:3000/1/api/G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsnllwU9D3Z4D8gOt04M0QNH + - SEQ__RELAYER__TX_SITTER_ADDRESS=0x1d7ffed610cc4cdC097ecDc835Ae5FEE93C9e3Da + - SEQ__RELAYER__TX_SITTER_GAS_LIMIT=2000000 + - SEQ__PROVIDERS__PRIMARY_NETWORK_PROVIDER=http://chain:8545 + - 'SEQ__APP__PROVERS_URLS=[{"url": "http://semaphore-insertion:3001", "prover_type": "insertion", "batch_size": 10,"timeout_s": 30}, {"url": "http://semaphore-deletion:3001", "prover_type": "deletion", "batch_size": 10,"timeout_s": 30}]' + - SEQ__DATABASE__DATABASE=postgres://postgres:postgres@sequencer-db:5432/sequencer?sslmode=disable + - SEQ__APP__BATCH_INSERTION_TIMEOUT=30s + - SEQ__APP__BATCH_DELETION_TIMEOUT_SECONDS=1s + signup-sequencer-1: + container_name: signup-sequencer-1 + image: signup-sequencer + build: + context: ./../ + args: + BUILD_DEV: 1 + depends_on: + - sequencer-db + - chain + - semaphore-insertion + - semaphore-deletion + restart: always + ports: + - "9081:8080" + environment: + - SEQ__TREE__TREE_DEPTH=30 + - SEQ__TREE__DENSE_TREE_PREFIX_DEPTH=22 + - SEQ__TREE__TREE_GC_THRESHOLD=10000000 + - SEQ__TREE__CACHE_FILE=./cache_file + - SEQ__SERVER__ADDRESS=0.0.0.0:8080 + - SEQ__NETWORK__IDENTITY_MANAGER_ADDRESS=0x48483748eb0446A16cAE79141D0688e3F624Cb73 + - SEQ__RELAYER__KIND=tx_sitter + - SEQ__RELAYER__TX_SITTER_URL=http://tx-sitter:3000/1/api/G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsnllwU9D3Z4D8gOt04M0QNH + - SEQ__RELAYER__TX_SITTER_ADDRESS=0x1d7ffed610cc4cdC097ecDc835Ae5FEE93C9e3Da + - SEQ__RELAYER__TX_SITTER_GAS_LIMIT=2000000 + - SEQ__PROVIDERS__PRIMARY_NETWORK_PROVIDER=http://chain:8545 + - 'SEQ__APP__PROVERS_URLS=[{"url": "http://semaphore-insertion:3001", "prover_type": "insertion", "batch_size": 10,"timeout_s": 30}, {"url": "http://semaphore-deletion:3001", "prover_type": "deletion", "batch_size": 10,"timeout_s": 30}]' + - SEQ__DATABASE__DATABASE=postgres://postgres:postgres@sequencer-db:5432/sequencer?sslmode=disable + - SEQ__APP__BATCH_INSERTION_TIMEOUT=30s + - SEQ__APP__BATCH_DELETION_TIMEOUT_SECONDS=1s + signup-sequencer-2: + container_name: signup-sequencer-2 + image: signup-sequencer + build: + context: ./../ + args: + BUILD_DEV: 1 + depends_on: + - sequencer-db + - chain + - semaphore-insertion + - semaphore-deletion + restart: always + ports: + - "9082:8080" + environment: + - SEQ__TREE__TREE_DEPTH=30 + - SEQ__TREE__DENSE_TREE_PREFIX_DEPTH=22 + - SEQ__TREE__TREE_GC_THRESHOLD=10000000 
+ - SEQ__TREE__CACHE_FILE=./cache_file + - SEQ__SERVER__ADDRESS=0.0.0.0:8080 + - SEQ__NETWORK__IDENTITY_MANAGER_ADDRESS=0x48483748eb0446A16cAE79141D0688e3F624Cb73 + - SEQ__RELAYER__KIND=tx_sitter + - SEQ__RELAYER__TX_SITTER_URL=http://tx-sitter:3000/1/api/G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsnllwU9D3Z4D8gOt04M0QNH + - SEQ__RELAYER__TX_SITTER_ADDRESS=0x1d7ffed610cc4cdC097ecDc835Ae5FEE93C9e3Da + - SEQ__RELAYER__TX_SITTER_GAS_LIMIT=2000000 + - SEQ__PROVIDERS__PRIMARY_NETWORK_PROVIDER=http://chain:8545 + - 'SEQ__APP__PROVERS_URLS=[{"url": "http://semaphore-insertion:3001", "prover_type": "insertion", "batch_size": 10,"timeout_s": 30}, {"url": "http://semaphore-deletion:3001", "prover_type": "deletion", "batch_size": 10,"timeout_s": 30}]' + - SEQ__DATABASE__DATABASE=postgres://postgres:postgres@sequencer-db:5432/sequencer?sslmode=disable + - SEQ__APP__BATCH_INSERTION_TIMEOUT=30s + - SEQ__APP__BATCH_DELETION_TIMEOUT_SECONDS=1s + signup-sequencer-3: + container_name: signup-sequencer-3 + image: signup-sequencer + build: + context: ./../ + args: + BUILD_DEV: 1 + depends_on: + - sequencer-db + - chain + - semaphore-insertion + - semaphore-deletion + restart: always + ports: + - "9083:8080" + environment: + - SEQ__TREE__TREE_DEPTH=30 + - SEQ__TREE__DENSE_TREE_PREFIX_DEPTH=22 + - SEQ__TREE__TREE_GC_THRESHOLD=10000000 + - SEQ__TREE__CACHE_FILE=./cache_file + - SEQ__SERVER__ADDRESS=0.0.0.0:8080 + - SEQ__NETWORK__IDENTITY_MANAGER_ADDRESS=0x48483748eb0446A16cAE79141D0688e3F624Cb73 + - SEQ__RELAYER__KIND=tx_sitter + - SEQ__RELAYER__TX_SITTER_URL=http://tx-sitter:3000/1/api/G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsnllwU9D3Z4D8gOt04M0QNH + - SEQ__RELAYER__TX_SITTER_ADDRESS=0x1d7ffed610cc4cdC097ecDc835Ae5FEE93C9e3Da + - SEQ__RELAYER__TX_SITTER_GAS_LIMIT=2000000 + - SEQ__PROVIDERS__PRIMARY_NETWORK_PROVIDER=http://chain:8545 + - 'SEQ__APP__PROVERS_URLS=[{"url": "http://semaphore-insertion:3001", "prover_type": "insertion", "batch_size": 10,"timeout_s": 30}, {"url": "http://semaphore-deletion:3001", "prover_type": "deletion", "batch_size": 10,"timeout_s": 30}]' + - SEQ__DATABASE__DATABASE=postgres://postgres:postgres@sequencer-db:5432/sequencer?sslmode=disable + - SEQ__APP__BATCH_INSERTION_TIMEOUT=30s + - SEQ__APP__BATCH_DELETION_TIMEOUT_SECONDS=1s +volumes: + tx_sitter_db_data: + driver: local + sequencer_db_data: + driver: local diff --git a/e2e_tests/create_identities.sh b/e2e_tests/create_identities.sh new file mode 100755 index 00000000..43730be1 --- /dev/null +++ b/e2e_tests/create_identities.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +NUMBER=${1:-1} +SLEEP=${2:-0} + +for run in $(seq $NUMBER); do + echo "running"; + curl -X POST -H "Content-Type: application/json" -d "{\"identityCommitment\":\"0x$(openssl rand -hex 16)\"}" localhost:9080/insertIdentity -vv; + sleep $SLEEP +done \ No newline at end of file diff --git a/e2e_tests/keys/.gitignore b/e2e_tests/keys/.gitignore new file mode 100644 index 00000000..d6b7ef32 --- /dev/null +++ b/e2e_tests/keys/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/schemas/database/013_batches_and_transactions.sql b/schemas/database/013_batches_and_transactions.sql new file mode 100644 index 00000000..e9ce872e --- /dev/null +++ b/schemas/database/013_batches_and_transactions.sql @@ -0,0 +1,25 @@ +-- Create ENUM for prover type +CREATE TYPE batch_type_enum AS ENUM ('Insertion', 'Deletion'); + +CREATE TABLE batches +( + next_root BYTEA NOT NULL UNIQUE PRIMARY KEY, + prev_root BYTEA UNIQUE, + created_at TIMESTAMPTZ NOT NULL, + batch_type batch_type_enum NOT NULL, + 
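    -- next_root / prev_root link the batches into a chain: each row is keyed by the
    -- root it produces and references the root it was built on top of; the single
    -- head row (enforced by i_single_null_prev_root below) has prev_root = NULL.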
commitments BYTEA[] NOT NULL, + leaf_indexes BIGINT[] NOT NULL CHECK (array_length(leaf_indexes, 1) = array_length(commitments, 1)), + + FOREIGN KEY (prev_root) REFERENCES batches (next_root) +); + +CREATE UNIQUE INDEX i_single_null_prev_root ON batches ((batches.prev_root IS NULL)) WHERE batches.prev_root IS NULL; + +CREATE TABLE transactions +( + transaction_id VARCHAR(256) NOT NULL UNIQUE PRIMARY KEY, + batch_next_root BYTEA NOT NULL UNIQUE, + created_at TIMESTAMPTZ NOT NULL, + + FOREIGN KEY (batch_next_root) REFERENCES batches (next_root) +); \ No newline at end of file diff --git a/src/app.rs b/src/app.rs index be1fcd15..44a223e3 100644 --- a/src/app.rs +++ b/src/app.rs @@ -259,8 +259,7 @@ impl App { processed_builder.update(&processed_item); } - let (processed, batching_builder) = processed_builder.seal_and_continue(); - let (batching, mut latest_builder) = batching_builder.seal_and_continue(); + let (processed, mut latest_builder) = processed_builder.seal_and_continue(); let pending_items = self .database .get_commitments_by_status(ProcessedStatus::Pending) @@ -269,7 +268,7 @@ impl App { latest_builder.update(&update); } let latest = latest_builder.seal(); - Ok(Some(TreeState::new(mined, processed, batching, latest))) + Ok(Some(TreeState::new(mined, processed, latest))) } pub fn tree_state(&self) -> anyhow::Result<&TreeState> { @@ -335,8 +334,7 @@ impl App { }) .await?; - let (processed, batching_builder) = processed_builder.seal_and_continue(); - let (batching, mut latest_builder) = batching_builder.seal_and_continue(); + let (processed, mut latest_builder) = processed_builder.seal_and_continue(); let pending_items = self .database @@ -355,7 +353,7 @@ impl App { let latest = latest_builder.seal(); - Ok(TreeState::new(mined, processed, batching, latest)) + Ok(TreeState::new(mined, processed, latest)) } /// Queues an insert into the merkle tree. @@ -542,13 +540,15 @@ impl App { // A pending identity can be present in the batching tree and therefore status // should be set to Batched IdentityHistoryEntryStatus::Pending => { - if let Some(leaf_index) = entry.leaf_index { - if self.tree_state()?.get_batching_tree().get_leaf(leaf_index) - == entry.commitment - { - status = IdentityHistoryEntryStatus::Batched; - } - } + // todo(piotrh): what to do with it? 
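                    // A possible follow-up (not implemented in this change): derive the Batched
                    // status from the new `batches` table instead of the removed in-memory
                    // batching tree, e.g. by checking whether the entry's leaf index belongs to
                    // a batch that does not yet have a mined transaction.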
+ // if let Some(leaf_index) = entry.leaf_index { + // if self.tree_state()?.get_batching_tree().get_leaf(leaf_index) + // == entry.commitment + // { + // status = IdentityHistoryEntryStatus::Batched; + // } + // } + () } IdentityHistoryEntryStatus::Buffered if entry.held_back => { status = IdentityHistoryEntryStatus::Queued; @@ -732,7 +732,6 @@ impl App { ) -> Result<(), ServerError> { let tree_state = self.tree_state()?; let latest_root = tree_state.get_latest_tree().get_root(); - let batching_root = tree_state.get_batching_tree().get_root(); let processed_root = tree_state.get_processed_tree().get_root(); let mined_root = tree_state.get_mined_tree().get_root(); @@ -742,7 +741,7 @@ impl App { match root_state.status { // Pending status implies the batching or latest tree - ProcessedStatus::Pending if latest_root == root || batching_root == root => { + ProcessedStatus::Pending if latest_root == root => { tracing::warn!("Root is pending - skipping"); return Ok(()); } diff --git a/src/database/mod.rs b/src/database/mod.rs index 96844943..9e73c425 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -19,6 +19,8 @@ use tracing::{error, info, instrument, warn}; use self::types::{CommitmentHistoryEntry, DeletionEntry, LatestDeletionEntry, RecoveryEntry}; use crate::config::DatabaseConfig; +use crate::database::types::{BatchEntry, BatchType, Commitments, LeafIndexes, TransactionEntry}; +use crate::ethereum::write::TransactionId; use crate::identity_tree::{ Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus, }; @@ -679,16 +681,10 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { ) } - async fn delete_recoveries, T: Into>( + async fn delete_recoveries( self, - prev_commits: I, + prev_commits: &Commitments, ) -> Result, Error> { - // TODO: upstream PgHasArrayType impl to ruint - let prev_commits = prev_commits - .into_iter() - .map(|c| c.into().to_be_bytes()) - .collect::>(); - let res = sqlx::query_as::<_, RecoveryEntry>( r#" DELETE @@ -697,7 +693,7 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { RETURNING * "#, ) - .bind(&prev_commits) + .bind(prev_commits) .fetch_all(self) .await?; @@ -740,12 +736,7 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { } /// Remove a list of entries from the deletions table - async fn remove_deletions(self, commitments: &[Hash]) -> Result<(), Error> { - let commitments = commitments - .iter() - .map(|c| c.to_be_bytes()) - .collect::>(); - + async fn remove_deletions(self, commitments: &Commitments) -> Result<(), Error> { sqlx::query("DELETE FROM deletions WHERE commitment = Any($1)") .bind(commitments) .execute(self) @@ -757,6 +748,7 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { async fn get_eligible_unprocessed_commitments( self, status: UnprocessedStatus, + limit: usize, ) -> Result, Error> { let query = sqlx::query( r#" @@ -766,7 +758,8 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { "#, ) .bind(<&str>::from(status)) - .bind(MAX_UNPROCESSED_FETCH_COUNT); + .bind(limit as i64); + // .bind(MAX_UNPROCESSED_FETCH_COUNT); // todo(piotrh): clean let result = self.fetch_all(query).await?; @@ -821,8 +814,8 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { async fn identity_exists(self, commitment: Hash) -> Result { Ok(sqlx::query( r#" - select - EXISTS (select commitment from unprocessed_identities where commitment = $1) OR + select + EXISTS (select commitment from unprocessed_identities where commitment = $1) OR EXISTS (select commitment 
from identities where commitment = $1); "#, ) @@ -840,6 +833,135 @@ pub trait DatabaseExt<'a>: Executor<'a, Database = Postgres> { let row_unprocessed = self.fetch_one(query_queued_deletion).await?; Ok(row_unprocessed.get::(0)) } + + async fn insert_new_batch_head( + self, + next_root: &Hash, + batch_type: BatchType, + commitments: &Commitments, + leaf_indexes: &LeafIndexes, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO batches( + next_root, + prev_root, + created_at, + batch_type, + commitments, + leaf_indexes + ) VALUES ($1, NULL, CURRENT_TIMESTAMP, $2, $3, $4) + "#, + ) + .bind(next_root) + .bind(batch_type) + .bind(commitments) + .bind(leaf_indexes); + + self.execute(query).await?; + Ok(()) + } + + async fn insert_new_batch( + self, + next_root: &Hash, + prev_root: &Hash, + batch_type: BatchType, + commitments: &Commitments, + leaf_indexes: &LeafIndexes, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO batches( + next_root, + prev_root, + created_at, + batch_type, + commitments, + leaf_indexes + ) VALUES ($1, $2, CURRENT_TIMESTAMP, $3, $4, $5) + "#, + ) + .bind(next_root) + .bind(prev_root) + .bind(batch_type) + .bind(commitments) + .bind(leaf_indexes); + + self.execute(query).await?; + Ok(()) + } + + async fn get_next_batch(self, prev_root: &Hash) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT * FROM batches WHERE prev_root = $1 + "#, + ) + .bind(prev_root) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_batch_head(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT * FROM batches WHERE prev_root IS NULL + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn is_root_in_batch_chain(self, root: &Hash) -> Result { + let query = sqlx::query( + r#"SELECT exists(SELECT 1 FROM batches where prev_root = $1 OR next_root = $1)"#, + ) + .bind(root); + let row_unprocessed = self.fetch_one(query).await?; + Ok(row_unprocessed.get::(0)) + } + + async fn insert_new_transaction( + self, + transaction_id: &String, + batch_next_root: &Hash, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO transactions( + transaction_id, + batch_next_root, + created_at + ) VALUES ($1, $2, CURRENT_TIMESTAMP) + "#, + ) + .bind(transaction_id) + .bind(batch_next_root); + + self.execute(query).await?; + Ok(()) + } + + async fn get_transaction_for_batch( + self, + next_root: &Hash, + ) -> Result, Error> { + let res = sqlx::query_as::<_, TransactionEntry>( + r#" + SELECT * FROM transactions WHERE batch_next_root = $1 + "#, + ) + .bind(next_root) + .fetch_optional(self) + .await?; + + Ok(res) + } } #[derive(Debug, Error)] @@ -867,6 +989,7 @@ mod test { use super::Database; use crate::config::DatabaseConfig; + use crate::database::types::{BatchType, Commitments, LeafIndexes}; use crate::database::DatabaseExt as _; use crate::identity_tree::{Hash, ProcessedStatus, Status, UnprocessedStatus}; use crate::prover::{ProverConfig, ProverType}; @@ -894,7 +1017,7 @@ mod test { // TODO: we should probably consolidate all tests that propagate errors to // TODO: either use anyhow or eyre - async fn setup_db<'a>(docker: &'a Cli) -> anyhow::Result<(Database, DockerContainer)> { + async fn setup_db(docker: &Cli) -> anyhow::Result<(Database, DockerContainer)> { let db_container = postgres_docker_utils::setup(docker).await?; let url = format!( "postgres://postgres:postgres@{}/database", @@ -969,7 +1092,7 @@ mod test { assert_eq!(commit.0, UnprocessedStatus::New); 
let identity_count = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) + .get_eligible_unprocessed_commitments(UnprocessedStatus::New, 100) .await? .len(); @@ -1153,7 +1276,7 @@ mod test { .await?; let unprocessed_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) + .get_eligible_unprocessed_commitments(UnprocessedStatus::New, 100) .await?; assert_eq!(unprocessed_commitments.len(), 1); @@ -1187,7 +1310,7 @@ mod test { .await?; let unprocessed_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) + .get_eligible_unprocessed_commitments(UnprocessedStatus::New, 100) .await?; // Assert unprocessed commitments against expected values @@ -1232,12 +1355,12 @@ mod test { .await?; let commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) + .get_eligible_unprocessed_commitments(UnprocessedStatus::New, 100) .await?; assert_eq!(commitments.len(), 1); let eligible_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) + .get_eligible_unprocessed_commitments(UnprocessedStatus::New, 100) .await?; assert_eq!(eligible_commitments.len(), 1); @@ -1252,7 +1375,7 @@ mod test { .await?; let eligible_commitments = db - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) + .get_eligible_unprocessed_commitments(UnprocessedStatus::New, 100) .await?; assert_eq!(eligible_commitments.len(), 1); @@ -1324,7 +1447,7 @@ mod test { } let deleted_recoveries = db - .delete_recoveries(old_identities[0..2].iter().cloned()) + .delete_recoveries(&Commitments(old_identities[0..2].to_vec())) .await?; assert_eq!(deleted_recoveries.len(), 2); @@ -1841,7 +1964,8 @@ mod test { .context("Inserting new identity")?; // Remove identities 0 to 2 - db.remove_deletions(&identities[0..=2]).await?; + db.remove_deletions(&Commitments(identities[0..=2].to_vec())) + .await?; let deletions = db.get_deletions().await?; assert_eq!(deletions.len(), 1); @@ -2038,4 +2162,144 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_insert_batch() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities = mock_identities(10); + let identities1: Vec = identities.iter().take(4).map(|v| v.clone()).collect(); + let leaf_indexes_1 = (0..4).collect(); + let identities2: Vec = identities + .iter() + .skip(4) + .take(6) + .map(|v| v.clone()) + .collect(); + let leaf_indexes_2 = (4..10).collect(); + let roots = mock_roots(2); + + db.insert_new_batch_head( + &roots[0], + BatchType::Insertion, + &Commitments(identities1), + &LeafIndexes(leaf_indexes_1), + ) + .await?; + db.insert_new_batch( + &roots[1], + &roots[0], + BatchType::Insertion, + &Commitments(identities2), + &LeafIndexes(leaf_indexes_2), + ) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn test_get_next_batch() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities = mock_identities(10); + let identities1: Vec = identities.iter().take(4).map(|v| v.clone()).collect(); + let leaf_indexes_1 = (0..4).collect(); + let identities2: Vec = identities + .iter() + .skip(4) + .take(6) + .map(|v| v.clone()) + .collect(); + let leaf_indexes_2 = (4..10).collect(); + let roots = mock_roots(2); + + db.insert_new_batch_head( + &roots[0], + BatchType::Insertion, + &Commitments(identities1), + &LeafIndexes(leaf_indexes_1), + ) + .await?; + db.insert_new_batch( + &roots[1], + &roots[0], + BatchType::Insertion, + 
&Commitments(identities2.clone()), + &LeafIndexes(leaf_indexes_2), + ) + .await?; + + let next_batch = db.get_next_batch(&roots[0]).await?; + + assert!(next_batch.is_some()); + + let next_batch = next_batch.unwrap(); + + assert_eq!(next_batch.prev_root.unwrap(), roots[0]); + assert_eq!(next_batch.next_root, roots[1]); + assert_eq!(next_batch.commitments, identities2.into()); + + let next_batch = db.get_next_batch(&roots[1]).await?; + + assert!(next_batch.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_batch_head() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities: Vec = Vec::default(); + let leaf_indexes = Vec::default(); + let roots = mock_roots(1); + + let batch_head = db.get_batch_head().await?; + + assert!(batch_head.is_none()); + + db.insert_new_batch_head( + &roots[0], + BatchType::Insertion, + &Commitments(identities.clone()), + &LeafIndexes(leaf_indexes.clone()), + ) + .await?; + + let batch_head = db.get_batch_head().await?; + + assert!(batch_head.is_some()); + let batch_head = batch_head.unwrap(); + + assert_eq!(batch_head.prev_root, None); + assert_eq!(batch_head.next_root, roots[0]); + assert_eq!(batch_head.commitments, identities.into()); + assert_eq!(batch_head.leaf_indexes, leaf_indexes.into()); + + Ok(()) + } + + #[tokio::test] + async fn test_insert_transaction() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities = mock_identities(5); + let leaf_indexes_1 = (0..5).collect(); + let roots = mock_roots(1); + let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9"); + + db.insert_new_batch_head( + &roots[0], + BatchType::Insertion, + &Commitments(identities), + &LeafIndexes(leaf_indexes_1), + ) + .await?; + + db.insert_new_transaction(&transaction_id, &roots[0]) + .await?; + + Ok(()) + } } diff --git a/src/database/types.rs b/src/database/types.rs index 71b596b2..3f043384 100644 --- a/src/database/types.rs +++ b/src/database/types.rs @@ -1,5 +1,10 @@ use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::database::{HasArguments, HasValueRef}; +use sqlx::encode::IsNull; +use sqlx::error::BoxDynError; use sqlx::prelude::FromRow; +use sqlx::{Decode, Encode, Postgres, Type}; use crate::identity_tree::{Hash, Status, UnprocessedStatus}; @@ -37,3 +42,121 @@ pub struct CommitmentHistoryEntry { pub held_back: bool, pub status: Status, } + +#[derive(Debug, Copy, Clone, sqlx::Type, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +#[sqlx(type_name = "batch_type_enum", rename_all = "PascalCase")] +pub enum BatchType { + #[default] + Insertion, + Deletion, +} + +impl std::fmt::Display for BatchType { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + BatchType::Insertion => write!(f, "insertion"), + BatchType::Deletion => write!(f, "deletion"), + } + } +} + +#[derive(Debug, Clone, FromRow)] +pub struct BatchEntry { + pub next_root: Hash, + // In general prev_root is present all the time except the first row (head of the batches + // chain) + pub prev_root: Option, + pub created_at: DateTime, + pub batch_type: BatchType, + pub commitments: Commitments, + pub leaf_indexes: LeafIndexes, +} + +#[derive(Debug, Clone, FromRow)] +pub struct TransactionEntry { + pub batch_next_root: Hash, + pub transaction_id: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Commitments(pub Vec); + +impl Encode<'_, 
Postgres> for Commitments { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> IsNull { + let commitments = &self + .0 + .iter() + .map(|c| c.to_be_bytes()) // Why be not le? + .collect::>(); + + <&Vec<[u8; 32]> as Encode>::encode(commitments, buf) + } +} + +impl Decode<'_, Postgres> for Commitments { + fn decode(value: >::ValueRef) -> Result { + let value = as Decode>::decode(value)?; + + let res = value.iter().map(|&v| Hash::from_be_bytes(v)).collect(); + + Ok(Commitments(res)) + } +} + +impl Type for Commitments { + fn type_info() -> ::TypeInfo { + <&Vec<&[u8]> as Type>::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + <&Vec<&[u8]> as Type>::compatible(ty) + } +} + +impl From> for Commitments { + fn from(value: Vec) -> Self { + Commitments(value) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LeafIndexes(pub Vec); + +impl Encode<'_, Postgres> for LeafIndexes { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> IsNull { + let commitments = &self + .0 + .iter() + .map(|&c| c as i64) // Why be not le? + .collect(); + + <&Vec as Encode>::encode(commitments, buf) + } +} + +impl Decode<'_, Postgres> for LeafIndexes { + fn decode(value: >::ValueRef) -> Result { + let value = as Decode>::decode(value)?; + + let res = value.iter().map(|&v| v as usize).collect(); + + Ok(LeafIndexes(res)) + } +} + +impl Type for LeafIndexes { + fn type_info() -> ::TypeInfo { + <&Vec as Type>::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + <&Vec as Type>::compatible(ty) + } +} + +impl From> for LeafIndexes { + fn from(value: Vec) -> Self { + LeafIndexes(value) + } +} diff --git a/src/identity_tree.rs b/src/identity_tree.rs index 85dd3c35..8d6ab74c 100644 --- a/src/identity_tree.rs +++ b/src/identity_tree.rs @@ -1,4 +1,5 @@ use std::cmp::min; +use std::ops::Deref; use std::sync::{Arc, Mutex, MutexGuard}; use chrono::Utc; @@ -10,13 +11,13 @@ use serde::Serialize; use sqlx::prelude::FromRow; use tracing::{info, warn}; +pub use self::status::{ProcessedStatus, Status, UnknownStatus, UnprocessedStatus}; + mod status; pub type PoseidonTree = LazyMerkleTree; pub type Hash = ::Hash; -pub use self::status::{ProcessedStatus, Status, UnknownStatus, UnprocessedStatus}; - #[derive(Clone, Eq, PartialEq, Hash, Debug, FromRow)] pub struct TreeUpdate { #[sqlx(try_from = "i64")] @@ -458,6 +459,52 @@ impl TreeVersion { output } + + pub fn get_depth(&self) -> usize { + let data = self.get_data(); + data.tree.depth() + } + + #[must_use] + pub fn append_many_as_derived( + &self, + identities: &[Hash], + ) -> (Vec<(Hash, Proof, usize)>, LazyMerkleTree) { + let data = self.get_data(); + let mut tree_copy = data.tree.clone(); + let next_leaf = data.next_leaf; + + let mut output = Vec::with_capacity(identities.len()); + + for (idx, identity) in identities.iter().enumerate() { + let leaf_index = next_leaf + idx; + tree_copy = tree_copy.update(leaf_index, identity); + let root = tree_copy.root(); + let proof = tree_copy.proof(leaf_index); + + output.push((root, proof, leaf_index)); + } + + (output, tree_copy) + } + + #[must_use] + pub fn delete_many_as_derived(&self, leaf_indices: &[usize]) -> Vec<(Hash, Proof)> { + let data = self.get_data(); + let mut tree_copy = data.tree.clone(); + + let mut output = Vec::with_capacity(leaf_indices.len()); + + for leaf_index in leaf_indices { + tree_copy = tree_copy.update(*leaf_index, (&Hash::ZERO).into()); + let root = tree_copy.root(); + let proof = tree_copy.proof(*leaf_index); + + output.push((root, proof)); + } + + output + } } impl 
TreeVersion @@ -502,7 +549,6 @@ where pub struct TreeState { mined: TreeVersion, processed: TreeVersion, - batching: TreeVersion, latest: TreeVersion, } @@ -511,13 +557,11 @@ impl TreeState { pub const fn new( mined: TreeVersion, processed: TreeVersion, - batching: TreeVersion, latest: TreeVersion, ) -> Self { Self { mined, processed, - batching, latest, } } @@ -549,15 +593,6 @@ impl TreeState { &self.processed } - #[must_use] - pub fn get_batching_tree(&self) -> TreeVersion { - self.batching.clone() - } - - pub fn batching_tree(&self) -> &TreeVersion { - &self.batching - } - #[must_use] pub fn get_proof_for(&self, item: &TreeItem) -> (Field, InclusionProof) { let (leaf, root, proof) = match item.status { @@ -749,7 +784,6 @@ impl DerivedTreeBuilder

{ #[cfg(test)] mod tests { - use super::{CanonicalTreeBuilder, Hash, TreeWithNextVersion}; #[test] diff --git a/src/task_monitor/tasks/delete_identities.rs b/src/task_monitor/tasks/delete_identities.rs index 45e9ac49..1c7b43b0 100644 --- a/src/task_monitor/tasks/delete_identities.rs +++ b/src/task_monitor/tasks/delete_identities.rs @@ -7,10 +7,13 @@ use tokio::sync::{Mutex, Notify}; use tracing::info; use crate::app::App; -use crate::database::types::DeletionEntry; +use crate::database::types::{BatchType, Commitments, DeletionEntry, LeafIndexes}; use crate::database::DatabaseExt; -use crate::identity_tree::Hash; +use crate::identity_tree::{Hash, TreeVersionReadOps}; +// todo(piotrh): ensure deletes runs from time to time +// todo(piotrh): ensure things are batched properly to save $$$ when executed +// on, add check timeour chain pub async fn delete_identities( app: Arc, pending_insertions_mutex: Arc>, @@ -47,7 +50,15 @@ pub async fn delete_identities( // Delete the commitments at the target leaf indices in the latest tree, // generating the proof for each update - let data = app.tree_state()?.latest_tree().delete_many(&leaf_indices); + let data = app + .tree_state()? + .latest_tree() + .delete_many_as_derived(&leaf_indices); + let prev_root = app.tree_state()?.latest_tree().get_root(); + let next_root = data + .last() + .map(|(root, ..)| root.clone()) + .expect("should be created at least one"); assert_eq!( data.len(), @@ -55,16 +66,30 @@ pub async fn delete_identities( "Length mismatch when appending identities to tree" ); + let mut tx = app.database.pool.begin().await?; + // Insert the new items into pending identities - let items = data.into_iter().zip(leaf_indices); - for ((root, _proof), leaf_index) in items { - app.database - .insert_pending_identity(leaf_index, &Hash::ZERO, &root) + let items: Vec<_> = data.into_iter().zip(leaf_indices).collect(); + for ((root, _proof), leaf_index) in items.iter() { + tx.insert_pending_identity(*leaf_index, &Hash::ZERO, &root) .await?; } // Remove the previous commitments from the deletions table - app.database.remove_deletions(&previous_commitments).await?; + tx.remove_deletions(&Commitments(previous_commitments.clone())) + .await?; + + tx.insert_new_batch( + &next_root, + &prev_root, + BatchType::Deletion, + &Commitments(previous_commitments), + &LeafIndexes(items.iter().map(|((..), leaf_index)| *leaf_index).collect()), + ) + .await?; + + tx.commit().await?; + wake_up_notify.notify_one(); } else { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; diff --git a/src/task_monitor/tasks/finalize_identities.rs b/src/task_monitor/tasks/finalize_identities.rs index 50b34f1b..0e4ce907 100644 --- a/src/task_monitor/tasks/finalize_identities.rs +++ b/src/task_monitor/tasks/finalize_identities.rs @@ -14,8 +14,11 @@ use crate::app::App; use crate::contracts::abi::{BridgedWorldId, RootAddedFilter, TreeChangeKind, TreeChangedFilter}; use crate::contracts::scanner::BlockScanner; use crate::contracts::IdentityManager; +use crate::database::types::Commitments; use crate::database::{Database, DatabaseExt as _}; -use crate::identity_tree::{Canonical, Intermediate, TreeVersion, TreeWithNextVersion}; +use crate::identity_tree::{ + Canonical, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion, +}; use crate::task_monitor::TaskMonitor; pub async fn finalize_roots(app: Arc) -> anyhow::Result<()> { @@ -259,10 +262,6 @@ async fn update_eligible_recoveries( .context("Could not fetch deletion indices from tx")?; let commitments = 
processed_tree.commitments_by_indices(commitments.iter().copied()); - let commitments: Vec = commitments - .into_iter() - .map(std::convert::Into::into) - .collect(); // Fetch the root history expiry time on chain let root_history_expiry = identity_manager.root_history_expiry().await?; @@ -282,7 +281,7 @@ async fn update_eligible_recoveries( // Check if any deleted commitments correspond with entries in the // recoveries table and insert the new commitment into the unprocessed // identities table with the proper eligibility timestamp - let deleted_recoveries = tx.delete_recoveries(commitments).await?; + let deleted_recoveries = tx.delete_recoveries(&Commitments(commitments)).await?; // For each deletion, if there is a corresponding recovery, insert a new // identity with the specified eligibility timestamp diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs index 94ff0fb3..e7b0e66a 100644 --- a/src/task_monitor/tasks/insert_identities.rs +++ b/src/task_monitor/tasks/insert_identities.rs @@ -1,46 +1,101 @@ use std::sync::Arc; use std::time::Duration; +use anyhow::Error; +use sqlx::{Postgres, Transaction}; use tokio::sync::{Mutex, Notify}; use tokio::time::sleep; use tracing::instrument; use crate::app::App; -use crate::database::types::UnprocessedCommitment; +use crate::database::types::{BatchType, Commitments, LeafIndexes, UnprocessedCommitment}; use crate::database::{Database, DatabaseExt}; use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus}; +// todo(piotrh): ensure things are batched properly to save $$$ when executed +// on, add check timeour chain pub async fn insert_identities( app: Arc, pending_insertions_mutex: Arc>, wake_up_notify: Arc, ) -> anyhow::Result<()> { + ensure_batch_chain_initialized(&app).await?; + + let batch_size = app.identity_manager.max_insertion_batch_size().await; + loop { - // get commits from database - let unprocessed = app - .database - .get_eligible_unprocessed_commitments(UnprocessedStatus::New) - .await?; + let mut tx = app.database.pool.begin().await?; + + // Multiple instances are trying to get the batch. Putting data into tree is + // possible only when tree is synced with database, because database is + // used to sync between instances. This awaiting is just to minimize + // failed tasks due to being out of sync with database. + if !is_synced(&app, &mut tx).await? 
{ + // todo(piotrh): we may trigger sync (process_identities) here + return Err(Error::msg("Not synced with db.")); + } + + let unprocessed = get_identities_batch(&mut tx, batch_size).await?; if unprocessed.is_empty() { sleep(Duration::from_secs(5)).await; continue; } insert_identities_batch( - &app.database, + &mut tx, app.tree_state()?.latest_tree(), unprocessed, &pending_insertions_mutex, ) .await?; + + tx.commit().await?; + // Notify the identity processing task, that there are new identities wake_up_notify.notify_one(); } } +async fn ensure_batch_chain_initialized(app: &Arc) -> anyhow::Result<()> { + let batch_head = app.database.get_batch_head().await?; + if batch_head.is_none() { + app.database + .insert_new_batch_head( + &app.tree_state()?.latest_tree().get_root(), + BatchType::Insertion, + &Commitments(vec![]), + &LeafIndexes(vec![]), + ) + .await?; + } + Ok(()) +} + +async fn is_synced(app: &Arc, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result { + let next_db_leaf = tx.get_next_leaf_index().await?; + let next_leaf = app.tree_state()?.latest_tree().next_leaf(); + + assert!( + next_leaf <= next_db_leaf, + "Database and tree are out of sync, tree is ahead of database. Next leaf index in tree \ + is: {next_leaf}, in database: {next_db_leaf}." + ); + + Ok(next_leaf == next_db_leaf) +} + +async fn get_identities_batch( + tx: &mut Transaction<'_, Postgres>, + batch_size: usize, +) -> anyhow::Result> { + Ok(tx + .get_eligible_unprocessed_commitments(UnprocessedStatus::New, batch_size) + .await?) +} + #[instrument(level = "info", skip_all)] async fn insert_identities_batch( - database: &Database, + tx: &mut Transaction<'_, Postgres>, latest_tree: &TreeVersion, identities: Vec, pending_insertions_mutex: &Mutex<()>, @@ -48,15 +103,13 @@ async fn insert_identities_batch( // Filter out any identities that are already in the `identities` table let mut filtered_identities = vec![]; for identity in identities { - if database + if tx .get_identity_leaf_index(&identity.commitment) .await? .is_some() { tracing::warn!(?identity.commitment, "Duplicate identity"); - database - .remove_unprocessed_identity(&identity.commitment) - .await?; + tx.remove_unprocessed_identity(&identity.commitment).await?; } else { filtered_identities.push(identity.commitment); } @@ -64,16 +117,13 @@ async fn insert_identities_batch( let _guard = pending_insertions_mutex.lock().await; - let next_db_index = database.get_next_leaf_index().await?; - let next_leaf = latest_tree.next_leaf(); - - assert_eq!( - next_leaf, next_db_index, - "Database and tree are out of sync. 
Next leaf index in tree is: {next_leaf}, in database: \ - {next_db_index}" - ); + let prev_root = latest_tree.get_root(); - let data = latest_tree.append_many(&filtered_identities); + let (data, _) = latest_tree.append_many_as_derived(&filtered_identities); + let next_root = data + .last() + .map(|(root, ..)| root.clone()) + .expect("should be created at least one"); assert_eq!( data.len(), @@ -81,15 +131,31 @@ async fn insert_identities_batch( "Length mismatch when appending identities to tree" ); - let items = data.into_iter().zip(filtered_identities); + let items: Vec<_> = data.into_iter().zip(filtered_identities.clone()).collect(); - for ((root, _proof, leaf_index), identity) in items { - database - .insert_pending_identity(leaf_index, &identity, &root) + for ((root, _proof, leaf_index), identity) in items.iter() { + tx.insert_pending_identity(*leaf_index, identity, root) .await?; - database.remove_unprocessed_identity(&identity).await?; + tx.remove_unprocessed_identity(identity).await?; } + tx.insert_new_batch( + &next_root, + &prev_root, + BatchType::Insertion, + &Commitments(items.iter().map(|(_, commitment)| *commitment).collect()), + &LeafIndexes( + items + .iter() + .map(|((_, _, leaf_index), _)| *leaf_index) + .collect(), + ), + ) + .await?; + + // todo(piotrh): ensure if we can or not do it here + // _ = latest_tree.append_many(&filtered_identities); + Ok(()) } diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index 6c5c8bcb..2d4383f3 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -12,15 +12,16 @@ use tracing::instrument; use crate::app::App; use crate::contracts::IdentityManager; +use crate::database::types::{BatchEntry, BatchType}; use crate::database::DatabaseExt as _; use crate::ethereum::write::TransactionId; use crate::identity_tree::{ - AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion, + AppliedTreeUpdate, Hash, Intermediate, Latest, TreeVersion, TreeVersionReadOps, + TreeWithNextVersion, }; use crate::prover::identity::Identity; use crate::prover::Prover; use crate::task_monitor::TaskMonitor; -use crate::utils::batch_type::BatchType; use crate::utils::index_packing::pack_indices; /// The number of seconds either side of the timer tick to treat as enough to @@ -68,187 +69,118 @@ pub async fn process_identities( }, } - let Some(batch_type) = determine_batch_type(app.tree_state()?.batching_tree()) else { + let current_root = app.tree_state()?.latest_tree().get_root(); + let next_batch = app.database.get_next_batch(¤t_root).await?; + let Some(next_batch) = next_batch else { + if !app.database.is_root_in_batch_chain(¤t_root).await? { + // todo(piotrh) + panic!( + "Current root of latest tree cannot be find in database in batches chain. It \ + should never happen." + ); + } + // todo(piotrh): check if proper way to handle continue; }; - let batch_size = if batch_type.is_deletion() { - app.identity_manager.max_deletion_batch_size().await - } else { - app.identity_manager.max_insertion_batch_size().await - }; - - let updates = app - .tree_state()? 
- .batching_tree() - .peek_next_updates(batch_size); - - // If the batch is a deletion, process immediately without resetting the timer - if batch_type.is_deletion() { - commit_identities( - &app.identity_manager, - app.tree_state()?.batching_tree(), - &monitored_txs_sender, - &updates, - ) + let tx = app + .database + .get_transaction_for_batch(&next_batch.next_root) .await?; - } else { - let current_time = Utc::now(); - let batch_insertion_timeout = - chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?; - - let timeout_batch_time = last_batch_time - + batch_insertion_timeout - + chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS); - - let batch_time_elapsed = current_time >= timeout_batch_time; - - // If the batch size is full or if the insertion time has elapsed - // process the batch - if updates.len() >= batch_size || batch_time_elapsed { - commit_identities( - &app.identity_manager, - app.tree_state()?.batching_tree(), - &monitored_txs_sender, - &updates, - ) - .await?; - - // We've inserted the identities, so we want to ensure that - // we don't trigger again until either we get a full batch - // or the timer ticks. - timer.reset(); - last_batch_time = Utc::now(); - app.database - .update_latest_insertion_timestamp(last_batch_time) - .await?; - } else { - // Check if the next batch after the current insertion batch is - // deletion. The only time that deletions are - // inserted is when there is a full deletion batch or the - // deletion time interval has elapsed. - // In this case, we should immediately process the batch. - let next_batch_is_deletion = if let Some(update) = app - .tree_state()? - .batching_tree() - .peek_next_updates(batch_size + 1) - .last() - { - update.update.element == Hash::ZERO - } else { - false - }; - - // If the next batch is deletion, process the current insertion batch - if next_batch_is_deletion { - commit_identities( - &app.identity_manager, - app.tree_state()?.batching_tree(), - &monitored_txs_sender, - &updates, - ) - .await?; - } else { - // If there are not enough identities to fill the batch, the time interval has - // not elapsed and the next batch is not deletion, wait for more identities - tracing::trace!( - "Pending identities ({}) is less than batch size ({}). Waiting.", - updates.len(), - batch_size - ); - continue; - } + if tx.is_some() { + // todo(piotrh): should be run here? + update_tree(app.tree_state()?.latest_tree(), &next_batch)?; + + // todo(piotrh): check if proper way to handle + continue; + } + + let should_continue = match next_batch.batch_type { + BatchType::Insertion => { + process_insertion_batch(&app, &next_batch, &last_batch_time, &monitored_txs_sender) + .await? + } + BatchType::Deletion => { + process_deletion_batch(&app, &next_batch, &last_batch_time, &monitored_txs_sender) + .await? } + }; + + if should_continue { + continue; } + timer.reset(); + last_batch_time = Utc::now(); + app.database + .update_latest_insertion_timestamp(last_batch_time) + .await?; + // We want to check if there's a full batch available immediately wake_up_notify.notify_one(); } } -async fn commit_identities( - identity_manager: &IdentityManager, - batching_tree: &TreeVersion, - monitored_txs_sender: &mpsc::Sender, - updates: &[AppliedTreeUpdate], -) -> anyhow::Result<()> { - // If the update is an insertion - let tx_id = if updates - .first() - .context("Updates should be > 1")? 
- .update - .element - != Hash::ZERO - { - let prover = identity_manager - .get_suitable_insertion_prover(updates.len()) - .await?; - - tracing::info!( - num_updates = updates.len(), - batch_size = prover.batch_size(), - "Insertion batch", - ); - - insert_identities(identity_manager, batching_tree, updates, &prover).await? - } else { - let prover = identity_manager - .get_suitable_deletion_prover(updates.len()) - .await?; - - tracing::info!( - num_updates = updates.len(), - batch_size = prover.batch_size(), - "Deletion batch" - ); +async fn process_insertion_batch( + app: &Arc, + next_batch: &BatchEntry, + last_batch_time: &DateTime, + monitored_txs_sender: &Arc>, +) -> anyhow::Result { + let latest_tree = app.tree_state()?.latest_tree(); + + let (updates, updated_tree) = latest_tree.append_many_as_derived(&next_batch.commitments.0); + let next_root: Hash = updated_tree.root(); + if next_root != next_batch.next_root { + // todo(piotrh): implement + panic!("[zzz]"); + } - delete_identities(identity_manager, batching_tree, updates, &prover).await? - }; + let batch_size = app.identity_manager.max_insertion_batch_size().await; + let current_time = Utc::now(); + let batch_insertion_timeout = + chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?; - if let Some(tx_id) = tx_id { - monitored_txs_sender.send(tx_id).await?; - } + let timeout_batch_time = *last_batch_time + + batch_insertion_timeout + + chrono::Duration::seconds(DEBOUNCE_THRESHOLD_SECS); - Ok(()) -} + let prover = app + .identity_manager + .get_suitable_insertion_prover(updates.len()) + .await?; -#[instrument(level = "info", skip_all)] -pub async fn insert_identities( - identity_manager: &IdentityManager, - batching_tree: &TreeVersion, - updates: &[AppliedTreeUpdate], - prover: &Prover, -) -> anyhow::Result> { - assert_updates_are_consecutive(updates); - - let start_index = updates[0].update.leaf_index; - let pre_root: U256 = batching_tree.get_root().into(); - let mut commitments: Vec = updates - .iter() - .map(|update| update.update.element.into()) - .collect(); + tracing::info!( + num_updates = updates.len(), + batch_size = prover.batch_size(), + "Insertion batch", + ); - let latest_tree_from_updates = updates - .last() - .expect("Updates is non empty.") - .result - .clone(); + assert_updates_are_consecutive(&next_batch); - // Next get merkle proofs for each update - note the proofs are acquired from - // intermediate versions of the tree - let mut merkle_proofs: Vec<_> = updates + // let start_index = updates[0].update.leaf_index; + let start_index = next_batch + .leaf_indexes + .0 + .first() + .expect("checked earlier") + .clone(); // todo(piotrh): error message + let mut commitments: Vec = next_batch + .commitments + .0 .iter() - .map(|update| update.result.proof(update.update.leaf_index)) + .map(|v| (*v).into()) .collect(); + let mut merkle_proofs: Vec<_> = updates.iter().map(|(_, proof, _)| proof.clone()).collect(); // Grab some variables for sizes to make querying easier. - let commitment_count = updates.len(); + let commitment_count = commitments.len(); // If these aren't equal then something has gone terribly wrong and is a // programmer bug, so we abort. assert_eq!( commitment_count, - merkle_proofs.len(), + updates.len(), "Number of identities does not match the number of merkle proofs." ); @@ -259,17 +191,14 @@ pub async fn insert_identities( // subsequent zero identities and their associated merkle proofs if the batch is // too small. 
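    // Note: the padding proofs below are taken from `updated_tree` (the derived tree with
    // this batch already applied), so they prove the zero leaves at the indices directly
    // after the last real insertion in the batch.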
if commitment_count != batch_size { - let start_index = updates - .last() - .expect("Already confirmed to exist.") - .update - .leaf_index + let start_index = next_batch.leaf_indexes.0.last() + .expect("Already confirmed to exist.") // todo(piotrh), check message + 1; let padding = batch_size - commitment_count; commitments.append(&mut vec![U256::zero(); padding]); for i in start_index..(start_index + padding) { - let proof = latest_tree_from_updates.proof(i); + let proof = updated_tree.proof(i); merkle_proofs.push(proof); } } @@ -287,35 +216,38 @@ pub async fn insert_identities( // With the updates applied we can grab the value of the tree's new root and // build our identities for sending to the identity manager. - let post_root: U256 = latest_tree_from_updates.root().into(); + let prev_root: U256 = next_batch.prev_root.unwrap().into(); // todo(piotrh) unwrap remove + let next_root: U256 = next_batch.next_root.into(); // todo(piotrh) unwrap remove let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs); - identity_manager.validate_merkle_proofs(&identity_commitments)?; + app.identity_manager + .validate_merkle_proofs(&identity_commitments)?; // We prepare the proof before reserving a slot in the pending identities let proof = IdentityManager::prepare_insertion_proof( - prover, + &prover, start_index, - pre_root, + prev_root, &identity_commitments, - post_root, + next_root, ) .await?; tracing::info!( start_index, - ?pre_root, - ?post_root, + ?prev_root, + ?next_root, "Submitting insertion batch" ); // With all the data prepared we can submit the identities to the on-chain // identity manager and wait for that transaction to be mined. - let transaction_id = identity_manager + let transaction_id = app + .identity_manager .register_identities( start_index, - pre_root, - post_root, + prev_root, + next_root, identity_commitments, proof, ) @@ -327,83 +259,79 @@ pub async fn insert_identities( tracing::info!( start_index, - ?pre_root, - ?post_root, + ?prev_root, + ?next_root, ?transaction_id, "Insertion batch submitted" ); - // Update the batching tree only after submitting the identities to the chain - batching_tree.apply_updates_up_to(post_root.into()); + tracing::info!(start_index, ?prev_root, ?next_root, "Tree updated"); - tracing::info!(start_index, ?pre_root, ?post_root, "Tree updated"); + app.database + .insert_new_transaction(&transaction_id.0, &next_batch.next_root) + .await?; - TaskMonitor::log_batch_size(updates.len()); + // todo(piotrh): check + // do the real update finally when transaction is created + // after transaction being saved we are save even to not do it, but let's do for + // optimization + latest_tree.append_many(&next_batch.commitments.0); - Ok(Some(transaction_id)) -} + monitored_txs_sender.send(transaction_id).await?; -fn assert_updates_are_consecutive(updates: &[AppliedTreeUpdate]) { - for updates in updates.windows(2) { - let first = &updates[0]; - let second = &updates[1]; + TaskMonitor::log_batch_size(updates.len()); - if first.update.leaf_index + 1 != second.update.leaf_index { - let leaf_indexes = updates - .iter() - .map(|update| update.update.leaf_index) - .collect::>(); - let commitments = updates - .iter() - .map(|update| update.update.element) - .collect::>(); + Ok(false) +} - panic!( - "Identities are not consecutive leaves in the tree (leaf_indexes = {:?}, \ - commitments = {:?})", - leaf_indexes, commitments - ); - } +async fn process_deletion_batch( + app: &Arc, + next_batch: &BatchEntry, + last_batch_time: &DateTime, + 
monitored_txs_sender: &Arc>, +) -> anyhow::Result { + let latest_tree = app.tree_state()?.latest_tree(); + + let updates = latest_tree.delete_many(&next_batch.leaf_indexes.0); + if latest_tree.get_root() != next_batch.next_root { + // todo(piotrh): implement + panic!("[zzz]"); } -} -pub async fn delete_identities( - identity_manager: &IdentityManager, - batching_tree: &TreeVersion, - updates: &[AppliedTreeUpdate], - prover: &Prover, -) -> anyhow::Result> { - // Grab the initial conditions before the updates are applied to the tree. - let pre_root: U256 = batching_tree.get_root().into(); + let batch_insertion_timeout = + chrono::Duration::from_std(app.config.app.batch_insertion_timeout)?; - let mut deletion_indices = updates - .iter() - .map(|f| f.update.leaf_index as u32) - .collect::>(); + let prover = app + .identity_manager + .get_suitable_deletion_prover(updates.len()) + .await?; - let commitments = - batching_tree.commitments_by_indices(deletion_indices.iter().map(|x| *x as usize)); - let mut commitments: Vec = commitments.into_iter().map(U256::from).collect(); + tracing::info!( + num_updates = updates.len(), + batch_size = prover.batch_size(), + "Deletion batch", + ); - let latest_tree_from_updates = updates - .last() - .expect("Updates is non empty.") - .result - .clone(); + // Grab the initial conditions before the updates are applied to the tree. + let mut deletion_indices: Vec<_> = next_batch + .leaf_indexes + .0 + .iter() + .map(|&v| v as u32) + .collect(); + let mut commitments: Vec = next_batch + .commitments + .0 + .iter() + .map(|v| (*v).into()) + .collect(); // Next get merkle proofs for each update - note the proofs are acquired from // intermediate versions of the tree - let mut merkle_proofs: Vec<_> = updates - .iter() - .map(|update_with_tree| { - update_with_tree - .result - .proof(update_with_tree.update.leaf_index) - }) - .collect(); + let mut merkle_proofs: Vec<_> = updates.iter().map(|(_, proof)| proof.clone()).collect(); // Grab some variables for sizes to make querying easier. - let commitment_count = updates.len(); + let commitment_count = commitments.len(); // If these aren't equal then something has gone terribly wrong and is a // programmer bug, so we abort. @@ -419,17 +347,14 @@ pub async fn delete_identities( // ensure that our batches match that size. We do this by padding deletion // indices with tree.depth() ^ 2. The deletion prover will skip the proof for // any deletion with an index greater than the max tree depth - let pad_index = 2_u32.pow(latest_tree_from_updates.depth() as u32); + let pad_index = 2_u32.pow(latest_tree.get_depth() as u32); if commitment_count != batch_size { let padding = batch_size - commitment_count; commitments.extend(vec![U256::zero(); padding]); deletion_indices.extend(vec![pad_index; padding]); - let zeroed_proof = Proof(vec![ - Branch::Left(Uint::ZERO); - latest_tree_from_updates.depth() - ]); + let zeroed_proof = Proof(vec![Branch::Left(Uint::ZERO); latest_tree.get_depth()]); merkle_proofs.extend(vec![zeroed_proof; padding]); } @@ -442,29 +367,32 @@ pub async fn delete_identities( // With the updates applied we can grab the value of the tree's new root and // build our identities for sending to the identity manager. 
- let post_root: U256 = latest_tree_from_updates.root().into(); + let prev_root: U256 = next_batch.prev_root.unwrap().into(); // todo(piotrh) unwrap remove + let next_root: U256 = next_batch.next_root.into(); // todo(piotrh) unwrap remove let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs); - identity_manager.validate_merkle_proofs(&identity_commitments)?; + app.identity_manager + .validate_merkle_proofs(&identity_commitments)?; // We prepare the proof before reserving a slot in the pending identities let proof = IdentityManager::prepare_deletion_proof( - prover, - pre_root, + &prover, + prev_root, deletion_indices.clone(), identity_commitments, - post_root, + next_root, ) .await?; let packed_deletion_indices = pack_indices(&deletion_indices); - tracing::info!(?pre_root, ?post_root, "Submitting deletion batch"); + tracing::info!(?prev_root, ?next_root, "Submitting deletion batch"); // With all the data prepared we can submit the identities to the on-chain // identity manager and wait for that transaction to be mined. - let transaction_id = identity_manager - .delete_identities(proof, packed_deletion_indices, pre_root, post_root) + let transaction_id = app + .identity_manager + .delete_identities(proof, packed_deletion_indices, prev_root, next_root) .await .map_err(|e| { tracing::error!(?e, "Failed to insert identity to contract."); @@ -472,20 +400,50 @@ pub async fn delete_identities( })?; tracing::info!( - ?pre_root, - ?post_root, + ?prev_root, + ?next_root, ?transaction_id, "Deletion batch submitted" ); - // Update the batching tree only after submitting the identities to the chain - batching_tree.apply_updates_up_to(post_root.into()); + app.database + .insert_new_transaction(&transaction_id.0, &next_batch.next_root) + .await?; - tracing::info!(?pre_root, ?post_root, "Tree updated"); + monitored_txs_sender.send(transaction_id).await?; + + tracing::info!(?prev_root, ?next_root, "Tree updated"); TaskMonitor::log_batch_size(updates.len()); - Ok(Some(transaction_id)) + Ok(false) +} + +fn update_tree(latest_tree: &TreeVersion, next_batch: &BatchEntry) -> anyhow::Result<()> { + if next_batch.batch_type == BatchType::Insertion { + latest_tree.append_many(&next_batch.commitments.0); + } else if next_batch.batch_type == BatchType::Deletion { + latest_tree.delete_many(&next_batch.leaf_indexes.0); + } + + if latest_tree.get_root() != next_batch.next_root { + // todo(piotrh): implement + panic!("[zzz]"); + } + + Ok(()) +} + +fn assert_updates_are_consecutive(next_batch: &BatchEntry) { + for window in next_batch.leaf_indexes.0.windows(2) { + if window[0] + 1 != window[1] { + panic!( + "Identities are not consecutive leaves in the tree (leaf_indexes = {:?}, \ + commitments = {:?})", + next_batch.leaf_indexes.0, next_batch.commitments.0 + ); + } + } } fn zip_commitments_and_proofs( @@ -508,18 +466,3 @@ fn zip_commitments_and_proofs( }) .collect() } - -fn determine_batch_type(tree: &TreeVersion) -> Option { - let next_update = tree.peek_next_updates(1); - if next_update.is_empty() { - return None; - } - - let batch_type = if next_update[0].update.element == Hash::ZERO { - BatchType::Deletion - } else { - BatchType::Insertion - }; - - Some(batch_type) -} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index f3c7a715..89e20c25 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -747,7 +747,7 @@ pub async fn spawn_deps<'a, 'b, 'c>( )) } -async fn spawn_db<'a>(docker: &'a Cli) -> anyhow::Result> { +async fn spawn_db(docker: &Cli) -> anyhow::Result { let 
     Ok(db_container)

diff --git a/tests/insert_identity_and_proofs.rs b/tests/insert_identity_and_proofs.rs
index f3108db2..707a9bb0 100644
--- a/tests/insert_identity_and_proofs.rs
+++ b/tests/insert_identity_and_proofs.rs
@@ -1,7 +1,7 @@
-mod common;
-
 use common::prelude::*;

+mod common;
+
 const IDLE_TIME: u64 = 7;

 #[tokio::test]
@@ -50,6 +50,15 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
     let (_, app_handle, local_addr) = spawn_app(config.clone())
         .await
         .expect("Failed to spawn app.");
+    let (_, app2_handle, local_addr2) = spawn_app(config.clone())
+        .await
+        .expect("Failed to spawn app2.");
+    let (_, app3_handle, local_addr3) = spawn_app(config.clone())
+        .await
+        .expect("Failed to spawn app3.");
+    let (_, app4_handle, local_addr4) = spawn_app(config.clone())
+        .await
+        .expect("Failed to spawn app4.");

     let test_identities = generate_test_identities(batch_size * 3);
     let identities_ref: Vec<Field> = test_identities
@@ -58,8 +67,13 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
         .collect();

     let uri = "http://".to_owned() + &local_addr.to_string();
+    let uri2 = "http://".to_owned() + &local_addr2.to_string();
+    let uri3 = "http://".to_owned() + &local_addr3.to_string();
+    let uri4 = "http://".to_owned() + &local_addr4.to_string();
     let client = Client::new();

+    tokio::time::sleep(Duration::from_secs(20)).await;
+
     // Check that we can get inclusion proofs for things that already exist in the
     // database and on chain.
     test_inclusion_proof(
@@ -84,13 +98,13 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
     // Insert enough identities to trigger an batch to be sent to the blockchain
     // based on the current batch size of 3.
     test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 0).await;
-    test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 1).await;
-    test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 2).await;
+    test_insert_identity(&uri2, &client, &mut ref_tree, &identities_ref, 1).await;
+    test_insert_identity(&uri3, &client, &mut ref_tree, &identities_ref, 2).await;

     tokio::time::sleep(Duration::from_secs(IDLE_TIME)).await;

     // Check that we can get their inclusion proofs back.
     test_inclusion_proof(
-        &uri,
+        &uri4,
         &client,
         0,
         &ref_tree,
@@ -100,22 +114,22 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
     )
     .await;
     test_inclusion_proof(
-        &uri,
+        &uri4,
         &client,
         1,
         &ref_tree,
         &Hash::from_str_radix(&test_identities[1], 16)
-            .expect("Failed to parse Hash from test leaf 0"),
+            .expect("Failed to parse Hash from test leaf 1"),
         false,
     )
     .await;
     test_inclusion_proof(
-        &uri,
+        &uri4,
         &client,
         2,
         &ref_tree,
         &Hash::from_str_radix(&test_identities[2], 16)
-            .expect("Failed to parse Hash from test leaf 0"),
+            .expect("Failed to parse Hash from test leaf 2"),
         false,
     )
     .await;
@@ -135,7 +149,7 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
         3,
         &ref_tree,
         &Hash::from_str_radix(&test_identities[3], 16)
-            .expect("Failed to parse Hash from test leaf 0"),
+            .expect("Failed to parse Hash from test leaf 3"),
         false,
     )
     .await;
@@ -145,7 +159,7 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
         4,
         &ref_tree,
         &Hash::from_str_radix(&test_identities[4], 16)
-            .expect("Failed to parse Hash from test leaf 0"),
+            .expect("Failed to parse Hash from test leaf 4"),
         false,
     )
     .await;
@@ -155,6 +169,9 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
     info!("Stopping the app for testing purposes");
     shutdown();
     app_handle.await.unwrap();
+    app2_handle.await.unwrap();
+    app3_handle.await.unwrap();
+    app4_handle.await.unwrap();
     reset_shutdown();

     // Test loading the state from a file when the on-chain contract has the state.
@@ -181,7 +198,7 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
         4,
         &ref_tree,
         &Hash::from_str_radix(&test_identities[4], 16)
-            .expect("Failed to parse Hash from test leaf 0"),
+            .expect("Failed to parse Hash from test leaf 4"),
         false,
     )
     .await;
@@ -218,7 +235,7 @@ async fn insert_identity_and_proofs() -> anyhow::Result<()> {
         4,
         &ref_tree,
         &Hash::from_str_radix(&test_identities[4], 16)
-            .expect("Failed to parse Hash from test leaf 0"),
+            .expect("Failed to parse Hash from test leaf 4"),
         false,
     )
     .await;
diff --git a/tests/unavailable_prover.rs b/tests/unavailable_prover.rs
index 31b8dee9..b0f5e075 100644
--- a/tests/unavailable_prover.rs
+++ b/tests/unavailable_prover.rs
@@ -66,12 +66,12 @@ async fn unavailable_prover() -> anyhow::Result<()> {

     // Wait for a while - this should let the processing thread panic or fail at
     // least once
-    tokio::time::sleep(Duration::from_secs(5)).await;
+    tokio::time::sleep(Duration::from_secs(10)).await;

     // Make prover available again
     prover_mock.set_availability(true).await;

     // and wait until the processing thread spins up again
-    tokio::time::sleep(Duration::from_secs(5)).await;
+    tokio::time::sleep(Duration::from_secs(10)).await;

     info!("Prover has been reenabled");