Skip to content

Commit

Permalink
Untangling lifetimes and borrows (#291)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <rpring9@gmail.com>
  • Loading branch information
richardpringle authored Sep 28, 2023
1 parent 6f29a00 commit 07a6eef
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 51 deletions.
50 changes: 27 additions & 23 deletions shale/src/compact.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE.md for licensing terms.

use crate::ObjCache;

use super::disk_address::DiskAddress;
use super::{CachedStore, Obj, ObjRef, ShaleError, ShaleStore, Storable, StoredView};
use std::fmt::Debug;
use std::io::{Cursor, Write};
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};

#[derive(Debug)]
Expand Down Expand Up @@ -295,16 +296,15 @@ impl std::ops::DerefMut for U64Field {
}

#[derive(Debug)]
struct CompactSpaceInner<T: Send + Sync, M> {
struct CompactSpaceInner<M> {
meta_space: Arc<M>,
compact_space: Arc<M>,
header: CompactSpaceHeaderSliced,
obj_cache: super::ObjCache<T>,
alloc_max_walk: u64,
regn_nbit: u64,
}

impl<T: Storable + Send + Sync, M: CachedStore> CompactSpaceInner<T, M> {
impl<M: CachedStore> CompactSpaceInner<M> {
fn get_descriptor(&self, ptr: DiskAddress) -> Result<Obj<CompactDescriptor>, ShaleError> {
StoredView::ptr_to_obj(self.meta_space.as_ref(), ptr, CompactDescriptor::MSIZE)
}
Expand Down Expand Up @@ -576,7 +576,8 @@ impl<T: Storable + Send + Sync, M: CachedStore> CompactSpaceInner<T, M> {

#[derive(Debug)]
pub struct CompactSpace<T: Send + Sync, M> {
inner: RwLock<CompactSpaceInner<T, M>>,
inner: RwLock<CompactSpaceInner<M>>,
obj_cache: ObjCache<T>,
}

impl<T: Storable + Send + Sync, M: CachedStore> CompactSpace<T, M> {
Expand All @@ -593,10 +594,10 @@ impl<T: Storable + Send + Sync, M: CachedStore> CompactSpace<T, M> {
meta_space,
compact_space,
header: CompactSpaceHeader::into_fields(header)?,
obj_cache,
alloc_max_walk,
regn_nbit,
}),
obj_cache,
};
Ok(cs)
}
Expand All @@ -609,40 +610,40 @@ impl<T: Storable + Send + Sync + Debug + 'static, M: CachedStore + Send + Sync>
let size = item.dehydrated_len() + extra;
let addr = self.inner.write().unwrap().alloc(size)?;

let mut u = {
let obj = {
let inner = self.inner.read().unwrap();
let compact_space = inner.compact_space.as_ref();
let view =
StoredView::item_to_obj(compact_space, addr.try_into().unwrap(), size, item)?;

inner.obj_cache.put(view)
self.obj_cache.put(view)
};

let cache = &self.obj_cache;

let mut obj_ref = ObjRef::new(Some(obj), cache);

// should this use a `?` instead of `unwrap`?
u.write(|_| {}).unwrap();
obj_ref.write(|_| {}).unwrap();

Ok(u)
Ok(obj_ref)
}

fn free_item(&mut self, ptr: DiskAddress) -> Result<(), ShaleError> {
let mut inner = self.inner.write().unwrap();
inner.obj_cache.pop(ptr);
self.obj_cache.pop(ptr);
inner.free(ptr.unwrap().get() as u64)
}

fn get_item(&self, ptr: DiskAddress) -> Result<ObjRef<'_, T>, ShaleError> {
let inner = {
let inner = self.inner.read().unwrap();
let obj_ref = inner
.obj_cache
.get(inner.obj_cache.lock().deref_mut(), ptr)?;
let obj = self.obj_cache.get(ptr)?;

if let Some(obj_ref) = obj_ref {
return Ok(obj_ref);
}
let inner = self.inner.read().unwrap();
let cache = &self.obj_cache;

inner
};
if let Some(obj) = obj {
return Ok(ObjRef::new(Some(obj), cache));
}

if ptr < DiskAddress::from(CompactSpaceHeader::MSIZE as usize) {
return Err(ShaleError::InvalidAddressLength {
Expand All @@ -654,14 +655,17 @@ impl<T: Storable + Send + Sync + Debug + 'static, M: CachedStore + Send + Sync>
let payload_size = inner
.get_header(ptr - CompactHeader::MSIZE as usize)?
.payload_size;
let obj = self.obj_cache.put(inner.get_data_ref(ptr, payload_size)?);
let cache = &self.obj_cache;

Ok(inner.obj_cache.put(inner.get_data_ref(ptr, payload_size)?))
Ok(ObjRef::new(Some(obj), cache))
}

fn flush_dirty(&self) -> Option<()> {
let mut inner = self.inner.write().unwrap();
inner.header.flush_dirty();
inner.obj_cache.flush_dirty()
// hold the write lock to ensure that both cache and header are flushed in-sync
self.obj_cache.flush_dirty()
}
}

Expand Down
42 changes: 14 additions & 28 deletions shale/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use disk_address::DiskAddress;
use std::any::type_name;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Formatter};
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
Expand Down Expand Up @@ -190,17 +189,12 @@ impl<T: ?Sized + Send + Sync> Deref for Obj<T> {
/// User handle that offers read & write access to the stored [ShaleStore] item.
pub struct ObjRef<'a, T: Send + Sync> {
inner: Option<Obj<T>>,
cache: ObjCache<T>,
_life: PhantomData<&'a mut ()>,
cache: &'a ObjCache<T>,
}

impl<'a, T: Send + Sync> ObjRef<'a, T> {
pub fn to_longlive(mut self) -> ObjRef<'static, T> {
ObjRef {
inner: self.inner.take(),
cache: ObjCache(self.cache.0.clone()),
_life: PhantomData,
}
fn new(inner: Option<Obj<T>>, cache: &'a ObjCache<T>) -> Self {
Self { inner, cache }
}

#[inline]
Expand Down Expand Up @@ -455,12 +449,10 @@ impl<T: Send + Sync> ObjCache<T> {
}

#[inline(always)]
pub fn get<'a>(
&self,
inner: &mut ObjCacheInner<T>,
ptr: DiskAddress,
) -> Result<Option<ObjRef<'a, T>>, ShaleError> {
if let Some(r) = inner.cached.pop(&ptr) {
fn get(&self, ptr: DiskAddress) -> Result<Option<Obj<T>>, ShaleError> {
let mut inner = self.0.write().unwrap();

let obj_ref = inner.cached.pop(&ptr).map(|r| {
// insert and set to `false` if you can
// When using `get` in parallel, one should not `write` to the same address
inner
Expand All @@ -469,6 +461,8 @@ impl<T: Send + Sync> ObjCache<T> {
.and_modify(|is_pinned| *is_pinned = false)
.or_insert(false);

// if we need to re-enable this code, it has to return from the outer function
//
// return if inner.pinned.insert(ptr, false).is_some() {
// Err(ShaleError::InvalidObj {
// addr: ptr.addr(),
Expand All @@ -484,25 +478,17 @@ impl<T: Send + Sync> ObjCache<T> {
// };

// always return instead of the code above
return Ok(Some(ObjRef {
inner: Some(r),
cache: Self(self.0.clone()),
_life: PhantomData,
}));
}
r
});

Ok(None)
Ok(obj_ref)
}

#[inline(always)]
pub fn put<'a>(&self, inner: Obj<T>) -> ObjRef<'a, T> {
fn put(&self, inner: Obj<T>) -> Obj<T> {
let ptr = inner.as_ptr();
self.lock().pinned.insert(ptr, false);
ObjRef {
inner: Some(inner),
cache: Self(self.0.clone()),
_life: PhantomData,
}
inner
}

#[inline(always)]
Expand Down

0 comments on commit 07a6eef

Please sign in to comment.