Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Db::new async, force use of new API #323

Merged
merged 19 commits (branch names lost in page extraction)
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion firewood/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ typed-builder = "0.18.0"
bincode = "1.3.3"

[dev-dependencies]
criterion = "0.5.1"
criterion = {version = "0.5.1", features = ["async_tokio"]}
keccak-hasher = "0.15.3"
rand = "0.8.5"
triehash = "0.8.4"
Expand Down
70 changes: 38 additions & 32 deletions firewood/benches/hashops.rs
richardpringle marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

use criterion::{criterion_group, criterion_main, profiler::Profiler, BatchSize, Criterion};
use firewood::{
db::{BatchOp, Db, DbConfig},
db::{BatchOp, DbConfig},
merkle::{Merkle, TrieHash, TRIE_HASH_LEN},
storage::WalConfig,
v2::api::{Db, Proposal},
};
use pprof::ProfilerGuard;
use rand::{distributions::Alphanumeric, rngs::StdRng, Rng, SeedableRng};
Expand All @@ -17,7 +18,7 @@ use shale::{
disk_address::DiskAddress,
CachedStore, ObjCache, Storable, StoredView,
};
use std::{fs::File, iter::repeat_with, ops::Deref, os::raw::c_int, path::Path};
use std::{fs::File, iter::repeat_with, ops::Deref, os::raw::c_int, path::Path, sync::Arc};

const ZERO_HASH: TrieHash = TrieHash([0u8; TRIE_HASH_LEN]);

Expand Down Expand Up @@ -134,36 +135,41 @@ fn bench_db<const N: usize>(criterion: &mut Criterion) {
.benchmark_group("Db")
.sample_size(30)
.bench_function("commit", |b| {
b.iter_batched(
|| {
let cfg =
DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build());

let batch_ops: Vec<_> = repeat_with(|| {
(&mut rng)
.sample_iter(&Alphanumeric)
.take(KEY_LEN)
.collect()
})
.map(|key: Vec<_>| BatchOp::Put {
key,
value: vec![b'v'],
})
.take(N)
.collect();
let db_path = dbg!(std::env::temp_dir());
let db_path = db_path.join("benchmark_db");

let db = Db::new(db_path, &cfg.clone().truncate(true).build()).unwrap();

(db, batch_ops)
},
|(db, batch_ops)| {
let proposal = db.new_proposal(batch_ops).unwrap();
proposal.commit_sync().unwrap();
},
BatchSize::SmallInput,
);
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
|| {
let batch_ops: Vec<_> = repeat_with(|| {
(&mut rng)
.sample_iter(&Alphanumeric)
.take(KEY_LEN)
.collect()
})
.map(|key: Vec<_>| BatchOp::Put {
key,
value: vec![b'v'],
})
.take(N)
.collect();
batch_ops
},
|batch_ops| async {
let db_path = dbg!(std::env::temp_dir());
let db_path = db_path.join("benchmark_db");
let cfg =
DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build());

let db =
firewood::db::Db::new(db_path, &cfg.clone().truncate(true).build())
.await
.unwrap();

Arc::new(db.propose(batch_ops).await.unwrap())
.commit()
.await
.unwrap()
},
BatchSize::SmallInput,
);
});
}

Expand Down
14 changes: 6 additions & 8 deletions firewood/examples/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{collections::HashMap, error::Error, ops::RangeInclusive, sync::Arc, ti

use firewood::{
db::{Batch, BatchOp, Db, DbConfig},
v2::api::{Db as DbApi, DbView, Proposal},
v2::api::{Db as _, DbView, Proposal},
};
use rand::{distributions::Alphanumeric, Rng};

Expand Down Expand Up @@ -44,11 +44,9 @@ async fn main() -> Result<(), Box<dyn Error>> {

let args = Args::parse();

let db = tokio::task::spawn_blocking(move || {
Db::new("rev_db", &cfg).expect("db initiation should succeed")
})
.await
.unwrap();
let db = Db::new("rev_db", &cfg)
.await
.expect("db initiation should succeed");

let keys = args.batch_keys;
let start = Instant::now();
Expand All @@ -74,7 +72,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let verify = get_keys_to_verify(&batch, args.read_verify_percent);

let proposal: Arc<firewood::db::Proposal> = db.propose(batch).await.unwrap().into();
let proposal = Arc::new(db.propose(batch).await.unwrap());
proposal.commit().await?;
verify_keys(&db, verify).await?;
}
Expand Down Expand Up @@ -107,7 +105,7 @@ fn get_keys_to_verify(batch: &Batch<Vec<u8>, Vec<u8>>, pct: u16) -> HashMap<Vec<
}

async fn verify_keys(
db: &Db,
db: &impl firewood::v2::api::Db,
rkuris marked this conversation as resolved.
Show resolved Hide resolved
verify: HashMap<Vec<u8>, Vec<u8>>,
) -> Result<(), firewood::v2::api::Error> {
if !verify.is_empty() {
Expand Down
56 changes: 26 additions & 30 deletions firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub use crate::{
config::{DbConfig, DbRevConfig},
storage::{buffer::DiskBufferConfig, WalConfig},
v2::api::{Batch, BatchOp, Proposal},
};
use crate::{
file,
Expand All @@ -18,7 +19,7 @@ use crate::{
};
use async_trait::async_trait;
use bytemuck::{cast_slice, AnyBitPattern};
use metered::{metered, HitCount};
use metered::metered;
use parking_lot::{Mutex, RwLock};
use shale::{
compact::{CompactSpace, CompactSpaceHeader},
Expand All @@ -42,8 +43,6 @@ use tokio::task::block_in_place;

mod proposal;

pub use proposal::{Batch, BatchOp, Proposal};

use self::proposal::ProposalBase;

const MERKLE_META_SPACE: SpaceId = 0x0;
Expand Down Expand Up @@ -278,8 +277,7 @@ pub struct DbRev<S> {
#[async_trait]
impl<S: ShaleStore<Node> + Send + Sync> api::DbView for DbRev<S> {
async fn root_hash(&self) -> Result<api::HashKey, api::Error> {
self.merkle
.root_hash(self.header.kv_root)
block_in_place(|| self.merkle.root_hash(self.header.kv_root))
.map(|h| *h)
.map_err(|e| api::Error::IO(std::io::Error::new(ErrorKind::Other, e)))
}
Expand Down Expand Up @@ -386,7 +384,7 @@ impl Drop for DbInner {
impl api::Db for Db {
type Historical = DbRev<SharedStore>;

type Proposal = Proposal;
type Proposal = proposal::Proposal;

async fn revision(&self, root_hash: HashKey) -> Result<Arc<Self::Historical>, api::Error> {
let rev = block_in_place(|| self.get_revision(&TrieHash(root_hash)));
Expand All @@ -400,7 +398,9 @@ impl api::Db for Db {
}

async fn root_hash(&self) -> Result<HashKey, api::Error> {
self.kv_root_hash().map(|hash| hash.0).map_err(Into::into)
block_in_place(|| self.kv_root_hash())
.map(|hash| hash.0)
.map_err(Into::into)
}

async fn propose<K: KeyType, V: ValueType>(
Expand Down Expand Up @@ -434,13 +434,17 @@ pub struct Db {
impl Db {
const PARAM_SIZE: u64 = size_of::<DbParams>() as u64;

/// Open a database.
pub fn new<P: AsRef<Path>>(db_path: P, cfg: &DbConfig) -> Result<Self, DbError> {
// TODO: make sure all fds are released at the end
pub async fn new<P: AsRef<Path>>(db_path: P, cfg: &DbConfig) -> Result<Self, api::Error> {
if cfg.truncate {
let _ = std::fs::remove_dir_all(db_path.as_ref());
let _ = tokio::fs::remove_dir_all(db_path.as_ref()).await;
}

block_in_place(|| Db::new_internal(db_path, cfg.clone()))
.map_err(|e| api::Error::InternalError(e.into()))
}

/// Open a database.
fn new_internal<P: AsRef<Path>>(db_path: P, cfg: DbConfig) -> Result<Self, DbError> {
let open_options = if cfg.truncate {
file::Options::Truncate
} else {
Expand All @@ -465,7 +469,7 @@ impl Db {
{
return Err(DbError::InvalidParams);
}
Self::initialize_header_on_disk(cfg, fd0)?;
Self::initialize_header_on_disk(&cfg, fd0)?;
}

// read DbParams
Expand All @@ -482,10 +486,12 @@ impl Db {
let (sender, inbound) = tokio::sync::mpsc::unbounded_channel();
let disk_requester = DiskBufferRequester::new(sender);
let buffer = cfg.buffer.clone();
let disk_thread = Some(std::thread::spawn(move || {
let disk_buffer = DiskBuffer::new(inbound, &buffer, &wal).unwrap();
disk_buffer.run()
}));
let disk_thread = block_in_place(|| {
Some(std::thread::spawn(move || {
let disk_buffer = DiskBuffer::new(inbound, &buffer, &wal).unwrap();
disk_buffer.run()
}))
});

let root_hash_cache: Arc<CachedSpace> = CachedSpace::new(
&StoreConfig::builder()
Expand Down Expand Up @@ -753,10 +759,10 @@ impl Db {
}

/// Create a proposal.
pub fn new_proposal<K: KeyType, V: ValueType>(
pub(crate) fn new_proposal<K: KeyType, V: ValueType>(
&self,
data: Batch<K, V>,
) -> Result<Proposal, DbError> {
) -> Result<proposal::Proposal, DbError> {
richardpringle marked this conversation as resolved.
Show resolved Hide resolved
let mut inner = self.inner.write();
let reset_store_headers = inner.reset_store_headers;
let (store, mut rev) = Db::new_store(
Expand Down Expand Up @@ -792,7 +798,7 @@ impl Db {
rev.flush_dirty().unwrap();

let parent = ProposalBase::View(Arc::clone(&self.revisions.lock().base_revision));
Ok(Proposal {
Ok(proposal::Proposal {
m: Arc::clone(&self.inner),
r: Arc::clone(&self.revisions),
cfg: self.cfg.clone(),
Expand Down Expand Up @@ -904,20 +910,10 @@ impl Db {
self.revisions.lock().base_revision.kv_dump(w)
}
/// Get root hash of the latest generic key-value storage.
pub fn kv_root_hash(&self) -> Result<TrieHash, DbError> {
pub(crate) fn kv_root_hash(&self) -> Result<TrieHash, DbError> {
self.revisions.lock().base_revision.kv_root_hash()
}

/// Get a value in the kv store associated with a particular key.
#[measure(HitCount)]
pub fn kv_get<K: AsRef<[u8]>>(&self, key: K) -> Result<Vec<u8>, DbError> {
self.revisions
.lock()
.base_revision
.kv_get(key)
.ok_or(DbError::KeyNotFound)
}

pub fn metrics(&self) -> Arc<DbMetrics> {
self.metrics.clone()
}
Expand Down
9 changes: 5 additions & 4 deletions firewood/src/file.rs
rkuris marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

// Copied from CedrusDB

use std::fs::{create_dir, remove_dir_all};
use std::ops::Deref;
use std::os::fd::OwnedFd;

Expand Down Expand Up @@ -74,7 +75,7 @@ impl Deref for File {
}
}

pub fn touch_dir(dirname: &str, rootdir: &Path) -> Result<PathBuf, std::io::Error> {
pub(crate) fn touch_dir(dirname: &str, rootdir: &Path) -> Result<PathBuf, std::io::Error> {
rkuris marked this conversation as resolved.
Show resolved Hide resolved
let path = rootdir.join(dirname);
if let Err(e) = std::fs::create_dir(&path) {
// ignore already-exists error
Expand All @@ -85,17 +86,17 @@ pub fn touch_dir(dirname: &str, rootdir: &Path) -> Result<PathBuf, std::io::Erro
Ok(path)
}

pub fn open_dir<P: AsRef<Path>>(
pub(crate) fn open_dir<P: AsRef<Path>>(
path: P,
options: Options,
) -> Result<(PathBuf, bool), std::io::Error> {
let truncate = options == Options::Truncate;

if truncate {
let _ = std::fs::remove_dir_all(path.as_ref());
let _ = remove_dir_all(path.as_ref());
}

match std::fs::create_dir(path.as_ref()) {
match create_dir(path.as_ref()) {
Err(e) if truncate || e.kind() != ErrorKind::AlreadyExists => Err(e),
// the DB already exists
Err(_) => Ok((path.as_ref().to_path_buf(), false)),
Expand Down
2 changes: 1 addition & 1 deletion firewood/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub mod merkle_util;
pub mod proof;
pub mod storage;

pub(crate) mod config;
richardpringle marked this conversation as resolved.
Show resolved Hide resolved
pub mod config;
richardpringle marked this conversation as resolved.
Show resolved Hide resolved
pub mod nibbles;

pub mod v2;
Loading