Skip to content

Commit

Permalink
Added batching using DB. (#731)
Browse files Browse the repository at this point in the history
  • Loading branch information
piohei authored May 28, 2024
1 parent d853878 commit 471f4a7
Show file tree
Hide file tree
Showing 15 changed files with 1,385 additions and 124 deletions.
24 changes: 24 additions & 0 deletions schemas/database/013_batches_and_transactions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Create ENUM for prover type
CREATE TABLE batches
(
id BIGSERIAL UNIQUE PRIMARY KEY,
next_root BYTEA NOT NULL UNIQUE,
prev_root BYTEA UNIQUE,
created_at TIMESTAMPTZ NOT NULL,
batch_type VARCHAR(50) NOT NULL,
data JSON NOT NULL,

FOREIGN KEY (prev_root) REFERENCES batches (next_root) ON DELETE CASCADE
);

CREATE INDEX idx_batches_prev_root ON batches (prev_root);
CREATE UNIQUE INDEX i_single_null_prev_root ON batches ((batches.prev_root IS NULL)) WHERE batches.prev_root IS NULL;

CREATE TABLE transactions
(
transaction_id VARCHAR(256) NOT NULL UNIQUE PRIMARY KEY,
batch_next_root BYTEA NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL,

FOREIGN KEY (batch_next_root) REFERENCES batches (next_root) ON DELETE CASCADE
);
22 changes: 21 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::database::Database;
use crate::ethereum::Ethereum;
use crate::identity_tree::{
CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState,
TreeUpdate, TreeVersionReadOps, UnprocessedStatus,
TreeUpdate, TreeVersionReadOps, TreeWithNextVersion, UnprocessedStatus,
};
use crate::prover::map::initialize_prover_maps;
use crate::prover::{ProverConfig, ProverType};
Expand Down Expand Up @@ -114,10 +114,12 @@ impl App {
// Note that we don't have a way of queuing a root here for finalization.
// so it's going to stay as "processed" until the next root is mined.
self.database.mark_root_as_processed_tx(&root_hash).await?;
self.database.delete_batches_after_root(&root_hash).await?;
} else {
// Db is either empty or we're restarting with a new contract/chain
// so we should mark everything as pending
self.database.mark_all_as_pending().await?;
self.database.delete_all_batches().await?;
}

let timer = Instant::now();
Expand Down Expand Up @@ -263,6 +265,7 @@ impl App {

let (processed, batching_builder) = processed_builder.seal_and_continue();
let (batching, mut latest_builder) = batching_builder.seal_and_continue();

let pending_items = self
.database
.get_commitments_by_status(ProcessedStatus::Pending)
Expand All @@ -271,6 +274,15 @@ impl App {
latest_builder.update(&update);
}
let latest = latest_builder.seal();

let batch = self.database.get_latest_batch().await?;
if let Some(batch) = batch {
if batching.get_root() != batch.next_root {
batching.apply_updates_up_to(batch.next_root);
}
assert_eq!(batching.get_root(), batch.next_root);
}

Ok(Some(TreeState::new(mined, processed, batching, latest)))
}

Expand Down Expand Up @@ -357,6 +369,14 @@ impl App {

let latest = latest_builder.seal();

let batch = self.database.get_latest_batch().await?;
if let Some(batch) = batch {
if batching.get_root() != batch.next_root {
batching.apply_updates_up_to(batch.next_root);
}
assert_eq!(batching.get_root(), batch.next_root);
}

Ok(TreeState::new(mined, processed, batching, latest))
}

Expand Down
164 changes: 164 additions & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ mod test {
use super::Database;
use crate::config::DatabaseConfig;
use crate::database::query::DatabaseQuery;
use crate::database::types::BatchType;
use crate::identity_tree::{Hash, ProcessedStatus, Status, UnprocessedStatus};
use crate::prover::identity::Identity;
use crate::prover::{ProverConfig, ProverType};
use crate::utils::secret::SecretUrl;

Expand Down Expand Up @@ -1334,4 +1336,166 @@ mod test {

Ok(())
}

#[tokio::test]
async fn test_insert_batch() -> anyhow::Result<()> {
let docker = Cli::default();
let (db, _db_container) = setup_db(&docker).await?;
let identities: Vec<_> = mock_identities(10)
.iter()
.map(|commitment| {
Identity::new(
(*commitment).into(),
mock_roots(10).iter().map(|root| (*root).into()).collect(),
)
})
.collect();
let roots = mock_roots(2);

db.insert_new_batch_head(&roots[0]).await?;
db.insert_new_batch(&roots[1], &roots[0], BatchType::Insertion, &identities, &[
0,
])
.await?;

Ok(())
}

#[tokio::test]
async fn test_get_next_batch() -> anyhow::Result<()> {
let docker = Cli::default();
let (db, _db_container) = setup_db(&docker).await?;
let identities: Vec<_> = mock_identities(10)
.iter()
.map(|commitment| {
Identity::new(
(*commitment).into(),
mock_roots(10).iter().map(|root| (*root).into()).collect(),
)
})
.collect();
let indexes = vec![0];
let roots = mock_roots(2);

db.insert_new_batch_head(&roots[0]).await?;
db.insert_new_batch(
&roots[1],
&roots[0],
BatchType::Insertion,
&identities,
&indexes,
)
.await?;

let next_batch = db.get_next_batch(&roots[0]).await?;

assert!(next_batch.is_some());

let next_batch = next_batch.unwrap();

assert_eq!(next_batch.prev_root.unwrap(), roots[0]);
assert_eq!(next_batch.next_root, roots[1]);
assert_eq!(next_batch.data.0.identities, identities);
assert_eq!(next_batch.data.0.indexes, indexes);

let next_batch = db.get_next_batch(&roots[1]).await?;

assert!(next_batch.is_none());

Ok(())
}

#[tokio::test]
async fn test_get_next_batch_without_transaction() -> anyhow::Result<()> {
let docker = Cli::default();
let (db, _db_container) = setup_db(&docker).await?;
let identities: Vec<_> = mock_identities(10)
.iter()
.map(|commitment| {
Identity::new(
(*commitment).into(),
mock_roots(10).iter().map(|root| (*root).into()).collect(),
)
})
.collect();
let indexes = vec![0];
let roots = mock_roots(2);
let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9");

db.insert_new_batch_head(&roots[0]).await?;
db.insert_new_batch(
&roots[1],
&roots[0],
BatchType::Insertion,
&identities,
&indexes,
)
.await?;

let next_batch = db.get_next_batch_without_transaction().await?;

assert!(next_batch.is_some());

let next_batch = next_batch.unwrap();

assert_eq!(next_batch.prev_root.unwrap(), roots[0]);
assert_eq!(next_batch.next_root, roots[1]);
assert_eq!(next_batch.data.0.identities, identities);
assert_eq!(next_batch.data.0.indexes, indexes);

db.insert_new_transaction(&transaction_id, &roots[1])
.await?;

let next_batch = db.get_next_batch_without_transaction().await?;

assert!(next_batch.is_none());

Ok(())
}

#[tokio::test]
async fn test_get_batch_head() -> anyhow::Result<()> {
let docker = Cli::default();
let (db, _db_container) = setup_db(&docker).await?;
let roots = mock_roots(1);

let batch_head = db.get_batch_head().await?;

assert!(batch_head.is_none());

db.insert_new_batch_head(&roots[0]).await?;

let batch_head = db.get_batch_head().await?;

assert!(batch_head.is_some());
let batch_head = batch_head.unwrap();

assert_eq!(batch_head.prev_root, None);
assert_eq!(batch_head.next_root, roots[0]);
assert!(
batch_head.data.0.identities.is_empty(),
"Should have empty identities."
);
assert!(
batch_head.data.0.indexes.is_empty(),
"Should have empty indexes."
);

Ok(())
}

#[tokio::test]
async fn test_insert_transaction() -> anyhow::Result<()> {
let docker = Cli::default();
let (db, _db_container) = setup_db(&docker).await?;
let roots = mock_roots(1);
let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9");

db.insert_new_batch_head(&roots[0]).await?;

db.insert_new_transaction(&transaction_id, &roots[0])
.await?;

Ok(())
}
}
Loading

0 comments on commit 471f4a7

Please sign in to comment.