Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
piohei committed Apr 29, 2024
1 parent 88bbd48 commit b1bb920
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 25 deletions.
13 changes: 11 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM debian:12 as build-env
ARG BUILD_DEV

WORKDIR /src

Expand All @@ -20,11 +21,16 @@ ENV PATH="/root/.cargo/bin:${PATH}"
ENV RUSTUP_HOME="/root/.rustup"
ENV CARGO_HOME="/root/.cargo"

# Select the cargo build flags: debug profile when BUILD_DEV is set, --release otherwise.
# BUG FIX: the previous pair of lines used `${BUILD_DEV:+}` whose alternate word is
# EMPTY, so CARGO_ARGS ended up empty whether or not BUILD_DEV was set — and
# `${CARGO_ARGS:---release}` substitutes on empty as well as unset, so every build
# was a release build and the /src/target/debug binary referenced later never existed.
# Using a non-empty alternate (`--profile dev`, which outputs to target/debug) makes
# the BUILD_DEV switch actually take effect.
ENV CARGO_ARGS="${BUILD_DEV:+--profile dev}"
ENV CARGO_ARGS="${CARGO_ARGS:---release}"

RUN echo "CARGO_ARGS: ${CARGO_ARGS}"

# Install the toolchain
RUN rustup component add cargo

# Build the sequencer
RUN cargo build --release
RUN cargo build ${CARGO_ARGS}

# cc variant because we need libgcc and others
FROM gcr.io/distroless/cc-debian12:nonroot
Expand All @@ -36,7 +42,10 @@ LABEL prometheus.io/scrape="true"
LABEL prometheus.io/port="9998"
LABEL prometheus.io/path="/metrics"

# BUG FIX: ARG values do not survive a FROM boundary — the BUILD_DEV declared in the
# build stage is unset in this runtime stage, so `${BUILD_DEV:+...}` always expanded
# empty and BIN_PATH always resolved to the release path, breaking dev images.
# Redeclaring the ARG here pulls the build-arg value into this stage.
ARG BUILD_DEV
ENV BIN_PATH=${BUILD_DEV:+/src/target/debug/signup-sequencer}
ENV BIN_PATH=${BIN_PATH:-/src/target/release/signup-sequencer}

# Copy the sequencer binary
COPY --from=build-env --chown=0:10001 --chmod=454 /src/target/release/signup-sequencer /bin/signup-sequencer
COPY --from=build-env --chown=0:10001 --chmod=454 ${BIN_PATH} /bin/signup-sequencer

ENTRYPOINT [ "/bin/signup-sequencer" ]
206 changes: 206 additions & 0 deletions e2e_tests/compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# E2E test stack: a forked anvil chain, tx-sitter + its DB, insertion/deletion
# provers, and four identical signup-sequencer replicas sharing one database.
#
# 3.4 is the first compose file format that allows top-level `x-` extension
# fields, which we use below to define the sequencer template once.
version: "3.4"

# Shared definition for the four signup-sequencer replicas. Only the container
# name and the published host port differ between them; everything else —
# image, build args, dependencies and environment — is merged in via the
# YAML anchor so the configuration cannot drift between replicas.
x-signup-sequencer: &signup-sequencer
  image: signup-sequencer
  build:
    context: ./../
    args:
      BUILD_DEV: 1
  depends_on:
    - sequencer-db
    - chain
    - semaphore-insertion
    - semaphore-deletion
  restart: always
  environment:
    - SEQ__TREE__TREE_DEPTH=30
    - SEQ__TREE__DENSE_TREE_PREFIX_DEPTH=22
    - SEQ__TREE__TREE_GC_THRESHOLD=10000000
    - SEQ__TREE__CACHE_FILE=./cache_file
    - SEQ__SERVER__ADDRESS=0.0.0.0:8080
    - SEQ__NETWORK__IDENTITY_MANAGER_ADDRESS=0x48483748eb0446A16cAE79141D0688e3F624Cb73
    - SEQ__RELAYER__KIND=tx_sitter
    - SEQ__RELAYER__TX_SITTER_URL=http://tx-sitter:3000/1/api/G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsnllwU9D3Z4D8gOt04M0QNH
    - SEQ__RELAYER__TX_SITTER_ADDRESS=0x1d7ffed610cc4cdC097ecDc835Ae5FEE93C9e3Da
    - SEQ__RELAYER__TX_SITTER_GAS_LIMIT=2000000
    - SEQ__PROVIDERS__PRIMARY_NETWORK_PROVIDER=http://chain:8545
    - 'SEQ__APP__PROVERS_URLS=[{"url": "http://semaphore-insertion:3001", "prover_type": "insertion", "batch_size": 10,"timeout_s": 30}, {"url": "http://semaphore-deletion:3001", "prover_type": "deletion", "batch_size": 10,"timeout_s": 30}]'
    - SEQ__DATABASE__DATABASE=postgres://postgres:postgres@sequencer-db:5432/sequencer?sslmode=disable
    - SEQ__APP__BATCH_INSERTION_TIMEOUT=30s
    - SEQ__APP__BATCH_DELETION_TIMEOUT_SECONDS=1s

services:
  # Local anvil node forking Sepolia at a fixed block for deterministic tests.
  # SECURITY NOTE(review): the fork-url embeds an Alchemy API key in plain text;
  # it should be moved to an environment variable / .env file, not committed.
  chain:
    container_name: chain
    image: ghcr.io/foundry-rs/foundry
    platform: linux/amd64
    ports:
      - "8545:8545"
    command: [ "anvil --host 0.0.0.0 --chain-id 31337 --block-time 30 --base-fee 0 --gas-limit 0 --gas-price 0 --fork-url https://eth-sepolia.g.alchemy.com/v2/Hkj3vTy6ee49NbI4Imhe6r5mRM1vkR10@5091094" ]

  # Postgres backing store for tx-sitter (host port 5460 to avoid clashing
  # with the sequencer DB on 5461 and any local postgres on 5432).
  tx-sitter-db:
    container_name: tx-sitter-db
    image: postgres:latest
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=tx-sitter
    ports:
      - "5460:5432"
    volumes:
      - tx_sitter_db_data:/var/lib/postgresql/data

  # Postgres backing store shared by all four sequencer replicas.
  sequencer-db:
    container_name: sequencer-db
    image: postgres:latest
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=sequencer
    ports:
      - "5461:5432"
    volumes:
      - sequencer_db_data:/var/lib/postgresql/data

  # Transaction relayer with a predefined anvil network, relayer and API key
  # (auth disabled — test environment only).
  tx-sitter:
    container_name: tx-sitter
    image: tx-sitter-monolith
    depends_on:
      - tx-sitter-db
      - chain
    restart: always
    ports:
      - "3000:3000"
    environment:
      - TX_SITTER__SERVICE__ESCALATION_INTERVAL=3s
      - TX_SITTER__DATABASE__KIND=connection_string
      - TX_SITTER__DATABASE__CONNECTION_STRING=postgres://postgres:postgres@tx-sitter-db:5432/tx-sitter?sslmode=disable
      - TX_SITTER__KEYS__KIND=local
      - TX_SITTER__SERVICE__PREDEFINED__NETWORK__CHAIN_ID=31337
      - TX_SITTER__SERVICE__PREDEFINED__NETWORK__NAME=Anvil
      - TX_SITTER__SERVICE__PREDEFINED__NETWORK__HTTP_RPC=http://chain:8545
      - TX_SITTER__SERVICE__PREDEFINED__NETWORK__WS_RPC=ws://chain:8545
      - TX_SITTER__SERVICE__PREDEFINED__RELAYER__ID=1b908a34-5dc1-4d2d-a146-5eb46e975830
      - TX_SITTER__SERVICE__PREDEFINED__RELAYER__NAME=Relayer
      - TX_SITTER__SERVICE__PREDEFINED__RELAYER__CHAIN_ID=31337
      - TX_SITTER__SERVICE__PREDEFINED__RELAYER__KEY_ID=d10607662a85424f02a33fb1e6d095bd0ac7154396ff09762e41f82ff2233aaa
      - TX_SITTER__SERVICE__PREDEFINED__RELAYER__API_KEY=G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsnllwU9D3Z4D8gOt04M0QNH
      - TX_SITTER__SERVER__HOST=0.0.0.0:3000
      - TX_SITTER__SERVER__DISABLE_AUTH=true
      - RUST_LOG=info

  # Semaphore merkle-tree-batcher prover, insertion mode, keys mounted read-only
  # from the repo-local keys directory.
  semaphore-insertion:
    container_name: semaphore-insertion
    image: semaphore-mtb
    hostname: semaphore-insertion
    restart: always
    command: [ "start", "--keys-file", "/mtb/keys", "--prover-address", "0.0.0.0:3001", "--mode", "insertion" ]
    volumes:
      - ./keys/insertion_b10t30.ps:/mtb/keys
    environment:
      BATCH_TIMEOUT_SECONDS: 1

  # Same prover image in deletion mode with its own key file.
  semaphore-deletion:
    container_name: semaphore-deletion
    image: semaphore-mtb
    hostname: semaphore-deletion
    restart: always
    command: [ "start", "--keys-file", "/mtb/keys", "--prover-address", "0.0.0.0:3001", "--mode", "deletion" ]
    volumes:
      - ./keys/deletion_b10t30.ps:/mtb/keys
    environment:
      BATCH_DELETION_TIMEOUT_SECONDS: 1

  # Four identical sequencer replicas (see x-signup-sequencer above);
  # host ports 9080-9083 all map to the in-container 8080.
  signup-sequencer-0:
    <<: *signup-sequencer
    container_name: signup-sequencer-0
    ports:
      - "9080:8080"

  signup-sequencer-1:
    <<: *signup-sequencer
    container_name: signup-sequencer-1
    ports:
      - "9081:8080"

  signup-sequencer-2:
    <<: *signup-sequencer
    container_name: signup-sequencer-2
    ports:
      - "9082:8080"

  signup-sequencer-3:
    <<: *signup-sequencer
    container_name: signup-sequencer-3
    ports:
      - "9083:8080"

volumes:
  tx_sitter_db_data:
    driver: local
  sequencer_db_data:
    driver: local
10 changes: 10 additions & 0 deletions e2e_tests/create_identities.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh
# Insert random identity commitments into the local signup-sequencer instance
# exposed on host port 9080 (see e2e_tests/compose.yml).
#
# Usage: create_identities.sh [NUMBER] [SLEEP]
#   NUMBER  how many identities to insert (default: 1)
#   SLEEP   seconds to wait between requests (default: 0)
#
# Fixes over the original: variable expansions are quoted so non-numeric or
# empty arguments fail cleanly in seq/sleep instead of word-splitting.

NUMBER=${1:-1}
SLEEP=${2:-0}

for _ in $(seq "$NUMBER"); do
    echo "running"
    # Each commitment is a random 16-byte hex value; -vv kept for debugging
    # the request/response exchange during e2e runs.
    curl -X POST -H "Content-Type: application/json" \
        -d "{\"identityCommitment\":\"0x$(openssl rand -hex 16)\"}" \
        localhost:9080/insertIdentity -vv
    sleep "$SLEEP"
done
65 changes: 42 additions & 23 deletions src/task_monitor/tasks/insert_identities.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Error;
use sqlx::{Postgres, Transaction};
use tokio::sync::{Mutex, Notify};
use tokio::time::sleep;
use tracing::instrument;
Expand All @@ -22,24 +24,33 @@ pub async fn insert_identities(
let batch_size = app.identity_manager.max_insertion_batch_size().await;

loop {
// get commits from database
let unprocessed = app
.database
.get_eligible_unprocessed_commitments(UnprocessedStatus::New, batch_size)
.await?;
let mut tx = app.database.pool.begin().await?;

// Multiple instances are trying to get the batch. Putting data into tree is
// possible only when tree is synced with database, because database is
// used to sync between instances. This awaiting is just to minimize
// failed tasks due to being out of sync with database.
if !is_synced(&app, &mut tx).await? {
// todo(piotrh): we may trigger sync (process_identities) here
return Err(Error::msg("Not synced with db."));
}

let unprocessed = get_identities_batch(&mut tx, batch_size).await?;
if unprocessed.is_empty() {
sleep(Duration::from_secs(5)).await;
continue;
}

insert_identities_batch(
&app.database,
&mut tx,
app.tree_state()?.latest_tree(),
unprocessed,
&pending_insertions_mutex,
)
.await?;

tx.commit().await?;

// Notify the identity processing task, that there are new identities
wake_up_notify.notify_one();
}
Expand All @@ -60,42 +71,54 @@ async fn ensure_batch_chain_initialized(app: &Arc<App>) -> anyhow::Result<()> {
Ok(())
}

/// Reports whether the in-memory latest tree has caught up with the database.
///
/// Returns `Ok(true)` only when the next leaf index of the latest tree equals
/// the next leaf index recorded in the database (read through the supplied
/// transaction). Panics if the tree is ever AHEAD of the database, since the
/// database is the source of truth shared between instances and that state
/// would indicate a bug.
async fn is_synced(app: &Arc<App>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<bool> {
    let next_db_leaf = tx.get_next_leaf_index().await?;
    let next_leaf = app.tree_state()?.latest_tree().next_leaf();

    // The tree is populated from the database, so it may lag behind but must
    // never overtake it.
    if next_leaf > next_db_leaf {
        panic!(
            "Database and tree are out of sync, tree is ahead of database. Next leaf index in tree \
             is: {next_leaf}, in database: {next_db_leaf}."
        );
    }

    Ok(next_leaf == next_db_leaf)
}

/// Fetches up to `batch_size` unprocessed commitments in `New` status from the
/// database, reading through the caller's open transaction so the selection
/// stays consistent with any subsequent writes in that transaction.
async fn get_identities_batch(
    tx: &mut Transaction<'_, Postgres>,
    batch_size: usize,
) -> anyhow::Result<Vec<UnprocessedCommitment>> {
    let commitments = tx
        .get_eligible_unprocessed_commitments(UnprocessedStatus::New, batch_size)
        .await?;

    Ok(commitments)
}

#[instrument(level = "info", skip_all)]
async fn insert_identities_batch(
database: &Database,
tx: &mut Transaction<'_, Postgres>,
latest_tree: &TreeVersion<Latest>,
identities: Vec<UnprocessedCommitment>,
pending_insertions_mutex: &Mutex<()>,
) -> anyhow::Result<()> {
// Filter out any identities that are already in the `identities` table
let mut filtered_identities = vec![];
for identity in identities {
if database
if tx
.get_identity_leaf_index(&identity.commitment)
.await?
.is_some()
{
tracing::warn!(?identity.commitment, "Duplicate identity");
database
.remove_unprocessed_identity(&identity.commitment)
.await?;
tx.remove_unprocessed_identity(&identity.commitment).await?;
} else {
filtered_identities.push(identity.commitment);
}
}

let _guard = pending_insertions_mutex.lock().await;

let next_db_index = database.get_next_leaf_index().await?;
let next_leaf = latest_tree.next_leaf();
let prev_root = latest_tree.get_root();

assert_eq!(
next_leaf, next_db_index,
"Database and tree are out of sync. Next leaf index in tree is: {next_leaf}, in database: \
{next_db_index}"
);

let (data, _) = latest_tree.append_many_as_derived(&filtered_identities);
let next_root = data
.last()
Expand All @@ -110,8 +133,6 @@ async fn insert_identities_batch(

let items: Vec<_> = data.into_iter().zip(filtered_identities.clone()).collect();

let mut tx = database.pool.begin().await?;

for ((root, _proof, leaf_index), identity) in items.iter() {
tx.insert_pending_identity(*leaf_index, identity, root)
.await?;
Expand All @@ -133,8 +154,6 @@ async fn insert_identities_batch(
)
.await?;

tx.commit().await?;

// todo(piotrh): ensure if we can or not do it here
// _ = latest_tree.append_many(&filtered_identities);

Expand Down

0 comments on commit b1bb920

Please sign in to comment.