Skip to content

Commit

Permalink
Remove new_revision_arc method (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardpringle authored Aug 11, 2023
1 parent 9c28d3c commit 89204f1
Showing 1 changed file with 156 additions and 106 deletions.
262 changes: 156 additions & 106 deletions firewood/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE.md for licensing terms.

pub use crate::config::{DbConfig, DbRevConfig};
pub use crate::storage::{buffer::DiskBufferConfig, WalConfig};
use crate::storage::{
AshRecord, CachedSpace, MemStoreR, SpaceWrite, StoreConfig, StoreDelta, StoreRevMut,
StoreRevShared, ZeroStore, PAGE_SIZE_NBIT,
pub use crate::{
config::{DbConfig, DbRevConfig},
storage::{buffer::DiskBufferConfig, WalConfig},
};
use crate::{
file,
merkle::{IdTrans, Merkle, MerkleError, Node, TrieHash, TRIE_HASH_LEN},
proof::{Proof, ProofError},
storage::buffer::{BufferWrite, DiskBuffer, DiskBufferRequester},
storage::{
buffer::{BufferWrite, DiskBuffer, DiskBufferRequester},
AshRecord, CachedSpace, MemStoreR, SpaceWrite, StoreConfig, StoreDelta, StoreRevMut,
StoreRevShared, ZeroStore, PAGE_SIZE_NBIT,
},
};
use bytemuck::{cast_slice, AnyBitPattern};
use metered::{metered, HitCount};
use parking_lot::{Mutex, RwLock};
use shale::compact::CompactSpace;
use shale::ShaleStore;
use shale::{
compact::CompactSpaceHeader, CachedStore, ObjPtr, ShaleError, SpaceId, Storable, StoredView,
compact::{CompactSpace, CompactSpaceHeader},
CachedStore, Obj, ObjPtr, ShaleError, ShaleStore, SpaceId, Storable, StoredView,
};
use std::{
collections::VecDeque,
error::Error,
fmt,
io::{Cursor, Write},
mem::size_of,
path::Path,
sync::Arc,
thread::JoinHandle,
Expand Down Expand Up @@ -374,6 +376,8 @@ pub struct Db<S> {

// #[metered(registry = DbMetrics, visibility = pub)]
impl Db<SharedStore> {
const PARAM_SIZE: u64 = size_of::<DbParams>() as u64;

/// Open a database.
pub fn new<P: AsRef<Path>>(db_path: P, cfg: &DbConfig) -> Result<Self, DbError> {
// TODO: make sure all fds are released at the end
Expand Down Expand Up @@ -424,7 +428,7 @@ impl Db<SharedStore> {
let header_bytes = unsafe {
std::slice::from_raw_parts(
&header as *const DbParams as *const u8,
std::mem::size_of::<DbParams>(),
size_of::<DbParams>(),
)
.to_vec()
};
Expand All @@ -433,7 +437,7 @@ impl Db<SharedStore> {
}

// read DbParams
let mut header_bytes = [0; std::mem::size_of::<DbParams>()];
let mut header_bytes = [0; size_of::<DbParams>()];
nix::sys::uio::pread(fd0, &mut header_bytes, 0).map_err(DbError::System)?;
drop(file0);
let params: DbParams = cast_slice(&header_bytes)[0];
Expand Down Expand Up @@ -547,8 +551,24 @@ impl Db<SharedStore> {
merkle: get_sub_universe_from_empty_delta(&data_cache.merkle),
blob: get_sub_universe_from_empty_delta(&data_cache.blob),
};

let db_header_ref = Db::get_db_header_ref(&base.merkle.meta)?;

let merkle_payload_header_ref =
Db::get_payload_header_ref(&base.merkle.meta, Db::PARAM_SIZE + DbHeader::MSIZE)?;

let blob_payload_header_ref = Db::get_payload_header_ref(&base.blob.meta, 0)?;

let header_refs = (
db_header_ref,
merkle_payload_header_ref,
blob_payload_header_ref,
);

let base_revision = Db::new_revision(
&base,
header_refs,
(base.merkle.meta.clone(), base.merkle.payload.clone()),
(base.blob.meta.clone(), base.blob.payload.clone()),
params.payload_regn_nbit,
cfg.payload_max_walk,
&cfg.rev,
Expand Down Expand Up @@ -582,7 +602,7 @@ impl Db<SharedStore> {
payload_regn_nbit: u64,
cfg: &DbConfig,
) -> Result<(Universe<Arc<StoreRevMut>>, DbRev<Store>), DbError> {
let mut offset = std::mem::size_of::<DbParams>() as u64;
let mut offset = Db::PARAM_SIZE;
let db_header: ObjPtr<DbHeader> = ObjPtr::new_from_addr(offset);
offset += DbHeader::MSIZE;
let merkle_payload_header: ObjPtr<CompactSpaceHeader> = ObjPtr::new_from_addr(offset);
Expand Down Expand Up @@ -626,111 +646,78 @@ impl Db<SharedStore> {
),
};

let mut rev =
Db::new_revision_arc(&store, payload_regn_nbit, cfg.payload_max_walk, &cfg.rev)?;
rev.flush_dirty().unwrap();
let db_header_ref = Db::get_db_header_ref(store.merkle.meta.as_ref())?;

Ok((store, rev))
}
let merkle_payload_header_ref = Db::get_payload_header_ref(
store.merkle.meta.as_ref(),
Db::PARAM_SIZE + DbHeader::MSIZE,
)?;

fn new_revision<K: CachedStore + Clone>(
store: &Universe<K>,
payload_regn_nbit: u64,
payload_max_walk: u64,
cfg: &DbRevConfig,
) -> Result<DbRev<CompactSpace<Node, K>>, DbError> {
// Set up the storage layout
let mut offset = std::mem::size_of::<DbParams>() as u64;
// DbHeader starts after DbParams in merkle meta space
let db_header: ObjPtr<DbHeader> = ObjPtr::new_from_addr(offset);
offset += DbHeader::MSIZE;
// Merkle CompactHeader starts after DbHeader in merkle meta space
let merkle_payload_header: ObjPtr<CompactSpaceHeader> = ObjPtr::new_from_addr(offset);
offset += CompactSpaceHeader::MSIZE;
assert!(offset <= SPACE_RESERVED);
let blob_payload_header_ref = Db::get_payload_header_ref(store.blob.meta.as_ref(), 0)?;

let mut db_header_ref =
StoredView::ptr_to_obj(&store.merkle.meta, db_header, DbHeader::MSIZE).unwrap();
let header_refs = (
db_header_ref,
merkle_payload_header_ref,
blob_payload_header_ref,
);

let merkle_payload_header_ref = StoredView::ptr_to_obj(
&store.merkle.meta,
merkle_payload_header,
shale::compact::CompactHeader::MSIZE,
let mut rev: DbRev<CompactSpace<Node, StoreRevMut>> = Db::new_revision(
header_refs,
(store.merkle.meta.clone(), store.merkle.payload.clone()),
(store.blob.meta.clone(), store.blob.payload.clone()),
payload_regn_nbit,
cfg.payload_max_walk,
&cfg.rev,
)?;
rev.flush_dirty().unwrap();

let merkle_space = shale::compact::CompactSpace::new(
Arc::new(store.merkle.meta.clone()),
Arc::new(store.merkle.payload.clone()),
merkle_payload_header_ref,
shale::ObjCache::new(cfg.merkle_ncached_objs),
payload_max_walk,
payload_regn_nbit,
)
.unwrap();
Ok((store, rev))
}

if db_header_ref.acc_root.is_null() {
let mut err = Ok(());
// create the sentinel node
db_header_ref
.write(|r| {
err = (|| {
Merkle::<Store>::init_root(&mut r.acc_root, &merkle_space)?;
Merkle::<Store>::init_root(&mut r.kv_root, &merkle_space)
})();
})
.unwrap();
err.map_err(DbError::Merkle)?
}
fn get_payload_header_ref<K: CachedStore>(
meta_ref: &K,
header_offset: u64,
) -> Result<Obj<CompactSpaceHeader>, DbError> {
let payload_header = ObjPtr::<CompactSpaceHeader>::new_from_addr(header_offset);
StoredView::ptr_to_obj(
meta_ref,
payload_header,
shale::compact::CompactHeader::MSIZE,
)
.map_err(Into::into)
}

Ok(DbRev {
header: db_header_ref,
merkle: Merkle::new(Box::new(merkle_space)),
})
fn get_db_header_ref<K: CachedStore>(meta_ref: &K) -> Result<Obj<DbHeader>, DbError> {
let db_header = ObjPtr::<DbHeader>::new_from_addr(Db::PARAM_SIZE);
StoredView::ptr_to_obj(meta_ref, db_header, DbHeader::MSIZE).map_err(Into::into)
}

// TODO: can we only have one version of `new_revision`?
fn new_revision_arc<K: CachedStore + Clone>(
store: &Universe<Arc<K>>,
fn new_revision<K: CachedStore, T: Into<Arc<K>>>(
header_refs: (
Obj<DbHeader>,
Obj<CompactSpaceHeader>,
Obj<CompactSpaceHeader>,
),
merkle: (T, T),
_blob: (T, T),
payload_regn_nbit: u64,
payload_max_walk: u64,
cfg: &DbRevConfig,
) -> Result<DbRev<CompactSpace<Node, K>>, DbError> {
// Set up the storage layout
let mut offset = std::mem::size_of::<DbParams>() as u64;
// DbHeader starts after DbParams in merkle meta space
let db_header: ObjPtr<DbHeader> = ObjPtr::new_from_addr(offset);
offset += DbHeader::MSIZE;
// Merkle CompactHeader starts after DbHeader in merkle meta space
let merkle_payload_header: ObjPtr<CompactSpaceHeader> = ObjPtr::new_from_addr(offset);
offset += CompactSpaceHeader::MSIZE;
assert!(offset <= SPACE_RESERVED);
// Blob CompactSpaceHeader starts right in blob meta space
let blob_payload_header: ObjPtr<CompactSpaceHeader> = ObjPtr::new_from_addr(0);
// TODO: This should be a compile time check
const DB_OFFSET: u64 = Db::PARAM_SIZE;
let merkle_offset = DB_OFFSET + DbHeader::MSIZE;
assert!(merkle_offset + CompactSpaceHeader::MSIZE <= SPACE_RESERVED);

let (mut db_header_ref, merkle_payload_header_ref, _blob_payload_header_ref) = {
let merkle_meta_ref = store.merkle.meta.as_ref();
let blob_meta_ref = store.blob.meta.as_ref();
let mut db_header_ref = header_refs.0;
let merkle_payload_header_ref = header_refs.1;

(
StoredView::ptr_to_obj(merkle_meta_ref, db_header, DbHeader::MSIZE).unwrap(),
StoredView::ptr_to_obj(
merkle_meta_ref,
merkle_payload_header,
shale::compact::CompactHeader::MSIZE,
)
.unwrap(),
StoredView::ptr_to_obj(
blob_meta_ref,
blob_payload_header,
shale::compact::CompactHeader::MSIZE,
)
.unwrap(),
)
};
let merkle_meta = merkle.0.into();
let merkle_payload = merkle.1.into();

let merkle_space = shale::compact::CompactSpace::new(
store.merkle.meta.clone(),
store.merkle.payload.clone(),
merkle_meta,
merkle_payload,
merkle_payload_header_ref,
shale::ObjCache::new(cfg.merkle_ncached_objs),
payload_max_walk,
Expand Down Expand Up @@ -893,9 +880,33 @@ impl Db<SharedStore> {
drop(inner_lock);

let cfg = cfg.as_ref().unwrap_or(&self.cfg.rev);
Some(Revision {
rev: Db::new_revision(space, self.payload_regn_nbit, 0, cfg).unwrap(),
})

let db_header_ref = Db::get_db_header_ref(&space.merkle.meta).unwrap();

let merkle_payload_header_ref =
Db::get_payload_header_ref(&space.merkle.meta, Db::PARAM_SIZE + DbHeader::MSIZE)
.unwrap();

let blob_payload_header_ref = Db::get_payload_header_ref(&space.blob.meta, 0).unwrap();

let header_refs = (
db_header_ref,
merkle_payload_header_ref,
blob_payload_header_ref,
);

Revision {
rev: Db::new_revision(
header_refs,
(space.merkle.meta.clone(), space.merkle.payload.clone()),
(space.blob.meta.clone(), space.blob.payload.clone()),
self.payload_regn_nbit,
0,
cfg,
)
.unwrap(),
}
.into()
}
}

Expand Down Expand Up @@ -985,8 +996,26 @@ impl Proposal<Store, SharedStore> {
let m = Arc::clone(&self.m);
let r = Arc::clone(&self.r);
let cfg = self.cfg.clone();
let mut rev = Db::new_revision_arc(
&store,

let db_header_ref = Db::get_db_header_ref(store.merkle.meta.as_ref())?;

let merkle_payload_header_ref = Db::get_payload_header_ref(
store.merkle.meta.as_ref(),
Db::PARAM_SIZE + DbHeader::MSIZE,
)?;

let blob_payload_header_ref = Db::get_payload_header_ref(store.blob.meta.as_ref(), 0)?;

let header_refs = (
db_header_ref,
merkle_payload_header_ref,
blob_payload_header_ref,
);

let mut rev = Db::new_revision(
header_refs,
(store.merkle.meta.clone(), store.merkle.payload.clone()),
(store.blob.meta.clone(), store.blob.payload.clone()),
cfg.payload_regn_nbit,
cfg.payload_max_walk,
&cfg.rev,
Expand Down Expand Up @@ -1133,7 +1162,28 @@ impl Proposal<Store, SharedStore> {
merkle: get_sub_universe_from_empty_delta(&rev_inner.cached_space.merkle),
blob: get_sub_universe_from_empty_delta(&rev_inner.cached_space.blob),
};
let base_revision = Db::new_revision(&base, 0, self.cfg.payload_max_walk, &self.cfg.rev)?;

let db_header_ref = Db::get_db_header_ref(&base.merkle.meta)?;

let merkle_payload_header_ref =
Db::get_payload_header_ref(&base.merkle.meta, Db::PARAM_SIZE + DbHeader::MSIZE)?;

let blob_payload_header_ref = Db::get_payload_header_ref(&base.blob.meta, 0)?;

let header_refs = (
db_header_ref,
merkle_payload_header_ref,
blob_payload_header_ref,
);

let base_revision = Db::new_revision(
header_refs,
(base.merkle.meta.clone(), base.merkle.payload.clone()),
(base.blob.meta.clone(), base.blob.payload.clone()),
0,
self.cfg.payload_max_walk,
&self.cfg.rev,
)?;
revisions.base = base;
revisions.base_revision = Arc::new(base_revision);

Expand Down

0 comments on commit 89204f1

Please sign in to comment.