
Streaming iterator spike #342

Closed
wants to merge 17 commits into from
1 change: 1 addition & 0 deletions firewood/Cargo.toml
@@ -34,6 +34,7 @@ thiserror = "1.0.38"
tokio = { version = "1.21.1", features = ["rt", "sync", "macros", "rt-multi-thread"] }
typed-builder = "0.18.0"
bincode = "1.3.3"
itertools = "0.11.0"

[dev-dependencies]
criterion = {version = "0.5.1", features = ["async_tokio"]}
@@ -1,12 +1,6 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE.md for licensing terms.

use crate::shale::{
    self,
    compact::{CompactSpace, CompactSpaceHeader},
    disk_address::DiskAddress,
    CachedStore, Obj, ShaleError, ShaleStore, SpaceId, Storable, StoredView,
};
pub use crate::{
    config::{DbConfig, DbRevConfig},
    storage::{buffer::DiskBufferConfig, WalConfig},
@@ -23,8 +17,18 @@ use crate::{
    },
    v2::api::{self, HashKey, KeyType, Proof, ValueType},
};
use crate::{
    merkle,
    shale::{
        self,
        compact::{CompactSpace, CompactSpaceHeader},
        disk_address::DiskAddress,
        CachedStore, Obj, ShaleError, ShaleStore, SpaceId, Storable, StoredView,
    },
};
use async_trait::async_trait;
use bytemuck::{cast_slice, AnyBitPattern};

use metered::metered;
use parking_lot::{Mutex, RwLock};
use std::{
@@ -312,6 +316,16 @@ impl<S: ShaleStore<Node> + Send + Sync> api::DbView for DbRev<S> {
}

impl<S: ShaleStore<Node> + Send + Sync> DbRev<S> {
    pub fn stream<K: KeyType>(
        &self,
        start_key: Option<K>,
    ) -> Result<merkle::MerkleKeyValueStream<'_, K, S>, api::Error> {
        // TODO: get first key when start_key is None
        self.merkle
            .get_iter(start_key, self.header.kv_root)
            .map_err(|e| api::Error::InternalError(e.into()))
    }

    fn flush_dirty(&mut self) -> Option<()> {
        self.header.flush_dirty();
        self.merkle.flush_dirty()?;
212 changes: 210 additions & 2 deletions firewood/src/merkle.rs
@@ -2,14 +2,19 @@
// See the file LICENSE.md for licensing terms.

use crate::shale::{self, disk_address::DiskAddress, ObjWriteError, ShaleError, ShaleStore};
use crate::v2::api;
use crate::{nibbles::Nibbles, v2::api::Proof};
use futures::Stream;
use itertools::Itertools;
use sha3::Digest;
use std::pin::pin;
use std::{
    cmp::Ordering,
    collections::HashMap,
    io::Write,
    iter::once,
    sync::{atomic::Ordering::Relaxed, OnceLock},
    task::Poll,
};
use thiserror::Error;

@@ -874,7 +879,7 @@ impl<S: ShaleStore<Node> + Send + Sync> Merkle<S> {

        let (found, parents, deleted) = {
            let (node_ref, mut parents) =
                self.get_node_and_parents_by_key(self.get_node(root)?, key)?;
                self.get_node_and_parents_by_key(self.get_node(root)?, &key)?;

            let Some(mut node_ref) = node_ref else {
                return Ok(None);
@@ -963,7 +968,7 @@ impl<S: ShaleStore<Node> + Send + Sync> Merkle<S> {
    fn get_node_and_parents_by_key<'a, K: AsRef<[u8]>>(
        &'a self,
        node_ref: ObjRef<'a>,
        key: K,
        key: &K,
    ) -> Result<(Option<ObjRef<'a>>, ParentRefs<'a>), MerkleError> {
        let mut parents = Vec::new();
        let node_ref = self.get_node_by_key_with_callback(node_ref, key, |node_ref, nib| {
@@ -1175,6 +1180,180 @@ impl<S: ShaleStore<Node> + Send + Sync> Merkle<S> {
    pub fn flush_dirty(&self) -> Option<()> {
        self.store.flush_dirty()
    }

    pub fn get_iter<K: AsRef<[u8]>>(
        &self,
        key: Option<K>,
        root: DiskAddress,
    ) -> Result<MerkleKeyValueStream<'_, K, S>, MerkleError> {
        // TODO: if DiskAddress::is_null() ...
Contributor:

Will all these TODOs be tracked in an issue so that we won't forget about them? All of them seem like must-dos.

Collaborator (author):

This was fixed in #346

        Ok(MerkleKeyValueStream {
            starting_key: key,
            merkle_root: root,
            merkle: self,
            current_node: Default::default(),
            parents: Default::default(),
            current_key: Default::default(),
        })
    }
}

pub struct MerkleKeyValueStream<'a, K, S> {
    starting_key: Option<K>,
    merkle_root: DiskAddress,
    merkle: &'a Merkle<S>,
    // current_node is the node that was last returned from poll_next
    current_node: Option<ObjRef<'a>>,
    // parents holds pointers up the tree to this node's ancestors
    parents: Vec<(ObjRef<'a>, u8)>,
    // this is the last key returned
    // TODO: improve this; we can probably just adjust current_key as we walk up
    // and down the merkle tree rather than fully rebuilding it each time
    current_key: Option<Vec<u8>>,
}

impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore<node::Node> + Send + Sync> Stream
    for MerkleKeyValueStream<'a, K, S>
{
    type Item = Result<(Vec<u8>, Vec<u8>), api::Error>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut pinned_self = pin!(self);

        if let Some(key) = pinned_self.starting_key.take() {
Contributor:

It would be better to wrap this in an enum so that impossible states cannot be represented:

enum Key {
    None,
    Starting(Vec<u8>),
    Current(Vec<u8>),
}

or whatever. Right now, it's possible for the type to have both a starting key and a current key, which shouldn't be the case, right? I'm not looking too far in depth, so I could have missed something, but if we can't represent impossible states, there will be fewer code paths.

            // skip to this key, which must exist
            // TODO: support finding the next key after K
            let root_node = pinned_self
                .merkle
                .get_node(pinned_self.merkle_root)
                .map_err(|e| api::Error::InternalError(e.into()))?;

            (pinned_self.current_node, pinned_self.parents) = pinned_self
                .merkle
                .get_node_and_parents_by_key(root_node, &key)
                .map_err(|e| api::Error::InternalError(e.into()))?;
            if pinned_self.current_node.as_ref().is_none() {
                return Poll::Ready(None);
            }
            pinned_self.current_key = Some(key.as_ref().to_vec());
            return match pinned_self.current_node.as_ref().unwrap().inner() {
                NodeType::Branch(branch) => Poll::Ready(Some(Ok((
                    key.as_ref().to_vec(),
                    branch.value.to_owned().unwrap().to_vec(),
                )))),

                NodeType::Leaf(leaf) => {
                    Poll::Ready(Some(Ok((key.as_ref().to_vec(), leaf.1.to_vec()))))
                }
                NodeType::Extension(_) => todo!(),
            };
        }
        // the current node might be None if the tree is empty or we happen to be at the end
        let current_node = match pinned_self.current_node.take() {
            None => return Poll::Ready(None),
Contributor:

If I'm not mistaken, if I don't pass a key to the get_iter method, it'll hit this every time instead of starting from the root, right? Whether I'm right or wrong, there should be a test for an empty key.

Collaborator (author):

I can't write the empty-key test yet because that path isn't implemented. I need to add code that walks down the leftmost branch every time and returns that key, which doesn't currently exist.

I'm thinking about not using the existing get_node_and_parents_by_key, or refactoring it so it supports:

  • an Option key, so you can get the lowest leaf node
  • returning the next highest key if the key can't be found (maybe as an Option? or wrapping it in another function that discards the key if it isn't found?)
            Some(node) => node,
        };

        let next_node = match current_node.inner() {
            NodeType::Branch(branch) => {
                // previously rendered the value from a branch node, so walk down to the first child
                if let Some(child) = branch.children[0] {
                    pinned_self.parents.push((current_node, 0));
                    Some(
                        pinned_self
                            .merkle
                            .get_node(child)
                            .map_err(|e| api::Error::InternalError(e.into()))?,
                    )
                } else {
                    // a branch node with no first child? should have been a leaf node
                    return Poll::Ready(Some(Err(api::Error::InternalError(Box::new(
                        MerkleError::ParentLeafBranch,
                    )))));
                }
            }
            NodeType::Leaf(_leaf) => {
                let mut next = pinned_self.parents.pop();
                loop {
                    match next {
                        None => return Poll::Ready(None),
                        Some((parent, child_position)) => {
                            // this code assumes all parents are branch nodes and will panic otherwise
                            if child_position as usize == NBRANCH - 1 {
                                // don't index past the end of the list
                                next = pinned_self.parents.pop();
                                continue;
                            }
                            match parent.inner().as_branch().unwrap().chd()
                                [child_position.wrapping_add(1) as usize]
                            {
                                Some(addr) => {
                                    // there is a child at the next address, so walk down it.
                                    // We use u8::MAX to walk down the leftmost value, as it
                                    // gets increased immediately once popped
                                    let child = pinned_self
                                        .merkle
                                        .get_node(addr)
                                        .map(|node| (node, u8::MAX))
                                        .map_err(|e| api::Error::InternalError(e.into()))?;
                                    let keep_going = child.0.inner().is_branch();
                                    next = Some(child);
                                    pinned_self
                                        .parents
                                        .push((parent, child_position.wrapping_add(1)));
                                    if !keep_going {
                                        break;
                                    }
                                }
                                None => {
                                    next = pinned_self.parents.pop();
                                    continue;
                                }
                            }
                        }
                    }
                }
                // recompute current_key
                // TODO: can we keep current_key updated as we walk the tree instead of
                // rebuilding it from the top each time?
                let current_key = pinned_self
                    .parents
                    .iter()
                    .skip(1)
                    .map(|parent| parent.1)
                    .tuples()
                    .map(|(hi, lo)| (hi << 4) + lo)
                    .collect::<Vec<u8>>();
                pinned_self.current_key = current_key.into();
                pinned_self
                    .current_key
                    .as_mut()
                    .unwrap()
                    .extend(current_node.inner().as_leaf().unwrap().0.to_vec());

                next.map(|node| node.0)
            }

            NodeType::Extension(_) => todo!(),
        };
        pinned_self.current_node = next_node;

        match &pinned_self.current_node {
            None => Poll::Ready(None),
            Some(objref) => match objref.inner() {
                NodeType::Branch(branch) => Poll::Ready(Some(Ok((
                    pinned_self.current_key.as_ref().unwrap().to_vec(),
                    branch.value.to_owned().unwrap().to_vec(),
                )))),

                NodeType::Leaf(leaf) => Poll::Ready(Some(Ok((
                    pinned_self.current_key.as_ref().unwrap().to_vec(),
                    leaf.1.to_vec(),
                )))),
                NodeType::Extension(_) => todo!(),
            },
        }
    }
}

fn set_parent(new_chd: DiskAddress, parents: &mut [(ObjRef, u8)]) {
@@ -1270,10 +1449,12 @@ pub fn from_nibbles(nibbles: &[u8]) -> impl Iterator<Item = u8> + '_ {
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use node::tests::{extension, leaf};
    use shale::{cached::DynamicMem, compact::CompactSpace, CachedStore};
    use std::sync::Arc;
    use test_case::test_case;
    //use itertools::Itertools;
Contributor:

nit: Why not remove this line?

Collaborator (author):

Fixed in #346


    #[test_case(vec![0x12, 0x34, 0x56], vec![0x1, 0x2, 0x3, 0x4, 0x5, 0x6])]
    #[test_case(vec![0xc0, 0xff], vec![0xc, 0x0, 0xf, 0xf])]
@@ -1388,6 +1569,33 @@ mod tests {
}
}

    #[tokio::test]
    async fn iterate_empty() {
        let merkle = create_test_merkle();
        let root = merkle.init_root().unwrap();
        let mut it = merkle.get_iter(Some(b"x"), root).unwrap();
        let next = it.next().await;
        assert!(next.is_none())
    }

    #[tokio::test]
    async fn iterate_many() {
        let mut merkle = create_test_merkle();
        let root = merkle.init_root().unwrap();

        for k in u8::MIN..=u8::MAX {
            merkle.insert([k], vec![k], root).unwrap();
        }

        let mut it = merkle.get_iter(Some([u8::MIN]), root).unwrap();
        for k in u8::MIN..=u8::MAX {
            let next = it.next().await.unwrap().unwrap();
            assert_eq!(next.0, next.1);
            assert_eq!(next.1, vec![k]);
        }
        assert!(it.next().await.is_none())
    }
Contributor:

You should also add a test that takes a stream over an empty trie, inserts a node at a key, and then makes sure the first value in the stream is that node.

Collaborator (author):

This test will happen after StartAtBeginning is implemented in the next PR.

Contributor:

That's only one case. There are cases here that can be tested now; the logic added here should be accompanied by tests covering those cases.


    #[test]
    fn remove_one() {
        let key = b"hello";
@@ -52,7 +52,7 @@ pub fn vec_into_batch<K: KeyType, V: ValueType>(value: Vec<(K, V)>) -> Batch<K,
        .collect()
}

/// Errors returned through the API
/// Errors returned through the AIP
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum Error {