Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Preview] HA solution #765

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM debian:12 as build-env
FROM debian:12 AS build-env

WORKDIR /src

Expand Down
25 changes: 21 additions & 4 deletions crates/tx-sitter-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::fmt;

use anyhow::bail;
use data::{GetTxResponse, SendTxRequest, SendTxResponse, TxStatus};
use reqwest::Response;
use reqwest::{Response, StatusCode};
use tracing::instrument;

pub mod data;
Expand All @@ -9,6 +12,22 @@ pub struct TxSitterClient {
url: String,
}

#[derive(Debug)]
pub struct HttpError {
pub status: StatusCode,
pub body: String,
}

impl fmt::Display for HttpError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Response failed with status {} - {}",
self.status, self.body
)
}
}

impl TxSitterClient {
pub fn new(url: impl ToString) -> Self {
Self {
Expand Down Expand Up @@ -46,9 +65,7 @@ impl TxSitterClient {
let body = response.text().await?;

tracing::error!("Response failed with status {} - {}", status, body);
return Err(anyhow::anyhow!(
"Response failed with status {status} - {body}"
));
bail!(HttpError { body, status });
}

Ok(response)
Expand Down
33 changes: 17 additions & 16 deletions e2e_tests/docker-compose/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ services:
volumes:
- sequencer_db_data:/var/lib/postgresql/data
tx-sitter:
image: ghcr.io/worldcoin/tx-sitter-monolith:latest
#image: ghcr.io/worldcoin/tx-sitter-monolith:latest
image: tx-sitter-monolith
hostname: tx-sitter
depends_on:
- tx-sitter-db
Expand Down Expand Up @@ -140,21 +141,21 @@ services:
command: [ "/config.toml" ]
environment:
- RUST_LOG=debug
# signup-sequencer-1:
# <<: *signup-sequencer-def
# hostname: signup-sequencer-1
# ports:
# - ${SIGNUP_SEQUENCER_0_PORT:-9081}:8080
# signup-sequencer-2:
# <<: *signup-sequencer-def
# hostname: signup-sequencer-2
# ports:
# - ${SIGNUP_SEQUENCER_0_PORT:-9082}:8080
# signup-sequencer-3:
# <<: *signup-sequencer-def
# hostname: signup-sequencer-3
# ports:
# - ${SIGNUP_SEQUENCER_0_PORT:-9083}:8080
signup-sequencer-1:
<<: *signup-sequencer-def
hostname: signup-sequencer-1
ports:
- ${SIGNUP_SEQUENCER_1_PORT:-9081}:8080
signup-sequencer-2:
<<: *signup-sequencer-def
hostname: signup-sequencer-2
ports:
- ${SIGNUP_SEQUENCER_2_PORT:-9082}:8080
signup-sequencer-3:
<<: *signup-sequencer-def
hostname: signup-sequencer-3
ports:
- ${SIGNUP_SEQUENCER_3_PORT:-9083}:8080
volumes:
tx_sitter_db_data:
driver: local
Expand Down
10 changes: 7 additions & 3 deletions e2e_tests/scenarios/tests/common/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use signup_sequencer::identity_tree::Hash;
use signup_sequencer::server::data::{
DeletionRequest, InclusionProofRequest, InclusionProofResponse, InsertCommitmentRequest,
};
use tracing::debug;
use tracing::{debug, info};

use crate::common::prelude::StatusCode;

Expand Down Expand Up @@ -126,11 +126,15 @@ pub async fn inclusion_proof(
client: &Client<HttpConnector>,
uri: &String,
commitment: &Hash,
) -> anyhow::Result<InclusionProofResponse> {
) -> anyhow::Result<(StatusCode, Option<InclusionProofResponse>)> {
let result = inclusion_proof_raw(client, uri, commitment).await?;

if !result.status_code.is_success() {
return Ok((result.status_code, None));
}

let result_json = serde_json::from_str::<InclusionProofResponse>(&result.body)
.expect("Failed to parse response as json");

Ok(result_json)
Ok((result.status_code, Some(result_json)))
}
62 changes: 47 additions & 15 deletions e2e_tests/scenarios/tests/common/docker_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};

use anyhow::{Context, Error};
use anyhow::{anyhow, Context, Error};
use hyper::{Body, Client};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
Expand All @@ -26,6 +26,9 @@
semaphore_insertion_port: u32,
semaphore_deletion_port: u32,
signup_sequencer_0_port: u32,
signup_sequencer_1_port: u32,
signup_sequencer_2_port: u32,
signup_sequencer_3_port: u32,
signup_sequencer_balancer_port: u32,
}

Expand Down Expand Up @@ -82,6 +85,18 @@
String::from("SIGNUP_SEQUENCER_0_PORT"),
self.signup_sequencer_0_port.to_string(),
);
res.insert(
String::from("SIGNUP_SEQUENCER_1_PORT"),
self.signup_sequencer_1_port.to_string(),
);
res.insert(
String::from("SIGNUP_SEQUENCER_2_PORT"),
self.signup_sequencer_2_port.to_string(),
);
res.insert(
String::from("SIGNUP_SEQUENCER_3_PORT"),
self.signup_sequencer_3_port.to_string(),
);
res.insert(
String::from("SIGNUP_SEQUENCER_BALANCER_PORT"),
self.signup_sequencer_balancer_port.to_string(),
Expand Down Expand Up @@ -118,7 +133,7 @@
stdout, stderr
);

Ok(parse_exposed_port(stdout))
parse_exposed_port(stdout)
}
}

Expand Down Expand Up @@ -152,6 +167,9 @@
semaphore_insertion_port: 0,
semaphore_deletion_port: 0,
signup_sequencer_0_port: 0,
signup_sequencer_1_port: 0,
signup_sequencer_2_port: 0,
signup_sequencer_3_port: 0,
signup_sequencer_balancer_port: 0,
};

Expand All @@ -171,11 +189,27 @@

tokio::time::sleep(Duration::from_secs(1)).await;

let balancer_port = res.get_mapped_port("signup-sequencer-balancer", 8080)?;
res.update_balancer_port(balancer_port);
let mut balancer_port = Err(anyhow!("Balancer port not queried."));
for _ in 0..3 {
balancer_port = res.get_mapped_port("signup-sequencer-balancer", 8080);
if balancer_port.is_ok() {
break;
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
res.update_balancer_port(balancer_port?);

let chain_port = res.get_mapped_port("chain", 8545)?;
res.update_chain_port(chain_port);
let mut chain_port = Err(anyhow!("Chain port not queried."));
for _ in 0..3 {
chain_port = res.get_mapped_port("chain", 8545);
if chain_port.is_ok() {
break;
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
res.update_chain_port(chain_port?);

await_running(&res).await?;

Expand Down Expand Up @@ -266,19 +300,17 @@
Ok(())
}

fn parse_exposed_port(s: String) -> u32 {
fn parse_exposed_port(s: String) -> anyhow::Result<u32> {
let parts: Vec<_> = s
.split_whitespace()
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect();

parts
.last()
.unwrap()
.split(':')
.last()
.unwrap()
.parse::<u32>()
.unwrap()
let port = parts.last().map(|v| v.split(':').last()).flatten();

Check warning on line 310 in e2e_tests/scenarios/tests/common/docker_compose.rs

View workflow job for this annotation

GitHub Actions / Lint

called `map(..).flatten()` on `Option`

Check warning on line 310 in e2e_tests/scenarios/tests/common/docker_compose.rs

View workflow job for this annotation

GitHub Actions / Lint

called `map(..).flatten()` on `Option`

match port {
Some(port) => port.parse::<u32>().map_err(|err| anyhow!(err)),
None => Err(anyhow!("Port not found in string.")),
}
}
24 changes: 14 additions & 10 deletions e2e_tests/scenarios/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,20 @@ pub async fn mined_inclusion_proof_with_retries(
for _i in 0..retries_count {
last_res = Some(inclusion_proof(client, uri, commitment).await?);

if let Some(ref inclusion_proof_json) = last_res {
if let Some(root) = inclusion_proof_json.0.root {
let (root, ..) = chain
.identity_manager
.query_root(root.into())
.call()
.await?;

if root != U256::zero() {
return Ok(());
if let Some((status_code, ref inclusion_proof_json)) = last_res {
if status_code.is_success() {
if let Some(inclusion_proof_json) = inclusion_proof_json {
if let Some(root) = inclusion_proof_json.0.root {
let (root, ..) = chain
.identity_manager
.query_root(root.into())
.call()
.await?;

if root != U256::zero() {
return Ok(());
}
}
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions e2e_tests/scenarios/tests/insert_100.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ async fn insert_100() -> anyhow::Result<()> {
let uri = format!("http://{}", docker_compose.get_local_addr());
let client = Client::new();

let identities = generate_test_commitments(10);
let identities = generate_test_commitments(100);

for commitment in identities.iter() {
insert_identity_with_retries(&client, &uri, commitment, 10, 3.0).await?;
}

for commitment in identities.iter() {
mined_inclusion_proof_with_retries(&client, &uri, &chain, commitment, 60, 10.0).await?;
mined_inclusion_proof_with_retries(&client, &uri, &chain, commitment, 60, 45.0).await?;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DROP UNIQUE INDEX idx_unique_insertion_leaf;
DROP UNIQUE INDEX idx_unique_deletion_leaf;

DROP TRIGGER validate_pre_root_trigger;
DROP FUNCTION validate_pre_root();

ALTER TABLE identities DROP COLUMN pre_root;
52 changes: 52 additions & 0 deletions schemas/database/015_add_constraints_for_identities.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
CREATE UNIQUE INDEX idx_unique_insertion_leaf on identities(leaf_index) WHERE commitment != E'\\x0000000000000000000000000000000000000000000000000000000000000000';
CREATE UNIQUE INDEX idx_unique_deletion_leaf on identities(leaf_index) WHERE commitment = E'\\x0000000000000000000000000000000000000000000000000000000000000000';

-- Add the new 'prev_root' column
ALTER TABLE identities ADD COLUMN pre_root BYTEA;

-- This constraint ensures that we have consistent database and changes to the tre are done in a valid sequence.
CREATE OR REPLACE FUNCTION validate_pre_root() returns trigger as $$
DECLARE
last_id identities.id%type;
last_root identities.root%type;
BEGIN
SELECT id, root
INTO last_id, last_root
FROM identities
ORDER BY id DESC
LIMIT 1;

-- When last_id is NULL that means there are no records in identities table. The first prev_root can
-- be a value not referencing previous root in database.
IF last_id IS NULL THEN RETURN NEW;
END IF;

IF NEW.pre_root IS NULL THEN RAISE EXCEPTION 'Sent pre_root (%) can be null only for first record in table.', NEW.pre_root;
END IF;

IF (last_root != NEW.pre_root) THEN RAISE EXCEPTION 'Sent pre_root (%) is different than last root (%) in database.', NEW.pre_root, last_root;
END IF;

RETURN NEW;
END;
$$ language plpgsql;

CREATE TRIGGER validate_pre_root_trigger BEFORE INSERT ON identities FOR EACH ROW EXECUTE PROCEDURE validate_pre_root();

-- Below function took around 10 minutes for 10 million records.
DO
$do$
DECLARE
prev_root identities.pre_root%type := NULL;
identity identities%rowtype;
BEGIN
FOR identity IN SELECT * FROM identities ORDER BY id ASC
LOOP
IF identity.pre_root IS NULL THEN UPDATE identities SET pre_root = prev_root WHERE id = identity.id;
END IF;
prev_root = identity.root;
END LOOP;
END
$do$;

CREATE UNIQUE INDEX idx_unique_pre_root on identities(pre_root) WHERE pre_root IS NOT NULL;
17 changes: 15 additions & 2 deletions src/contracts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod abi;
pub mod scanner;

use anyhow::{anyhow, bail, Context};
use ethers::abi::AbiEncode;
use ethers::providers::Middleware;
use ethers::types::{H256, U256};
use tracing::{error, info, instrument};
Expand Down Expand Up @@ -131,8 +132,14 @@ impl IdentityManager {
)
.tx;

let tx_id = format!(
"tx-{}-{}",
hex::encode(pre_root.encode()),
hex::encode(post_root.encode())
);

self.ethereum
.send_transaction(register_identities_transaction, true)
.send_transaction(register_identities_transaction, true, Some(tx_id))
.await
.map_err(|tx_err| anyhow!("{}", tx_err.to_string()))
}
Expand All @@ -158,8 +165,14 @@ impl IdentityManager {
)
.tx;

let tx_id = format!(
"tx-{}-{}",
hex::encode(pre_root.encode()),
hex::encode(post_root.encode())
);

self.ethereum
.send_transaction(delete_identities_transaction, true)
.send_transaction(delete_identities_transaction, true, Some(tx_id))
.await
.map_err(|tx_err| anyhow!("{}", tx_err.to_string()))
}
Expand Down
Loading
Loading