Skip to content

Commit

Permalink
[ENH] BF writer support reads (#3031)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - New functionality
	 - Implemented `writer.get_owned()`, which returns an owned copy of the value stored for the key

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
None
  • Loading branch information
sanketkedia authored Nov 13, 2024
1 parent ab08cdd commit 50a2220
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 4 deletions.
18 changes: 18 additions & 0 deletions rust/blockstore/src/arrow/block/delta/builder_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ impl<V: ArrowWriteableValue> BTreeBuilderStorage<V> {
self.storage.remove(key)
}

/// Looks up `key` in the underlying storage and returns its prepared form.
///
/// The stored value is cloned and passed through `V::prepare`; `None` is returned
/// when the key is absent. A single map lookup is performed.
fn get(&self, key: &CompositeKey) -> Option<V::PreparedValue> {
    self.storage
        .get(key)
        .map(|value| V::prepare(value.clone()))
}

/// Returns the first key produced by the storage's key iterator, or `None`
/// when the iterator yields nothing.
fn min_key(&self) -> Option<&CompositeKey> {
    let mut keys = self.storage.keys();
    keys.next()
}
Expand Down Expand Up @@ -61,6 +68,10 @@ impl<V: ArrowWriteableValue> VecBuilderStorage<V> {
None
}

/// Point lookup by key. Not implemented for this storage.
///
/// # Panics
/// Always panics with a "not implemented" message.
fn get(&self, _: &CompositeKey) -> Option<V::PreparedValue> {
    // Name the missing operation so the panic is diagnosable from the message alone.
    unimplemented!("VecBuilderStorage::get is not supported")
}

/// Returns the key of the first `(key, value)` entry in storage, or `None`
/// when storage is empty.
fn min_key(&self) -> Option<&CompositeKey> {
    let (first_key, _) = self.storage.first()?;
    Some(first_key)
}
Expand Down Expand Up @@ -137,6 +148,13 @@ impl<V: ArrowWriteableValue> BuilderStorage<V> {
}
}

pub fn get(&self, key: &CompositeKey) -> Option<V::PreparedValue> {
match self {
BuilderStorage::BTreeBuilderStorage(storage) => storage.get(key),
BuilderStorage::VecBuilderStorage(storage) => storage.get(key),
}
}

pub fn len(&self) -> usize {
match self {
BuilderStorage::BTreeBuilderStorage(storage) => storage.len(),
Expand Down
10 changes: 10 additions & 0 deletions rust/blockstore/src/arrow/block/delta/data_record.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::data_record_size_tracker::DataRecordSizeTracker;
use super::BlockKeyArrowBuilder;
use crate::arrow::block::value::data_record_value::DataRecordStorageEntry;
use crate::arrow::types::ArrowWriteableValue;
use crate::{
arrow::types::ArrowWriteableKey,
Expand Down Expand Up @@ -49,6 +50,15 @@ impl DataRecordStorage {
inner.size_tracker.get_key_size()
}

/// Returns a clone of the entry stored under `(prefix, key)`, or `None` if
/// there is no such entry.
///
/// The lookup is performed while holding the read lock on the inner state.
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<DataRecordStorageEntry> {
    let lookup_key = CompositeKey {
        prefix: prefix.to_string(),
        key,
    };
    let guard = self.inner.read();
    guard.storage.get(&lookup_key).cloned()
}

pub fn add(&self, prefix: &str, key: KeyWrapper, value: &DataRecord<'_>) {
let mut inner = self.inner.write();
let composite_key = CompositeKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,12 @@ impl<V: ArrowWriteableValue<SizeTracker = SingleColumnSizeTracker>> SingleColumn

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}

/// Returns the prepared value held for `(prefix, key)`, as reported by the
/// underlying storage's `get`, while holding the read lock on the inner state.
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<V::PreparedValue> {
    let lookup_key = CompositeKey {
        prefix: prefix.to_string(),
        key,
    };
    let guard = self.inner.read();
    guard.storage.get(&lookup_key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use chroma_types::SpannPostingList;
use parking_lot::RwLock;

use crate::{
arrow::types::{ArrowWriteableKey, ArrowWriteableValue},
arrow::{
block::value::spann_posting_list_value::SpannPostingListDeltaEntry,
types::{ArrowWriteableKey, ArrowWriteableValue},
},
key::{CompositeKey, KeyWrapper},
};

Expand Down Expand Up @@ -48,6 +51,19 @@ impl SpannPostingListDelta {
self.inner.read().size_tracker.get_key_size()
}

/// Returns a clone of the posting-list entry stored under `(prefix, key)`,
/// or `None` if the storage holds no entry for it.
///
/// The read lock on the inner state is held for the duration of the lookup.
pub fn get_owned_value(
    &self,
    prefix: &str,
    key: KeyWrapper,
) -> Option<SpannPostingListDeltaEntry> {
    let inner = self.inner.read();
    let lookup_key = CompositeKey {
        prefix: prefix.to_string(),
        key,
    };
    let entry = inner.storage.get(&lookup_key);
    entry.cloned()
}

pub fn add(&self, prefix: &str, key: KeyWrapper, value: &SpannPostingList<'_>) {
let mut lock_guard = self.inner.write();
let composite_key = CompositeKey {
Expand Down
15 changes: 14 additions & 1 deletion rust/blockstore/src/arrow/block/value/data_record_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ pub struct ValueBuilderWrapper {
document_builder: StringBuilder,
}

pub type DataRecordStorageEntry = (String, Vec<f32>, Option<Vec<u8>>, Option<String>);

impl ArrowWriteableValue for &DataRecord<'_> {
type ReadableValue<'referred_data> = DataRecord<'referred_data>;
type ArrowBuilder = ValueBuilderWrapper;
type SizeTracker = DataRecordSizeTracker;
type PreparedValue = (String, Vec<f32>, Option<Vec<u8>>, Option<String>);
type PreparedValue = DataRecordStorageEntry;

fn offset_size(item_count: usize) -> usize {
let id_offset = bit_util::round_upto_multiple_of_64((item_count + 1) * 4);
Expand Down Expand Up @@ -178,6 +180,17 @@ impl ArrowWriteableValue for &DataRecord<'_> {

(struct_field, value_arr)
}

/// Reads the owned value for `(prefix, key)` from an unordered block delta by
/// delegating to the delta's data-record builder.
///
/// # Panics
/// Panics if the delta's builder is not `BlockStorage::DataRecord`; the panic
/// message includes the builder that was actually found.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
    match &delta.builder {
        BlockStorage::DataRecord(builder) => builder.get_owned_value(prefix, key),
        // Report the mismatched builder so a wrong value type is easy to diagnose.
        other => panic!("Invalid builder type: {:?}", other),
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for DataRecord<'referred_data> {
Expand Down
11 changes: 11 additions & 0 deletions rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ impl ArrowWriteableValue for RoaringBitmap {
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
(value_field, value_arr)
}

/// Reads the owned value for `(prefix, key)` from an unordered block delta by
/// delegating to the delta's roaring-bitmap builder.
///
/// # Panics
/// Panics if the delta's builder is not `BlockStorage::RoaringBitmap`; the
/// panic message includes the builder that was actually found.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
    match &delta.builder {
        BlockStorage::RoaringBitmap(builder) => builder.get_owned_value(prefix, key),
        // Report the mismatched builder so a wrong value type is easy to diagnose.
        other => panic!("Invalid builder type: {:?}", other),
    }
}
}

impl ArrowReadableValue<'_> for RoaringBitmap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
BlockfileWriterMutationOrdering,
};

type SpannPostingListDeltaEntry = (Vec<u32>, Vec<u32>, Vec<f32>);
pub type SpannPostingListDeltaEntry = (Vec<u32>, Vec<u32>, Vec<f32>);

pub struct SpannPostingListBuilderWrapper {
doc_offset_ids_builder: ListBuilder<UInt32Builder>,
Expand Down Expand Up @@ -180,6 +180,17 @@ impl ArrowWriteableValue for &SpannPostingList<'_> {

(value_field, value_arr)
}

/// Reads the owned value for `(prefix, key)` from an unordered block delta by
/// delegating to the delta's SPANN posting-list builder.
///
/// # Panics
/// Panics if the delta's builder is not `BlockStorage::SpannPostingListDelta`;
/// the panic message includes the builder that was actually found.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
    match &delta.builder {
        BlockStorage::SpannPostingListDelta(builder) => builder.get_owned_value(prefix, key),
        // Report the mismatched builder so a wrong value type is easy to diagnose.
        other => panic!("Invalid builder type: {:?}", other),
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for SpannPostingList<'referred_data> {
Expand Down
11 changes: 11 additions & 0 deletions rust/blockstore/src/arrow/block/value/str_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ impl ArrowWriteableValue for String {
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
(value_field, value_arr)
}

/// Reads the owned value for `(prefix, key)` from an unordered block delta by
/// delegating to the delta's string builder.
///
/// # Panics
/// Panics if the delta's builder is not `BlockStorage::String`; the panic
/// message includes the builder that was actually found.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
    match &delta.builder {
        BlockStorage::String(builder) => builder.get_owned_value(prefix, key),
        // Report the mismatched builder so a wrong value type is easy to diagnose.
        other => panic!("Invalid builder type: {:?}", other),
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data str {
Expand Down
11 changes: 11 additions & 0 deletions rust/blockstore/src/arrow/block/value/u32_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ impl ArrowWriteableValue for u32 {
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
(value_field, value_arr)
}

/// Reads the owned value for `(prefix, key)` from an unordered block delta by
/// delegating to the delta's `u32` builder.
///
/// # Panics
/// Panics if the delta's builder is not `BlockStorage::UInt32`; the panic
/// message includes the builder that was found.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
    let storage = &delta.builder;
    match storage {
        BlockStorage::UInt32(u32_builder) => u32_builder.get_owned_value(prefix, key),
        _ => panic!("Invalid builder type: {:?}", storage),
    }
}
}

impl<'a> ArrowReadableValue<'a> for u32 {
Expand Down
11 changes: 11 additions & 0 deletions rust/blockstore/src/arrow/block/value/uint32array_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ impl ArrowWriteableValue for Vec<u32> {

(value_field, Arc::new(value_arr))
}

/// Reads the owned value for `(prefix, key)` from an unordered block delta by
/// delegating to the delta's `Vec<u32>` builder.
///
/// # Panics
/// Panics if the delta's builder is not `BlockStorage::VecUInt32`; the panic
/// message includes the builder that was actually found.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
    match &delta.builder {
        BlockStorage::VecUInt32(builder) => builder.get_owned_value(prefix, key),
        // Report the mismatched builder so a wrong value type is easy to diagnose.
        other => panic!("Invalid builder type: {:?}", other),
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data [u32] {
Expand Down
62 changes: 62 additions & 0 deletions rust/blockstore/src/arrow/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,68 @@ impl ArrowUnorderedBlockfileWriter {
Ok(())
}

/// Returns an owned copy of the value stored under `(prefix, key)`, or `None`
/// if the delta for the key's target block has no entry for it.
///
/// The lookup is served from the block delta responsible for the key. When no
/// delta exists yet for the target block, the block is fetched and forked into
/// a new delta, the sparse index is repointed at the new delta's id, and the
/// delta is registered in `block_deltas` — so this read can modify writer state.
///
/// # Errors
/// Returns `ArrowBlockfileError::BlockNotFound` when the block manager has no
/// block for the target id, and forwards any error produced while fetching or
/// forking the block.
#[allow(dead_code)]
pub(crate) async fn get_owned<K: ArrowWriteableKey, V: ArrowWriteableValue>(
    &self,
    prefix: &str,
    key: K,
) -> Result<Option<V::PreparedValue>, Box<dyn ChromaError>> {
    // TODO: for now the BF writer locks the entire write operation
    let _guard = self.write_mutex.lock().await;

    // Get the target block id for the key
    let search_key = CompositeKey::new(prefix.to_string(), key.clone());
    let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

    // See if a delta for the target block already exists. The lock is scoped to this
    // block so it is released before any `.await` below.
    let existing_delta = {
        let deltas = self.block_deltas.lock();
        deltas.get(&target_block_id).cloned()
    };

    let delta = match existing_delta {
        Some(delta) => delta,
        None => {
            // No delta yet: fork the existing block into a new delta and add it to the
            // transaction state. Creating a delta loads the block entirely into memory.
            let block = self
                .block_manager
                .get(&target_block_id)
                .await
                .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?
                .ok_or_else(|| {
                    Box::new(ArrowBlockfileError::BlockNotFound) as Box<dyn ChromaError>
                })?;
            let new_delta = self
                .block_manager
                .fork::<K, V, UnorderedBlockDelta>(&block.id)
                .await
                .map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
            let new_id = new_delta.id;
            // Blocks can be empty.
            self.root
                .sparse_index
                .replace_block(target_block_id, new_id);
            {
                let mut deltas = self.block_deltas.lock();
                deltas.insert(new_id, new_delta.clone());
            }
            new_delta
        }
    };

    Ok(V::get_owned_value_from_delta(prefix, key.into(), &delta))
}

pub(crate) async fn delete<K: ArrowWriteableKey, V: ArrowWriteableValue>(
&self,
prefix: &str,
Expand Down
7 changes: 6 additions & 1 deletion rust/blockstore/src/arrow/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub trait ArrowWriteableValue: Value {
/// Every writable value has a corresponding readable value type. For example, the readable value type for a `String` is `&str`.
type ReadableValue<'referred_data>: ArrowReadableValue<'referred_data>;
/// Some values are a reference type and need to be converted to an owned type or need to be prepared (e.g. serializing a RoaringBitmap) before they can be stored in a delta or Arrow array.
type PreparedValue;
type PreparedValue: Clone;

/// Some values use an offsets array. This returns the size of the offsets array given the number of items in the array.
fn offset_size(item_count: usize) -> usize;
Expand All @@ -45,6 +45,11 @@ pub trait ArrowWriteableValue: Value {
builder: Self::ArrowBuilder,
size_tracker: &Self::SizeTracker,
) -> (Field, Arc<dyn Array>);
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue>;
}

pub trait ArrowReadableKey<'referred_data>: Key + PartialOrd {
Expand Down

0 comments on commit 50a2220

Please sign in to comment.