From 24d538b65d7f361b271061cc8b1e6c6d30972b92 Mon Sep 17 00:00:00 2001
From: Eric Woolsey
Date: Tue, 2 Jul 2024 14:03:23 -0700
Subject: [PATCH] retry-tx fix

---
 src/app.rs                                  |   2 +-
 src/database/transaction.rs                 |   4 +-
 src/identity/processor.rs                   |   6 +-
 src/task_monitor/tasks/insert_identities.rs |   4 +-
 src/utils/mod.rs                            |  23 +++-
 tests/common/mod.rs                         |  44 ++++--
 tests/serializable_transaction.rs           | 144 ++++++++++++++++++++
 7 files changed, 197 insertions(+), 30 deletions(-)
 create mode 100644 tests/serializable_transaction.rs

diff --git a/src/app.rs b/src/app.rs
index 479536e7..183aa6af 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -23,12 +23,12 @@ use crate::identity_tree::{
 use crate::prover::map::initialize_prover_maps;
 use crate::prover::repository::ProverRepository;
 use crate::prover::{ProverConfig, ProverType};
+use crate::retry_tx;
 use crate::server::data::{
     InclusionProofResponse, ListBatchSizesResponse, VerifySemaphoreProofQuery,
     VerifySemaphoreProofRequest, VerifySemaphoreProofResponse,
 };
 use crate::server::error::Error as ServerError;
-use crate::utils::retry_tx;
 
 pub struct App {
     pub database: Arc<Database>,
diff --git a/src/database/transaction.rs b/src/database/transaction.rs
index 10109429..c2e98e11 100644
--- a/src/database/transaction.rs
+++ b/src/database/transaction.rs
@@ -4,7 +4,7 @@ use tracing::instrument;
 use crate::database::query::DatabaseQuery;
 use crate::database::{Database, Error};
 use crate::identity_tree::{Hash, ProcessedStatus};
-use crate::utils::retry_tx;
+use crate::retry_tx;
 
 async fn mark_root_as_processed(
     tx: &mut Transaction<'_, Postgres>,
@@ -96,7 +96,7 @@ impl Database {
         retry_tx!(self.pool, tx, {
             mark_root_as_processed(&mut tx, root).await?;
             tx.delete_batches_after_root(root).await?;
-            Ok(())
+            Result::<_, Error>::Ok(())
         })
         .await
     }
diff --git a/src/identity/processor.rs b/src/identity/processor.rs
index b0302f48..70a6f22d 100644
--- a/src/identity/processor.rs
+++ b/src/identity/processor.rs
@@ -24,8 +24,8 @@ use crate::identity_tree::{Canonical, Hash, Intermediate, TreeVersion, TreeWithNextVersion};
 use crate::prover::identity::Identity;
 use crate::prover::repository::ProverRepository;
 use crate::prover::Prover;
+use crate::retry_tx;
 use crate::utils::index_packing::pack_indices;
-use crate::utils::retry_tx;
 
 pub type TransactionId = String;
 
@@ -549,7 +549,7 @@
                     .await?;
             }
 
-            Ok(())
+            Result::<_, anyhow::Error>::Ok(())
         })
         .await
     }
@@ -648,7 +648,7 @@
                     .await?;
             }
 
-            Ok(())
+            Result::<_, anyhow::Error>::Ok(())
         })
         .await
     }
diff --git a/src/task_monitor/tasks/insert_identities.rs b/src/task_monitor/tasks/insert_identities.rs
index 6a982fc2..e7afd568 100644
--- a/src/task_monitor/tasks/insert_identities.rs
+++ b/src/task_monitor/tasks/insert_identities.rs
@@ -10,7 +10,7 @@ use crate::database::query::DatabaseQuery as _;
 use crate::database::types::UnprocessedCommitment;
 use crate::database::Database;
 use crate::identity_tree::{Latest, TreeVersion, TreeVersionReadOps, UnprocessedStatus};
-use crate::utils::retry_tx;
+use crate::retry_tx;
 
 pub async fn insert_identities(
     app: Arc<App>,
@@ -100,7 +100,7 @@ pub async fn insert_identities_batch(
             tx.remove_unprocessed_identity(identity).await?;
         }
 
-        Ok(())
+        Result::<_, anyhow::Error>::Ok(())
     })
     .await
 }
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 36f23493..02ebb3da 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -30,6 +30,7 @@ pub const TX_RETRY_LIMIT: u32 = 10;
 ///     tx.execute("SELECT * FROM table").await?;
 ///     Ok(tx.execute("SELECT * FROM other").await?)
 /// }).await;
+#[macro_export]
 macro_rules! retry_tx {
     ($pool:expr, $tx:ident, $expression:expr) => {
         async {
@@ -38,20 +39,29 @@ macro_rules! retry_tx {
             loop {
                 let mut $tx = $pool.begin().await?;
                 res = async { $expression }.await;
-                if res.is_err() {
-                    $tx.rollback().await?;
-                    return res;
+                let limit = 10;
+                if let Err(e) = res {
+                    counter += 1;
+                    if counter > limit {
+                        return Err(e.into());
+                    } else {
+                        $tx.rollback().await?;
+                        tracing::warn!(
+                            error = ?e,
+                            "db transaction returned error ({counter}/{limit})"
+                        );
+                        continue;
+                    }
                 }
                 match $tx.commit().await {
                     Err(e) => {
                         counter += 1;
-                        let limit = crate::utils::TX_RETRY_LIMIT;
                         if counter > limit {
                             return Err(e.into());
                         } else {
                             tracing::warn!(
-                                "db transaction commit failed ({counter}/{limit}): {:?}",
-                                e
+                                error = ?e,
+                                "db transaction commit failed ({counter}/{limit})"
                             );
                         }
                     }
@@ -62,7 +72,6 @@ macro_rules! retry_tx {
         }
     };
 }
-pub(crate) use retry_tx;
 
 pub fn spawn_monitored_with_backoff<S, F>(
     future_spawner: S,
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 4963c0b6..a2c17dec 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -401,16 +401,13 @@ pub async fn test_inclusion_status(
     assert_eq!(expected_status, result.0.status,);
 }
 
-#[instrument(skip_all)]
-pub async fn test_delete_identity(
+pub async fn api_delete_identity(
     uri: &str,
     client: &Client<HttpConnector>,
-    ref_tree: &mut PoseidonTree,
-    test_leaves: &[Field],
-    leaf_index: usize,
+    leaf: &Field,
     expect_failure: bool,
-) -> (merkle_tree::Proof<PoseidonHash>, Field) {
-    let body = construct_delete_identity_body(&test_leaves[leaf_index]);
+) {
+    let body = construct_delete_identity_body(leaf);
 
     let req = Request::builder()
         .method("POST")
@@ -433,7 +430,18 @@
         assert!(response.status().is_success());
         assert!(bytes.is_empty());
     }
+}
 
+#[instrument(skip_all)]
+pub async fn test_delete_identity(
+    uri: &str,
+    client: &Client<HttpConnector>,
+    ref_tree: &mut PoseidonTree,
+    test_leaves: &[Field],
+    leaf_index: usize,
+    expect_failure: bool,
+) -> (merkle_tree::Proof<PoseidonHash>, Field) {
+    api_delete_identity(uri, client, &test_leaves[leaf_index], expect_failure).await;
     ref_tree.set(leaf_index, Hash::ZERO);
     (ref_tree.proof(leaf_index).unwrap(), ref_tree.root())
 }
@@ -556,14 +564,8 @@ pub async fn test_remove_batch_size(
 }
 
 #[instrument(skip_all)]
-pub async fn test_insert_identity(
-    uri: &str,
-    client: &Client<HttpConnector>,
-    ref_tree: &mut PoseidonTree,
-    test_leaves: &[Field],
-    leaf_index: usize,
-) -> (merkle_tree::Proof<PoseidonHash>, Field) {
-    let body = construct_insert_identity_body(&test_leaves[leaf_index]);
+pub async fn api_insert_identity(uri: &str, client: &Client<HttpConnector>, leaf: &Field) {
+    let body = construct_insert_identity_body(leaf);
     let req = Request::builder()
         .method("POST")
         .uri(uri.to_owned() + "/insertIdentity")
@@ -583,6 +585,18 @@
     }
 
     assert!(bytes.is_empty());
+}
+
+#[instrument(skip_all)]
+pub async fn test_insert_identity(
+    uri: &str,
+    client: &Client<HttpConnector>,
+    ref_tree: &mut PoseidonTree,
+    test_leaves: &[Field],
+    leaf_index: usize,
+) -> (merkle_tree::Proof<PoseidonHash>, Field) {
+    api_insert_identity(uri, client, &test_leaves[leaf_index]).await;
+
     ref_tree.set(leaf_index, test_leaves[leaf_index]);
 
     (ref_tree.proof(leaf_index).unwrap(), ref_tree.root())
diff --git a/tests/serializable_transaction.rs b/tests/serializable_transaction.rs
new file mode 100644
index 00000000..7b4ee21d
--- /dev/null
+++ b/tests/serializable_transaction.rs
@@ -0,0 +1,144 @@
+mod common;
+use common::prelude::*;
+use futures::stream::StreamExt;
+use signup_sequencer::retry_tx;
+use sqlx::postgres::PgPoolOptions;
+use sqlx::{Postgres, Transaction};
+use tokio::time::{sleep, Duration};
+
+async fn setup(pool: &sqlx::Pool<Postgres>) -> Result<(), sqlx::Error> {
+    retry_tx!(pool, tx, {
+        sqlx::query(
+            r#"
+            CREATE TABLE IF NOT EXISTS accounts (
+                id SERIAL PRIMARY KEY,
+                balance INT
+            );
+            "#,
+        )
+        .execute(&mut *tx)
+        .await?;
+
+        sqlx::query("TRUNCATE TABLE accounts RESTART IDENTITY;")
+            .execute(&mut *tx)
+            .await?;
+
+        sqlx::query("INSERT INTO accounts (balance) VALUES (100), (200);")
+            .execute(&mut *tx)
+            .await?;
+
+        Result::<_, anyhow::Error>::Ok(())
+    })
+    .await
+    .unwrap();
+
+    Ok(())
+}
+
+async fn transaction_1(pool: &sqlx::Pool<Postgres>) -> Result<(), sqlx::Error> {
+    retry_tx!(pool, tx, {
+        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
+            .execute(&mut *tx)
+            .await?;
+
+        let balance: (i32,) = sqlx::query_as("SELECT balance FROM accounts WHERE id = 1")
+            .fetch_one(&mut *tx)
+            .await?;
+
+        println!("Transaction 1: Balance of account 1 is {}", balance.0);
+
+        // Simulate some work
+        sleep(Duration::from_secs(5)).await;
+
+        sqlx::query("UPDATE accounts SET balance = balance + 30 WHERE id = 2")
+            .execute(&mut *tx)
+            .await?;
+
+        Result::<_, anyhow::Error>::Ok(())
+    })
+    .await
+    .unwrap();
+
+    Ok(())
+}
+
+async fn transaction_2(pool: &sqlx::Pool<Postgres>) -> Result<(), sqlx::Error> {
+    let mut tx: Transaction<'_, Postgres> = pool.begin().await?;
+
+    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
+        .execute(&mut *tx)
+        .await?;
+
+    let balance: (i32,) = sqlx::query_as("SELECT balance FROM accounts WHERE id = 2")
+        .fetch_one(&mut *tx)
+        .await?;
+
+    println!("Transaction 2: Balance of account 2 is {}", balance.0);
+
+    sqlx::query("UPDATE accounts SET balance = balance + 50 WHERE id = 1")
+        .execute(&mut *tx)
+        .await?;
+
+    tx.commit().await?;
+    Ok(())
+}
+
+#[tokio::test]
+async fn serializable_transaction() -> Result<(), anyhow::Error> {
+    init_tracing_subscriber();
+    info!("Starting serializable_transaction");
+
+    let insertion_batch_size: usize = 500;
+    let deletion_batch_size: usize = 10;
+
+    let ref_tree = PoseidonTree::new(DEFAULT_TREE_DEPTH + 1, ruint::Uint::ZERO);
+    let initial_root: U256 = ref_tree.root().into();
+
+    let docker = Cli::default();
+    let (mock_chain, db_container, _insertion_prover_map, _deletion_prover_map, micro_oz) =
+        spawn_deps(
+            initial_root,
+            &[insertion_batch_size],
+            &[deletion_batch_size],
+            DEFAULT_TREE_DEPTH as u8,
+            &docker,
+        )
+        .await?;
+
+    let db_socket_addr = db_container.address();
+    let db_url = format!("postgres://postgres:postgres@{db_socket_addr}/database");
+
+    let temp_dir = tempfile::tempdir()?;
+    info!(
+        "temp dir created at: {:?}",
+        temp_dir.path().join("testfile")
+    );
+
+    let config = TestConfigBuilder::new()
+        .db_url(&db_url)
+        .oz_api_url(&micro_oz.endpoint())
+        .oz_address(micro_oz.address())
+        .identity_manager_address(mock_chain.identity_manager.address())
+        .primary_network_provider(mock_chain.anvil.endpoint())
+        .cache_file(temp_dir.path().join("testfile").to_str().unwrap())
+        .build()?;
+
+    let (..) = spawn_app(config.clone())
+        .await
+        .expect("Failed to spawn app.");
+
+    let pool = PgPoolOptions::new()
+        .max_connections(100)
+        .connect(&db_url)
+        .await?;
+
+    setup(&pool).await?;
+    futures::stream::iter(0..2)
+        .for_each_concurrent(None, |_| async {
+            transaction_1(&pool).await.unwrap();
+            transaction_2(&pool).await.unwrap();
+        })
+        .await;
+
+    Ok(())
+}
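
Notes on the macro export, with a usage sketch:

`#[macro_export]` hoists `retry_tx!` to the crate root, which is why the call
sites switch from `use crate::utils::retry_tx;` to `use crate::retry_tx;` and
why the new integration test can import it as `signup_sequencer::retry_tx`.
It also explains the hardcoded `let limit = 10;`: an exported `macro_rules!`
body expands in the caller's crate, so the old `crate::utils::TX_RETRY_LIMIT`
path would not resolve from a foreign crate (`$crate::utils::TX_RETRY_LIMIT`
would be the usual fix, assuming the module path stays public). Likewise, the
`Ok(())` -> `Result::<_, anyhow::Error>::Ok(())` changes pin the transaction
body's error type: the reworked macro converts the error with `e.into()`
instead of returning `res` unchanged, so a bare `Ok(())` leaves the error
type ambiguous.

A minimal sketch of the call shape from outside the crate, modeled on the new
test's `accounts` fixture; the `transfer` helper is hypothetical:

    use signup_sequencer::retry_tx;
    use sqlx::{Pool, Postgres};

    // Hypothetical helper: moves `amount` between the two rows created by the
    // test's `setup`. The whole closure re-runs if the body returns an error
    // or the commit fails (e.g. a serialization failure under SERIALIZABLE),
    // up to the macro's retry limit.
    async fn transfer(pool: &Pool<Postgres>, amount: i32) -> anyhow::Result<()> {
        retry_tx!(pool, tx, {
            sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = 1")
                .bind(amount)
                .execute(&mut *tx)
                .await?;
            sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = 2")
                .bind(amount)
                .execute(&mut *tx)
                .await?;
            // Explicit error type, as in the patch: a bare `Ok(())` no longer infers.
            Result::<_, anyhow::Error>::Ok(())
        })
        .await
    }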