From 969684f7315431737cd7b573a0670c390425f337 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 3 Nov 2023 23:25:23 +0000 Subject: [PATCH 01/17] Streaming iterator spike --- firewood/src/db.rs | 10 ++++++++-- firewood/src/merkle.rs | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/firewood/src/db.rs b/firewood/src/db.rs index de1ff059c..72487f697 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -1,12 +1,12 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use crate::shale::{ +use crate::{shale::{ self, compact::{CompactSpace, CompactSpaceHeader}, disk_address::DiskAddress, CachedStore, Obj, ShaleError, ShaleStore, SpaceId, Storable, StoredView, -}; +}, merkle}; pub use crate::{ config::{DbConfig, DbRevConfig}, storage::{buffer::DiskBufferConfig, WalConfig}, @@ -25,6 +25,7 @@ use crate::{ }; use async_trait::async_trait; use bytemuck::{cast_slice, AnyBitPattern}; + use metered::metered; use parking_lot::{Mutex, RwLock}; use std::{ @@ -312,6 +313,11 @@ impl + Send + Sync> api::DbView for DbRev { } impl + Send + Sync> DbRev { + pub fn stream(&self, start_key: Option) -> Result, api::Error> { + // TODO: get first key when start_key is None + self.merkle.get_iter(start_key.unwrap(), self.header.kv_root).map_err(|e| api::Error::InternalError(e.into())) + } + fn flush_dirty(&mut self) -> Option<()> { self.header.flush_dirty(); self.merkle.flush_dirty()?; diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index f1121ce7e..41f39e230 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -3,6 +3,7 @@ use crate::shale::{self, disk_address::DiskAddress, ObjWriteError, ShaleError, ShaleStore}; use crate::{nibbles::Nibbles, v2::api::Proof}; +use futures::Stream; use sha3::Digest; use std::{ cmp::Ordering, @@ -1175,6 +1176,38 @@ impl + Send + Sync> Merkle { pub fn flush_dirty(&self) -> Option<()> { self.store.flush_dirty() } + + pub fn get_iter>( + &self, + key: K, + root: DiskAddress, + ) -> Result, MerkleError> { + // TODO: if DiskAddress::is_null() ... + let root_node = self.get_node(root)?; + let (node, parents) = self.get_node_and_parents_by_key(root_node, key)?; + Ok(MerkleKeyValueStream{_merkle: self, node, _parents: parents}) + } +} + +pub struct MerkleKeyValueStream<'a, S> { + _merkle: &'a Merkle, + node: Option>, + _parents: Vec<(ObjRef<'a>, u8)>, +} + +impl<'a, S> Stream for MerkleKeyValueStream<'a, S> { + type Item = (Vec, Vec); + + fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { + let node = match &self.node { + None => return std::task::Poll::Ready(None), + Some(node) => node, + }; + let ret = node.inner().as_leaf().unwrap(); + // TODO: advance to next leaf + // TODO: construct full path at this point, maybe save it + std::task::Poll::Ready(Some((ret.0.to_vec(), ret.1.to_vec()))) + } } fn set_parent(new_chd: DiskAddress, parents: &mut [(ObjRef, u8)]) { From c806b7a161558ed6599f2c3c0ed0323c5fe6985e Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Sat, 4 Nov 2023 15:37:33 +0000 Subject: [PATCH 02/17] Format --- firewood/src/db.rs | 26 +++++++++++++++++--------- firewood/src/merkle.rs | 13 ++++++++++--- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/firewood/src/db.rs b/firewood/src/db.rs index 72487f697..83b8f4a68 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -1,12 +1,6 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. 
// See the file LICENSE.md for licensing terms. -use crate::{shale::{ - self, - compact::{CompactSpace, CompactSpaceHeader}, - disk_address::DiskAddress, - CachedStore, Obj, ShaleError, ShaleStore, SpaceId, Storable, StoredView, -}, merkle}; pub use crate::{ config::{DbConfig, DbRevConfig}, storage::{buffer::DiskBufferConfig, WalConfig}, @@ -23,6 +17,15 @@ use crate::{ }, v2::api::{self, HashKey, KeyType, Proof, ValueType}, }; +use crate::{ + merkle, + shale::{ + self, + compact::{CompactSpace, CompactSpaceHeader}, + disk_address::DiskAddress, + CachedStore, Obj, ShaleError, ShaleStore, SpaceId, Storable, StoredView, + }, +}; use async_trait::async_trait; use bytemuck::{cast_slice, AnyBitPattern}; @@ -313,11 +316,16 @@ impl + Send + Sync> api::DbView for DbRev { } impl + Send + Sync> DbRev { - pub fn stream(&self, start_key: Option) -> Result, api::Error> { + pub fn stream( + &self, + start_key: Option, + ) -> Result, api::Error> { // TODO: get first key when start_key is None - self.merkle.get_iter(start_key.unwrap(), self.header.kv_root).map_err(|e| api::Error::InternalError(e.into())) + self.merkle + .get_iter(start_key.unwrap(), self.header.kv_root) + .map_err(|e| api::Error::InternalError(e.into())) } - + fn flush_dirty(&mut self) -> Option<()> { self.header.flush_dirty(); self.merkle.flush_dirty()?; diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index 41f39e230..47baf3ac1 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -1185,7 +1185,11 @@ impl + Send + Sync> Merkle { // TODO: if DiskAddress::is_null() ... let root_node = self.get_node(root)?; let (node, parents) = self.get_node_and_parents_by_key(root_node, key)?; - Ok(MerkleKeyValueStream{_merkle: self, node, _parents: parents}) + Ok(MerkleKeyValueStream { + _merkle: self, + node, + _parents: parents, + }) } } @@ -1198,7 +1202,10 @@ pub struct MerkleKeyValueStream<'a, S> { impl<'a, S> Stream for MerkleKeyValueStream<'a, S> { type Item = (Vec, Vec); - fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { let node = match &self.node { None => return std::task::Poll::Ready(None), Some(node) => node, @@ -1206,7 +1213,7 @@ impl<'a, S> Stream for MerkleKeyValueStream<'a, S> { let ret = node.inner().as_leaf().unwrap(); // TODO: advance to next leaf // TODO: construct full path at this point, maybe save it - std::task::Poll::Ready(Some((ret.0.to_vec(), ret.1.to_vec()))) + std::task::Poll::Ready(Some((ret.0.to_vec(), ret.1.to_vec()))) } } From 9bb6cd308fdfb5a78114829e3b380df8bb38a541 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Mon, 6 Nov 2023 20:13:24 +0000 Subject: [PATCH 03/17] WIP --- firewood/Cargo.toml | 1 + firewood/src/db.rs | 4 +- firewood/src/merkle.rs | 113 ++++++++++++++++++++++++++++++++++------- firewood/src/v2/api.rs | 6 +-- 4 files changed, 101 insertions(+), 23 deletions(-) diff --git a/firewood/Cargo.toml b/firewood/Cargo.toml index dcbdb0f8c..5581f816d 100644 --- a/firewood/Cargo.toml +++ b/firewood/Cargo.toml @@ -46,6 +46,7 @@ serial_test = "2.0.0" clap = { version = "4.3.1", features = ['derive'] } test-case = "3.1.0" pprof = { version = "0.13.0", features = ["flamegraph"] } +itertools = "0.11.0" [[bench]] name = "hashops" diff --git a/firewood/src/db.rs b/firewood/src/db.rs index 83b8f4a68..71f3388bf 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -319,10 +319,10 @@ impl + Send + Sync> DbRev { pub fn stream( &self, 
start_key: Option, - ) -> Result, api::Error> { + ) -> Result, api::Error> { // TODO: get first key when start_key is None self.merkle - .get_iter(start_key.unwrap(), self.header.kv_root) + .get_iter(start_key, self.header.kv_root) .map_err(|e| api::Error::InternalError(e.into())) } diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index 47baf3ac1..3a6cda6b4 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -2,9 +2,11 @@ // See the file LICENSE.md for licensing terms. use crate::shale::{self, disk_address::DiskAddress, ObjWriteError, ShaleError, ShaleStore}; +use crate::v2::api; use crate::{nibbles::Nibbles, v2::api::Proof}; use futures::Stream; use sha3::Digest; +use std::pin::pin; use std::{ cmp::Ordering, collections::HashMap, @@ -875,7 +877,7 @@ impl + Send + Sync> Merkle { let (found, parents, deleted) = { let (node_ref, mut parents) = - self.get_node_and_parents_by_key(self.get_node(root)?, key)?; + self.get_node_and_parents_by_key(self.get_node(root)?, &key)?; let Some(mut node_ref) = node_ref else { return Ok(None); @@ -964,7 +966,7 @@ impl + Send + Sync> Merkle { fn get_node_and_parents_by_key<'a, K: AsRef<[u8]>>( &'a self, node_ref: ObjRef<'a>, - key: K, + key: &K, ) -> Result<(Option>, ParentRefs<'a>), MerkleError> { let mut parents = Vec::new(); let node_ref = self.get_node_by_key_with_callback(node_ref, key, |node_ref, nib| { @@ -1179,41 +1181,87 @@ impl + Send + Sync> Merkle { pub fn get_iter>( &self, - key: K, + key: Option, root: DiskAddress, - ) -> Result, MerkleError> { + ) -> Result, MerkleError> { // TODO: if DiskAddress::is_null() ... - let root_node = self.get_node(root)?; - let (node, parents) = self.get_node_and_parents_by_key(root_node, key)?; Ok(MerkleKeyValueStream { - _merkle: self, - node, - _parents: parents, + skip: key, + root, + merkle: self, + node: Default::default(), + parents: Default::default(), + current_key: Default::default(), }) } } -pub struct MerkleKeyValueStream<'a, S> { - _merkle: &'a Merkle, +pub struct MerkleKeyValueStream<'a, K, S> { + skip: Option, + root: DiskAddress, + merkle: &'a Merkle, node: Option>, - _parents: Vec<(ObjRef<'a>, u8)>, + parents: Vec<(ObjRef<'a>, u8)>, + current_key: Option>, } -impl<'a, S> Stream for MerkleKeyValueStream<'a, S> { - type Item = (Vec, Vec); +impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> Stream + for MerkleKeyValueStream<'a, K, S> +{ + type Item = Result<(Vec, Vec), api::Error>; fn poll_next( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let node = match &self.node { + let mut pinned_self = pin!(self); + if let Some(key) = pinned_self.skip.take() { + // skip to this key, which must exist + // TODO: support finding the next key after K + let root_node = pinned_self + .merkle + .get_node(pinned_self.root) + .map_err(|e| api::Error::InternalError(e.into()))?; + + (pinned_self.node, pinned_self.parents) = pinned_self + .merkle + .get_node_and_parents_by_key(root_node, &key) + .map_err(|e| api::Error::InternalError(e.into()))?; + pinned_self.current_key = Some(key.as_ref().to_vec()); + return match pinned_self.node.as_ref().unwrap().inner() { + NodeType::Branch(branch) => std::task::Poll::Ready(Some(Ok(( + key.as_ref().to_vec(), + branch.value.to_owned().unwrap().to_vec(), + )))), + + NodeType::Leaf(leaf) => { + std::task::Poll::Ready(Some(Ok((key.as_ref().to_vec(), leaf.1.to_vec())))) + } + NodeType::Extension(_) => todo!(), + }; + } + let node = match &pinned_self.node { None => return 
std::task::Poll::Ready(None), Some(node) => node, }; + // we previously rendered the value from a branch node, so walk down the children, if any + if let NodeType::Branch(branch) = node.inner() { + if let Some(child) = branch.children[0] { + pinned_self.parents.push((*node, 0)); + } + } let ret = node.inner().as_leaf().unwrap(); - // TODO: advance to next leaf - // TODO: construct full path at this point, maybe save it - std::task::Poll::Ready(Some((ret.0.to_vec(), ret.1.to_vec()))) + let value = ret.1.to_vec(); + + match pinned_self.get_mut().parents.pop() { + None => return std::task::Poll::Ready(None), + Some(objref) => match objref.0.inner() { + NodeType::Branch(_) => todo!(), + NodeType::Leaf(_) => todo!(), + NodeType::Extension(_) => todo!(), + }, + } + std::task::Poll::Ready(Some(Ok((current_key, value)))) } } @@ -1310,10 +1358,12 @@ pub fn from_nibbles(nibbles: &[u8]) -> impl Iterator + '_ { #[cfg(test)] mod tests { use super::*; + use futures::StreamExt; use node::tests::{extension, leaf}; use shale::{cached::DynamicMem, compact::CompactSpace, CachedStore}; use std::sync::Arc; use test_case::test_case; + //use itertools::Itertools; #[test_case(vec![0x12, 0x34, 0x56], vec![0x1, 0x2, 0x3, 0x4, 0x5, 0x6])] #[test_case(vec![0xc0, 0xff], vec![0xc, 0x0, 0xf, 0xf])] @@ -1428,6 +1478,33 @@ mod tests { } } + #[tokio::test] + async fn iterate_empty() { + let merkle = create_test_merkle(); + let root = merkle.init_root().unwrap(); + let mut it = merkle.get_iter(Some(b"x"), root).unwrap(); + let next = it.next().await; + assert!(next.is_none()) + } + + #[tokio::test] + async fn iterate_many() { + let mut merkle = create_test_merkle(); + let root = merkle.init_root().unwrap(); + + for k in u8::MIN..=u8::MAX { + merkle.insert([k], vec![k], root).unwrap(); + } + + let mut it = merkle.get_iter(Some([u8::MIN]), root).unwrap(); + for k in u8::MIN..=u8::MAX { + let next = it.next().await.unwrap().unwrap(); + assert_eq!(next.1, vec![k]); + assert_eq!(next.0, next.1); + } + assert!(it.next().await.is_none()) + } + #[test] fn remove_one() { let key = b"hello"; diff --git a/firewood/src/v2/api.rs b/firewood/src/v2/api.rs index 0ddacbe71..e0f83b278 100644 --- a/firewood/src/v2/api.rs +++ b/firewood/src/v2/api.rs @@ -49,10 +49,10 @@ pub fn vec_into_batch(value: Vec<(K, V)>) -> Batch Date: Wed, 8 Nov 2023 00:05:13 +0000 Subject: [PATCH 04/17] Complete implementation There are several TODOs here, but the code works for the two test cases I created. Both of these avoid making extension nodes, which are going away, and this code may need some tweaks to support values inside branch nodes. 
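A note on the key reconstruction this change introduces: every entry pushed onto `parents` records which nibble (half-byte) was taken at a branch, so the key for the current position can be rebuilt by pairing adjacent nibbles back into bytes. A minimal standalone sketch of that pairing, with invented names (the patch does this inline, first with itertools' tuples() and, from PATCH 11 on, with chunks_exact(2)):

    // Rebuild key bytes from a path of nibbles, one nibble per branch taken.
    // Adjacent pairs (hi, lo) recombine as (hi << 4) | lo.
    fn key_from_nibble_path(path: &[u8]) -> Vec<u8> {
        path.chunks_exact(2)
            .map(|pair| (pair[0] << 4) | pair[1])
            .collect()
    }

    fn main() {
        // nibbles 0x1, 0x2, 0x3, 0x4 reassemble into bytes 0x12, 0x34
        assert_eq!(key_from_nibble_path(&[0x1, 0x2, 0x3, 0x4]), vec![0x12, 0x34]);
    }

In the patch itself the first parents entry is the root, which carries no nibble, hence the skip(1) (later parents[1..]) before pairing.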
--- firewood/Cargo.toml | 1 + firewood/src/merkle.rs | 153 +++++++++++++++++++++++++++++++++-------- 2 files changed, 124 insertions(+), 30 deletions(-) diff --git a/firewood/Cargo.toml b/firewood/Cargo.toml index 5581f816d..b07e57c61 100644 --- a/firewood/Cargo.toml +++ b/firewood/Cargo.toml @@ -34,6 +34,7 @@ thiserror = "1.0.38" tokio = { version = "1.21.1", features = ["rt", "sync", "macros", "rt-multi-thread"] } typed-builder = "0.18.0" bincode = "1.3.3" +itertools = "0.11.0" [dev-dependencies] criterion = {version = "0.5.1", features = ["async_tokio"]} diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index 3a6cda6b4..2478ac226 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -5,6 +5,7 @@ use crate::shale::{self, disk_address::DiskAddress, ObjWriteError, ShaleError, S use crate::v2::api; use crate::{nibbles::Nibbles, v2::api::Proof}; use futures::Stream; +use itertools::Itertools; use sha3::Digest; use std::pin::pin; use std::{ @@ -12,6 +13,7 @@ use std::{ collections::HashMap, io::Write, iter::once, + task::Poll, sync::{atomic::Ordering::Relaxed, OnceLock}, }; use thiserror::Error; @@ -1186,10 +1188,10 @@ impl + Send + Sync> Merkle { ) -> Result, MerkleError> { // TODO: if DiskAddress::is_null() ... Ok(MerkleKeyValueStream { - skip: key, - root, + starting_key: key, + merkle_root: root, merkle: self, - node: Default::default(), + current_node: Default::default(), parents: Default::default(), current_key: Default::default(), }) @@ -1197,11 +1199,16 @@ impl + Send + Sync> Merkle { } pub struct MerkleKeyValueStream<'a, K, S> { - skip: Option, - root: DiskAddress, + starting_key: Option, + merkle_root: DiskAddress, merkle: &'a Merkle, - node: Option>, + // current node is the node that was last returned from poll_next + current_node: Option>, + // parents hold pointers up the tree to the parents parents: Vec<(ObjRef<'a>, u8)>, + // this is the last key returned + // TODO: improve this; we can probably just adjust current_key as we walk up + // and down the merkle tree rather than fully rebuilding it each time current_key: Option>, } @@ -1213,55 +1220,141 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> fn poll_next( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { let mut pinned_self = pin!(self); - if let Some(key) = pinned_self.skip.take() { + if let Some(key) = pinned_self.starting_key.take() { // skip to this key, which must exist // TODO: support finding the next key after K let root_node = pinned_self .merkle - .get_node(pinned_self.root) + .get_node(pinned_self.merkle_root) .map_err(|e| api::Error::InternalError(e.into()))?; - (pinned_self.node, pinned_self.parents) = pinned_self + (pinned_self.current_node, pinned_self.parents) = pinned_self .merkle .get_node_and_parents_by_key(root_node, &key) .map_err(|e| api::Error::InternalError(e.into()))?; + if pinned_self.current_node.as_ref().is_none() { + return Poll::Ready(None); + } pinned_self.current_key = Some(key.as_ref().to_vec()); - return match pinned_self.node.as_ref().unwrap().inner() { - NodeType::Branch(branch) => std::task::Poll::Ready(Some(Ok(( + return match pinned_self.current_node.as_ref().unwrap().inner() { + NodeType::Branch(branch) => Poll::Ready(Some(Ok(( key.as_ref().to_vec(), branch.value.to_owned().unwrap().to_vec(), )))), NodeType::Leaf(leaf) => { - std::task::Poll::Ready(Some(Ok((key.as_ref().to_vec(), leaf.1.to_vec())))) + Poll::Ready(Some(Ok((key.as_ref().to_vec(), leaf.1.to_vec())))) } 
NodeType::Extension(_) => todo!(), }; } - let node = match &pinned_self.node { - None => return std::task::Poll::Ready(None), + // The current node might be none if the tree is empty or we happen to be at the end + let current_node = match pinned_self.current_node.take() { + None => return Poll::Ready(None), Some(node) => node, }; - // we previously rendered the value from a branch node, so walk down the children, if any - if let NodeType::Branch(branch) = node.inner() { - if let Some(child) = branch.children[0] { - pinned_self.parents.push((*node, 0)); + + let next_node = match current_node.inner() { + NodeType::Branch(branch) => { + // previously rendered the value from a branch node, so walk down to the first child + if let Some(child) = branch.children[0] { + pinned_self.parents.push((current_node, 0)); + Some( + pinned_self + .merkle + .get_node(child) + .map_err(|e| api::Error::InternalError(e.into()))?, + ) + } else { + // Branch node with no first child? Should have been a leaf node? + return Poll::Ready(Some(Err(api::Error::InternalError(Box::new( + MerkleError::ParentLeafBranch, + ))))); + } } - } - let ret = node.inner().as_leaf().unwrap(); - let value = ret.1.to_vec(); - - match pinned_self.get_mut().parents.pop() { - None => return std::task::Poll::Ready(None), - Some(objref) => match objref.0.inner() { - NodeType::Branch(_) => todo!(), - NodeType::Leaf(_) => todo!(), + NodeType::Leaf(_leaf) => { + let mut next = pinned_self.parents.pop(); + loop { + match next { + None => return Poll::Ready(None), + Some((parent, child_position)) => { + // This code assumes all parents are branch nodes and will panic otherwise + if child_position as usize == NBRANCH - 1 { + // don't index past end of list + next = pinned_self.parents.pop(); + continue; + } + match parent.inner().as_branch().unwrap().chd()[child_position.wrapping_add(1) as usize] { + Some(addr) => { + // there is a child at the next address, so walk down it + // We use u8::MAX to walk down the leftmost value, as it + // gets increased immediately once popped + let child = pinned_self + .merkle + .get_node(addr) + .map(|node| (node, u8::MAX)) + .map_err(|e| api::Error::InternalError(e.into()))?; + let keep_going = child.0.inner().is_branch(); + next = Some(child); + pinned_self.parents.push((parent, child_position.wrapping_add(1))); + if !keep_going { + break; + } + } + None => { + next = pinned_self.parents.pop(); + continue; + } + } + } + } + } + // recompute current_key + // TODO: Can we keep current_key updated as we walk the tree instead of building it from the top all the time? 
+ let current_key = pinned_self + .parents + .iter() + .skip(1) + .map(|parent| parent.1) + .tuples() + .map(|(hi, lo)| { + (hi << 4) + lo + }) + .collect::>(); + pinned_self.current_key = current_key.into(); // Some(current_node.inner().as_leaf().unwrap().0.to_vec()); + pinned_self + .current_key + .as_mut() + .unwrap() + .extend(current_node.inner().as_leaf().unwrap().0.to_vec()); + next.map(|node| node.0) + } + + NodeType::Extension(_) => todo!(), + }; + pinned_self.current_node = next_node; + + match &pinned_self.current_node { + None => Poll::Ready(None), + Some(objref) => match objref.inner() { + NodeType::Branch(branch) => { + Poll::Ready(Some(Ok(( + pinned_self.current_key.as_ref().unwrap().to_vec(), + branch.value.to_owned().unwrap().to_vec(), + )))) + } + + NodeType::Leaf(leaf) => { + Poll::Ready(Some(Ok(( + pinned_self.current_key.as_ref().unwrap().to_vec(), + leaf.1.to_vec(), + )))) + } NodeType::Extension(_) => todo!(), }, } - std::task::Poll::Ready(Some(Ok((current_key, value)))) } } @@ -1499,8 +1592,8 @@ mod tests { let mut it = merkle.get_iter(Some([u8::MIN]), root).unwrap(); for k in u8::MIN..=u8::MAX { let next = it.next().await.unwrap().unwrap(); - assert_eq!(next.1, vec![k]); assert_eq!(next.0, next.1); + assert_eq!(next.1, vec![k]); } assert!(it.next().await.is_none()) } From 116d83b8ed9c99f8f4d7647f9da02c94cb995d35 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Wed, 8 Nov 2023 00:12:15 +0000 Subject: [PATCH 05/17] Formatting --- firewood/src/merkle.rs | 34 ++++++++++++++++------------------ firewood/src/v2/api.rs | 4 ++-- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index 2478ac226..a93d74504 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -13,8 +13,8 @@ use std::{ collections::HashMap, io::Write, iter::once, - task::Poll, sync::{atomic::Ordering::Relaxed, OnceLock}, + task::Poll, }; use thiserror::Error; @@ -1286,7 +1286,9 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> next = pinned_self.parents.pop(); continue; } - match parent.inner().as_branch().unwrap().chd()[child_position.wrapping_add(1) as usize] { + match parent.inner().as_branch().unwrap().chd() + [child_position.wrapping_add(1) as usize] + { Some(addr) => { // there is a child at the next address, so walk down it // We use u8::MAX to walk down the leftmost value, as it @@ -1298,7 +1300,9 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> .map_err(|e| api::Error::InternalError(e.into()))?; let keep_going = child.0.inner().is_branch(); next = Some(child); - pinned_self.parents.push((parent, child_position.wrapping_add(1))); + pinned_self + .parents + .push((parent, child_position.wrapping_add(1))); if !keep_going { break; } @@ -1319,9 +1323,7 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> .skip(1) .map(|parent| parent.1) .tuples() - .map(|(hi, lo)| { - (hi << 4) + lo - }) + .map(|(hi, lo)| (hi << 4) + lo) .collect::>(); pinned_self.current_key = current_key.into(); // Some(current_node.inner().as_leaf().unwrap().0.to_vec()); pinned_self @@ -1339,19 +1341,15 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> match &pinned_self.current_node { None => Poll::Ready(None), Some(objref) => match objref.inner() { - NodeType::Branch(branch) => { - Poll::Ready(Some(Ok(( - pinned_self.current_key.as_ref().unwrap().to_vec(), - branch.value.to_owned().unwrap().to_vec(), - )))) - } + NodeType::Branch(branch) => Poll::Ready(Some(Ok(( + 
pinned_self.current_key.as_ref().unwrap().to_vec(), + branch.value.to_owned().unwrap().to_vec(), + )))), - NodeType::Leaf(leaf) => { - Poll::Ready(Some(Ok(( - pinned_self.current_key.as_ref().unwrap().to_vec(), - leaf.1.to_vec(), - )))) - } + NodeType::Leaf(leaf) => Poll::Ready(Some(Ok(( + pinned_self.current_key.as_ref().unwrap().to_vec(), + leaf.1.to_vec(), + )))), NodeType::Extension(_) => todo!(), }, } diff --git a/firewood/src/v2/api.rs b/firewood/src/v2/api.rs index e0f83b278..c9d4c7492 100644 --- a/firewood/src/v2/api.rs +++ b/firewood/src/v2/api.rs @@ -49,8 +49,8 @@ pub fn vec_into_batch(value: Vec<(K, V)>) -> Batch Date: Wed, 8 Nov 2023 00:13:18 +0000 Subject: [PATCH 06/17] itertools is only needed at runtime --- firewood/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/firewood/Cargo.toml b/firewood/Cargo.toml index b07e57c61..fa4b713d0 100644 --- a/firewood/Cargo.toml +++ b/firewood/Cargo.toml @@ -47,7 +47,6 @@ serial_test = "2.0.0" clap = { version = "4.3.1", features = ['derive'] } test-case = "3.1.0" pprof = { version = "0.13.0", features = ["flamegraph"] } -itertools = "0.11.0" [[bench]] name = "hashops" From f5e69c6feebdf5c58e5f5f4f17c0531bf3c4d7f1 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Wed, 8 Nov 2023 18:23:25 +0000 Subject: [PATCH 07/17] No need to borrow K for get_node Per review comments --- firewood/src/merkle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index a93d74504..1ddadd6d5 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -968,7 +968,7 @@ impl + Send + Sync> Merkle { fn get_node_and_parents_by_key<'a, K: AsRef<[u8]>>( &'a self, node_ref: ObjRef<'a>, - key: &K, + key: K, ) -> Result<(Option>, ParentRefs<'a>), MerkleError> { let mut parents = Vec::new(); let node_ref = self.get_node_by_key_with_callback(node_ref, key, |node_ref, nib| { From e6e36d06722d9c6d378e7e2b505cdbbaf0dd50dc Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Wed, 8 Nov 2023 18:26:32 +0000 Subject: [PATCH 08/17] Use let-else-return instead of match arms Per review comments --- firewood/src/merkle.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index 1ddadd6d5..d0a067cab 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -1251,9 +1251,8 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> }; } // The current node might be none if the tree is empty or we happen to be at the end - let current_node = match pinned_self.current_node.take() { - None => return Poll::Ready(None), - Some(node) => node, + let Some(current_node) = pinned_self.current_node.take() else { + return Poll::Ready(None); }; let next_node = match current_node.inner() { From e25d42b9ff34691eb23f6ca74258f55ee0d82745 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Wed, 8 Nov 2023 18:28:08 +0000 Subject: [PATCH 09/17] Remove unnecessary continue Per review comments --- firewood/src/merkle.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index d0a067cab..f4f6c72e4 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -1306,10 +1306,7 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> break; } } - None => { - next = pinned_self.parents.pop(); - continue; - } + None => next = pinned_self.parents.pop(), } } } From 82734341d550a44a4554caf5c4dcafd1307c14b1 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Wed, 8 Nov 2023 
18:29:42 +0000 Subject: [PATCH 10/17] Rename current_node to previously_returned_node Per review comments --- firewood/src/merkle.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index f4f6c72e4..bcd43de15 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -1191,7 +1191,7 @@ impl + Send + Sync> Merkle { starting_key: key, merkle_root: root, merkle: self, - current_node: Default::default(), + previously_returned_node: Default::default(), parents: Default::default(), current_key: Default::default(), }) @@ -1203,7 +1203,7 @@ pub struct MerkleKeyValueStream<'a, K, S> { merkle_root: DiskAddress, merkle: &'a Merkle, // current node is the node that was last returned from poll_next - current_node: Option>, + previously_returned_node: Option>, // parents hold pointers up the tree to the parents parents: Vec<(ObjRef<'a>, u8)>, // this is the last key returned @@ -1230,15 +1230,20 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> .get_node(pinned_self.merkle_root) .map_err(|e| api::Error::InternalError(e.into()))?; - (pinned_self.current_node, pinned_self.parents) = pinned_self + (pinned_self.previously_returned_node, pinned_self.parents) = pinned_self .merkle .get_node_and_parents_by_key(root_node, &key) .map_err(|e| api::Error::InternalError(e.into()))?; - if pinned_self.current_node.as_ref().is_none() { + if pinned_self.previously_returned_node.as_ref().is_none() { return Poll::Ready(None); } pinned_self.current_key = Some(key.as_ref().to_vec()); - return match pinned_self.current_node.as_ref().unwrap().inner() { + return match pinned_self + .previously_returned_node + .as_ref() + .unwrap() + .inner() + { NodeType::Branch(branch) => Poll::Ready(Some(Ok(( key.as_ref().to_vec(), branch.value.to_owned().unwrap().to_vec(), )))), @@ -1251,7 +1256,7 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> }; } // The current node might be none if the tree is empty or we happen to be at the end - let Some(current_node) = pinned_self.current_node.take() else { + let Some(current_node) = pinned_self.previously_returned_node.take() else { return Poll::Ready(None); }; @@ -1332,9 +1337,9 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> NodeType::Extension(_) => todo!(), }; - pinned_self.current_node = next_node; + pinned_self.previously_returned_node = next_node; - match &pinned_self.current_node { + match &pinned_self.previously_returned_node { None => Poll::Ready(None), Some(objref) => match objref.inner() { NodeType::Branch(branch) => Poll::Ready(Some(Ok(( From 36ce8cca8af820624aebb1b1617a00e1af5b39dd Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 9 Nov 2023 16:13:05 +0000 Subject: [PATCH 11/17] Redo the loop to have an IteratorState IteratorState is any of: - Start at the beginning of the trie - Start at a specific key value of the trie - Continue after a saved node and parents See the inline comments for more details. TODO: - Implement StartAtBeginning (and tests) - If a key is provided that doesn't exist, start iterating at the next one (if we really need this) - If an error is returned, we should next return None, then start from the beginning. This makes the resumption after an error a little cleaner. We'd need another IteratorState for this.
- Remove extension nodes (depends on other diffs) --- firewood/Cargo.toml | 1 - firewood/src/db.rs | 2 +- firewood/src/merkle.rs | 278 ++++++++++++++++++++++------------------- 3 files changed, 147 insertions(+), 134 deletions(-) diff --git a/firewood/Cargo.toml b/firewood/Cargo.toml index fa4b713d0..dcbdb0f8c 100644 --- a/firewood/Cargo.toml +++ b/firewood/Cargo.toml @@ -34,7 +34,6 @@ thiserror = "1.0.38" tokio = { version = "1.21.1", features = ["rt", "sync", "macros", "rt-multi-thread"] } typed-builder = "0.18.0" bincode = "1.3.3" -itertools = "0.11.0" [dev-dependencies] criterion = {version = "0.5.1", features = ["async_tokio"]} diff --git a/firewood/src/db.rs b/firewood/src/db.rs index 71f3388bf..09a0285eb 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -319,7 +319,7 @@ impl + Send + Sync> DbRev { pub fn stream( &self, start_key: Option, - ) -> Result, api::Error> { + ) -> Result, api::Error> { // TODO: get first key when start_key is None self.merkle .get_iter(start_key, self.header.kv_root) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index bcd43de15..4fa5c5a31 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -5,7 +5,6 @@ use crate::shale::{self, disk_address::DiskAddress, ObjWriteError, ShaleError, S use crate::v2::api; use crate::{nibbles::Nibbles, v2::api::Proof}; use futures::Stream; -use itertools::Itertools; use sha3::Digest; use std::pin::pin; use std::{ @@ -1185,36 +1184,50 @@ impl + Send + Sync> Merkle { &self, key: Option, root: DiskAddress, - ) -> Result, MerkleError> { + ) -> Result, MerkleError> { // TODO: if DiskAddress::is_null() ... Ok(MerkleKeyValueStream { - starting_key: key, + key_state: IteratorState::new(key), merkle_root: root, merkle: self, - previously_returned_node: Default::default(), - parents: Default::default(), - current_key: Default::default(), }) } } -pub struct MerkleKeyValueStream<'a, K, S> { - starting_key: Option, +enum IteratorState<'a> { + /// Start iterating at the beginning of the trie, + /// returning the lowest key/value pair first + StartAtBeginning, + /// Start iterating at the specified key + StartAtKey(Vec), + /// Continue iterating after the given last_node and parents + Iterating { + last_node: ObjRef<'a>, + parents: Vec<(ObjRef<'a>, u8)>, + }, +} +impl IteratorState<'_> { + fn new>(starting: Option) -> Self { + match starting { + None => Self::StartAtBeginning, + Some(key) => Self::StartAtKey(key.as_ref().to_vec()), + } + } +} + +// The default state is to start at the beginning +impl<'a> Default for IteratorState<'a> { + fn default() -> Self { + Self::StartAtBeginning + } +} +pub struct MerkleKeyValueStream<'a, S> { + key_state: IteratorState<'a>, merkle_root: DiskAddress, merkle: &'a Merkle, - // current node is the node that was last returned from poll_next - previously_returned_node: Option>, - // parents hold pointers up the tree to the parents - parents: Vec<(ObjRef<'a>, u8)>, - // this is the last key returned - // TODO: improve this; we can probably just adjust current_key as we walk up - // and down the merkle tree rather than fully rebuilding it each time - current_key: Option>, } -impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> Stream - for MerkleKeyValueStream<'a, K, S> -{ +impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyValueStream<'a, S> { type Item = Result<(Vec, Vec), api::Error>; fn poll_next( @@ -1222,81 +1235,88 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> _cx: &mut std::task::Context<'_>, ) -> Poll> { 
let mut pinned_self = pin!(self); - if let Some(key) = pinned_self.starting_key.take() { - // skip to this key, which must exist - // TODO: support finding the next key after K - let root_node = pinned_self - .merkle - .get_node(pinned_self.merkle_root) - .map_err(|e| api::Error::InternalError(e.into()))?; - - (pinned_self.previously_returned_node, pinned_self.parents) = pinned_self - .merkle - .get_node_and_parents_by_key(root_node, &key) - .map_err(|e| api::Error::InternalError(e.into()))?; - if pinned_self.previously_returned_node.as_ref().is_none() { - return Poll::Ready(None); - } - pinned_self.current_key = Some(key.as_ref().to_vec()); - return match pinned_self - .previously_returned_node - .as_ref() - .unwrap() - .inner() - { - NodeType::Branch(branch) => Poll::Ready(Some(Ok(( - key.as_ref().to_vec(), - branch.value.to_owned().unwrap().to_vec(), - )))), - - NodeType::Leaf(leaf) => { - Poll::Ready(Some(Ok((key.as_ref().to_vec(), leaf.1.to_vec())))) - } - NodeType::Extension(_) => todo!(), - }; - } - // The current node might be none if the tree is empty or we happen to be at the end - let Some(current_node) = pinned_self.previously_returned_node.take() else { - return Poll::Ready(None); - }; - - let next_node = match current_node.inner() { - NodeType::Branch(branch) => { - // previously rendered the value from a branch node, so walk down to the first child - if let Some(child) = branch.children[0] { - pinned_self.parents.push((current_node, 0)); - Some( - pinned_self - .merkle - .get_node(child) - .map_err(|e| api::Error::InternalError(e.into()))?, - ) - } else { - // Branch node with no first child? Should have been a leaf node? - return Poll::Ready(Some(Err(api::Error::InternalError(Box::new( - MerkleError::ParentLeafBranch, - ))))); - } + // Note that this sets the key_state to StartAtBeginning temporarily + // - if you get to the end you get Ok(None) but can fetch again from the start + // - if you get an error, you'll get Err(...), but continuing to fetch starts from the top + // If this isn't what you want, then consider using [std::iter::fuse] + let found_key = match std::mem::take(&mut pinned_self.key_state) { + IteratorState::StartAtBeginning => todo!(), + IteratorState::StartAtKey(key) => { + // TODO: support finding the next key after K + let root_node = pinned_self + .merkle + .get_node(pinned_self.merkle_root) + .map_err(|e| api::Error::InternalError(e.into()))?; + + let (found_node, parents) = pinned_self + .merkle + .get_node_and_parents_by_key(root_node, &key) + .map_err(|e| api::Error::InternalError(e.into()))?; + let Some(last_node) = found_node else { + return Poll::Ready(None); + }; + let returned_key_value = match last_node.inner() { + NodeType::Branch(branch) => { + (key.clone(), branch.value.to_owned().unwrap().to_vec()) + } + NodeType::Leaf(leaf) => (key, leaf.1.to_vec()), + NodeType::Extension(_) => todo!(), + }; + pinned_self.key_state = IteratorState::Iterating { last_node, parents }; + return Poll::Ready(Some(Ok(returned_key_value))); } - NodeType::Leaf(_leaf) => { - let mut next = pinned_self.parents.pop(); - loop { - match next { - None => return Poll::Ready(None), - Some((parent, child_position)) => { - // This code assumes all parents are branch nodes and will panic otherwise - if child_position as usize == NBRANCH - 1 { - // don't index past end of list - next = pinned_self.parents.pop(); - continue; - } - match parent.inner().as_branch().unwrap().chd() - [child_position.wrapping_add(1) as usize] - { - Some(addr) => { - // there is a child at the 
next address, so walk down it - // We use u8::MAX to walk down the leftmost value, as it - // gets increased immediately once popped + IteratorState::Iterating { + last_node, + mut parents, + } => { + match last_node.inner() { + NodeType::Branch(branch) => { + // previously rendered the value from a branch node, so walk down to the first available child + if let Some(child_position) = + branch.children.iter().position(|&addr| addr.is_some()) + { + let child_address = branch.children[child_position].unwrap(); + parents.push((last_node, child_position as u8)); // remember where we walked down from + let current_node = pinned_self + .merkle + .get_node(child_address) + .map_err(|e| api::Error::InternalError(e.into()))?; + let found_key = parents[1..] + .chunks_exact(2) + .map(|parents| (parents[0].1 << 4) + parents[1].1) + .collect::>(); + pinned_self.key_state = IteratorState::Iterating { + // continue iterating from here + last_node: current_node, + parents, + }; + found_key + } else { + // Branch node with no children? + return Poll::Ready(Some(Err(api::Error::InternalError(Box::new( + MerkleError::ParentLeafBranch, + ))))); + } + } + NodeType::Leaf(leaf) => { + let mut next = parents.pop(); + loop { + match next { + None => return Poll::Ready(None), + Some((parent, child_position)) => { + // Assume all parents are branch nodes + let children = parent.inner().as_branch().unwrap().chd(); + let mut child_position = child_position.wrapping_add(1); + if let Some(found_offset) = children[child_position as usize..] + .iter() + .position(|&addr| addr.is_some()) + { + child_position += found_offset as u8; + } else { + next = parents.pop(); + continue; + } + let addr = children[child_position as usize].unwrap(); let child = pinned_self .merkle .get_node(addr) @@ -1304,56 +1324,50 @@ impl<'a, K: AsRef<[u8]> + Unpin, S: shale::ShaleStore + Send + Sync> .map_err(|e| api::Error::InternalError(e.into()))?; let keep_going = child.0.inner().is_branch(); next = Some(child); - pinned_self - .parents - .push((parent, child_position.wrapping_add(1))); + + parents.push((parent, child_position)); if !keep_going { break; } } - None => next = pinned_self.parents.pop(), } } + // recompute current_key + // TODO: Can we keep current_key updated as we walk the tree instead of building it from the top all the time? + let mut current_key = parents[1..] + .chunks_exact(2) + .map(|parents| (parents[0].1 << 4) + parents[1].1) + .collect::>(); + current_key.extend(leaf.0.to_vec()); + pinned_self.key_state = IteratorState::Iterating { + last_node: next.unwrap().0, + parents, + }; + current_key } + + NodeType::Extension(_) => todo!(), } - // recompute current_key - // TODO: Can we keep current_key updated as we walk the tree instead of building it from the top all the time? 
- let current_key = pinned_self - .parents - .iter() - .skip(1) - .map(|parent| parent.1) - .tuples() - .map(|(hi, lo)| (hi << 4) + lo) - .collect::>(); - pinned_self.current_key = current_key.into(); // Some(current_node.inner().as_leaf().unwrap().0.to_vec()); - pinned_self - .current_key - .as_mut() - .unwrap() - .extend(current_node.inner().as_leaf().unwrap().0.to_vec()); - next.map(|node| node.0) } + }; - NodeType::Extension(_) => todo!(), + // figure out the value to return from the state + // if we get here, we're sure to have something to return + let return_value = match &pinned_self.key_state { + IteratorState::Iterating { + last_node, + parents: _, + } => { + let value = match last_node.inner() { + NodeType::Branch(branch) => branch.value.to_owned().unwrap().to_vec(), + NodeType::Leaf(leaf) => leaf.1.to_vec(), + NodeType::Extension(_) => todo!(), + }; + (found_key, value) + } + _ => unreachable!(), }; - pinned_self.previously_returned_node = next_node; - - match &pinned_self.previously_returned_node { - None => Poll::Ready(None), - Some(objref) => match objref.inner() { - NodeType::Branch(branch) => Poll::Ready(Some(Ok(( - pinned_self.current_key.as_ref().unwrap().to_vec(), - branch.value.to_owned().unwrap().to_vec(), - )))), - - NodeType::Leaf(leaf) => Poll::Ready(Some(Ok(( - pinned_self.current_key.as_ref().unwrap().to_vec(), - leaf.1.to_vec(), - )))), - NodeType::Extension(_) => todo!(), - }, - } + Poll::Ready(Some(Ok(return_value))) } } From 4d5f65c6cf3849d0fed7b8670bf9b765a20e0514 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 9 Nov 2023 19:19:53 +0000 Subject: [PATCH 12/17] Removed unnecessary reference in remove() This was an artifact from an earlier change. Reverted back to original. --- firewood/src/merkle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index 4fa5c5a31..8ba0a83a4 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -878,7 +878,7 @@ impl + Send + Sync> Merkle { let (found, parents, deleted) = { let (node_ref, mut parents) = - self.get_node_and_parents_by_key(self.get_node(root)?, &key)?; + self.get_node_and_parents_by_key(self.get_node(root)?, key)?; let Some(mut node_ref) = node_ref else { return Ok(None); From 692018a5cfac4884c4c54d5b9aba8dd6033e1fd3 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 9 Nov 2023 19:59:35 +0000 Subject: [PATCH 13/17] Fix typo Not sure how this got in there. Reverting it. 
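One idiom in PATCH 11's poll_next deserves a closer look: std::mem::take moves key_state out so the state can be matched by value, leaving Default::default() (StartAtBeginning) behind until a new state is stored; an early ? return therefore leaves the iterator reset, which is what the comment pointing at [std::iter::fuse] is about. A self-contained illustration of the idiom, with names invented for the sketch:

    use std::mem;

    enum State {
        StartAtBeginning,
        StartAtKey(Vec<u8>),
    }

    // mem::take requires Default; the default doubles as the "start over" state.
    impl Default for State {
        fn default() -> Self {
            Self::StartAtBeginning
        }
    }

    fn step(state: &mut State) {
        // Move the state out by value; *state is StartAtBeginning until a
        // new state is written back at the end of the match.
        match mem::take(state) {
            State::StartAtBeginning => *state = State::StartAtKey(vec![0x00]),
            State::StartAtKey(key) => {
                // an early return here (for example on an error) would leave
                // the default in place, restarting iteration on the next call
                *state = State::StartAtKey(key);
            }
        }
    }

    fn main() {
        let mut s = State::default();
        step(&mut s); // StartAtBeginning -> StartAtKey([0x00])
        step(&mut s); // stays at StartAtKey([0x00])
    }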
--- firewood/src/v2/api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firewood/src/v2/api.rs b/firewood/src/v2/api.rs index c9d4c7492..0ddacbe71 100644 --- a/firewood/src/v2/api.rs +++ b/firewood/src/v2/api.rs @@ -52,7 +52,7 @@ pub fn vec_into_batch(value: Vec<(K, V)>) -> Batch Date: Thu, 9 Nov 2023 20:10:27 +0000 Subject: [PATCH 14/17] Add some blank lines --- firewood/src/merkle.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index 8ba0a83a4..ffd58974f 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -1252,9 +1252,11 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal .merkle .get_node_and_parents_by_key(root_node, &key) .map_err(|e| api::Error::InternalError(e.into()))?; + let Some(last_node) = found_node else { return Poll::Ready(None); }; + let returned_key_value = match last_node.inner() { NodeType::Branch(branch) => { (key.clone(), branch.value.to_owned().unwrap().to_vec()) @@ -1262,7 +1264,9 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal NodeType::Leaf(leaf) => (key, leaf.1.to_vec()), NodeType::Extension(_) => todo!(), }; + pinned_self.key_state = IteratorState::Iterating { last_node, parents }; + return Poll::Ready(Some(Ok(returned_key_value))); } IteratorState::Iterating { @@ -1276,20 +1280,25 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal branch.children.iter().position(|&addr| addr.is_some()) { let child_address = branch.children[child_position].unwrap(); + parents.push((last_node, child_position as u8)); // remember where we walked down from + let current_node = pinned_self .merkle .get_node(child_address) .map_err(|e| api::Error::InternalError(e.into()))?; + let found_key = parents[1..] .chunks_exact(2) .map(|parents| (parents[0].1 << 4) + parents[1].1) .collect::>(); + pinned_self.key_state = IteratorState::Iterating { // continue iterating from here last_node: current_node, parents, }; + found_key } else { // Branch node with no children? @@ -1306,7 +1315,9 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal Some((parent, child_position)) => { // Assume all parents are branch nodes let children = parent.inner().as_branch().unwrap().chd(); + let mut child_position = child_position.wrapping_add(1); + if let Some(found_offset) = children[child_position as usize..] 
.iter() .position(|&addr| addr.is_some()) @@ -1316,16 +1327,21 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal next = parents.pop(); continue; } + let addr = children[child_position as usize].unwrap(); + let child = pinned_self .merkle .get_node(addr) .map(|node| (node, u8::MAX)) .map_err(|e| api::Error::InternalError(e.into()))?; + let keep_going = child.0.inner().is_branch(); + next = Some(child); parents.push((parent, child_position)); + if !keep_going { break; } @@ -1338,11 +1354,14 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal .chunks_exact(2) .map(|parents| (parents[0].1 << 4) + parents[1].1) .collect::>(); + current_key.extend(leaf.0.to_vec()); + pinned_self.key_state = IteratorState::Iterating { last_node: next.unwrap().0, parents, }; + current_key } @@ -1363,10 +1382,12 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal NodeType::Leaf(leaf) => leaf.1.to_vec(), NodeType::Extension(_) => todo!(), }; + (found_key, value) } _ => unreachable!(), }; + Poll::Ready(Some(Ok(return_value))) } } From 7ab9003e8c0f25f0578933fece834a608c9d8d40 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 9 Nov 2023 21:24:29 +0000 Subject: [PATCH 15/17] Remove pin! macro Seems like we can just reference self and it works! --- firewood/src/merkle.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index ffd58974f..c9c035f16 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -6,7 +6,6 @@ use crate::v2::api; use crate::{nibbles::Nibbles, v2::api::Proof}; use futures::Stream; use sha3::Digest; -use std::pin::pin; use std::{ cmp::Ordering, collections::HashMap, @@ -1231,24 +1230,23 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal type Item = Result<(Vec, Vec), api::Error>; fn poll_next( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> Poll> { - let mut pinned_self = pin!(self); // Note that this sets the key_state to StartAtBeginning temporarily // - if you get to the end you get Ok(None) but can fetch again from the start // - if you get an error, you'll get Err(...), but continuing to fetch starts from the top // If this isn't what you want, then consider using [std::iter::fuse] - let found_key = match std::mem::take(&mut pinned_self.key_state) { + let found_key = match std::mem::take(&mut self.key_state) { IteratorState::StartAtBeginning => todo!(), IteratorState::StartAtKey(key) => { // TODO: support finding the next key after K - let root_node = pinned_self + let root_node = self .merkle - .get_node(pinned_self.merkle_root) + .get_node(self.merkle_root) .map_err(|e| api::Error::InternalError(e.into()))?; - let (found_node, parents) = pinned_self + let (found_node, parents) = self .merkle .get_node_and_parents_by_key(root_node, &key) .map_err(|e| api::Error::InternalError(e.into()))?; @@ -1265,7 +1263,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal NodeType::Extension(_) => todo!(), }; - pinned_self.key_state = IteratorState::Iterating { last_node, parents }; + self.key_state = IteratorState::Iterating { last_node, parents }; return Poll::Ready(Some(Ok(returned_key_value))); } @@ -1283,7 +1281,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal parents.push((last_node, child_position as u8)); // remember where we walked down from - let current_node = pinned_self + let current_node = self .merkle 
.get_node(child_address) .map_err(|e| api::Error::InternalError(e.into()))?; @@ -1293,7 +1291,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal .map(|parents| (parents[0].1 << 4) + parents[1].1) .collect::>(); - pinned_self.key_state = IteratorState::Iterating { + self.key_state = IteratorState::Iterating { // continue iterating from here last_node: current_node, parents, @@ -1330,7 +1328,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal let addr = children[child_position as usize].unwrap(); - let child = pinned_self + let child = self .merkle .get_node(addr) .map(|node| (node, u8::MAX)) @@ -1357,7 +1355,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal current_key.extend(leaf.0.to_vec()); - pinned_self.key_state = IteratorState::Iterating { + self.key_state = IteratorState::Iterating { last_node: next.unwrap().0, parents, }; @@ -1372,7 +1370,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal // figure out the value to return from the state // if we get here, we're sure to have something to return - let return_value = match &pinned_self.key_state { + let return_value = match &self.key_state { IteratorState::Iterating { last_node, parents: _, From 4867f822766c1ebb8e95a05061481882efd5e30f Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 9 Nov 2023 23:16:32 +0000 Subject: [PATCH 16/17] In-person review comments Removed unnecessary clone Added additional comments Added TODO for a few things: - possible to return a reference to the value instead of a copy - should handle case of branch nodes with values while traversing Adding a randomized key-value test after extension nodes are removed is also recommended. --- firewood/src/merkle.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index c9c035f16..ad92d6925 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -1257,7 +1257,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal let returned_key_value = match last_node.inner() { NodeType::Branch(branch) => { - (key.clone(), branch.value.to_owned().unwrap().to_vec()) + (key, branch.value.to_owned().unwrap().to_vec()) } NodeType::Leaf(leaf) => (key, leaf.1.to_vec()), NodeType::Extension(_) => todo!(), @@ -1314,6 +1314,8 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal // Assume all parents are branch nodes let children = parent.inner().as_branch().unwrap().chd(); + // we use wrapping_add here because the value might be u8::MAX indicating that + // we want to go down branch let mut child_position = child_position.wrapping_add(1); if let Some(found_offset) = children[child_position as usize..] 
@@ -1328,19 +1330,21 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal let addr = children[child_position as usize].unwrap(); + // we push (node, u8::MAX) because we will add 1, which will wrap to 0 let child = self .merkle .get_node(addr) .map(|node| (node, u8::MAX)) .map_err(|e| api::Error::InternalError(e.into()))?; - let keep_going = child.0.inner().is_branch(); + // TODO: If the branch has a value, then we shouldn't keep_going + let keep_going_down = child.0.inner().is_branch(); next = Some(child); parents.push((parent, child_position)); - if !keep_going { + if !keep_going_down { break; } } @@ -1370,6 +1374,8 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal // figure out the value to return from the state // if we get here, we're sure to have something to return + // TODO: It's possible to return a reference to the data since the last_node is + // saved in the iterator let return_value = match &self.key_state { IteratorState::Iterating { last_node, From afe63571594e89641ec240ccc586f06012c571f1 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 10 Nov 2023 16:05:43 +0000 Subject: [PATCH 17/17] Format --- firewood/src/merkle.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/firewood/src/merkle.rs b/firewood/src/merkle.rs index ad92d6925..829978c64 100644 --- a/firewood/src/merkle.rs +++ b/firewood/src/merkle.rs @@ -1256,9 +1256,7 @@ impl<'a, S: shale::ShaleStore + Send + Sync> Stream for MerkleKeyVal }; let returned_key_value = match last_node.inner() { - NodeType::Branch(branch) => { - (key, branch.value.to_owned().unwrap().to_vec()) - } + NodeType::Branch(branch) => (key, branch.value.to_owned().unwrap().to_vec()), NodeType::Leaf(leaf) => (key, leaf.1.to_vec()), NodeType::Extension(_) => todo!(), };
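To close out the series, here is a sketch of how a caller might drain the finished stream. None of this is code from the patches: the names are invented, and the in-memory stream stands in for what DbRev::stream / Merkle::get_iter return (a Stream of Result<(Vec<u8>, Vec<u8>), api::Error>), assuming only the futures and tokio crates:

    use futures::{Stream, StreamExt};

    // Drain any stream shaped like MerkleKeyValueStream's output,
    // returning how many key/value pairs were yielded.
    async fn dump_all<E>(
        mut stream: impl Stream<Item = Result<(Vec<u8>, Vec<u8>), E>> + Unpin,
    ) -> Result<usize, E> {
        let mut count = 0;
        while let Some(result) = stream.next().await {
            let (key, value) = result?;
            println!("{key:02x?} => {} bytes", value.len());
            count += 1;
        }
        Ok(count)
    }

    #[tokio::main]
    async fn main() {
        // stand-in for rev.stream(Some(key)) from this series
        let kvs = futures::stream::iter(vec![
            Ok::<_, ()>((b"k1".to_vec(), b"v1".to_vec())),
            Ok((b"k2".to_vec(), b"v2".to_vec())),
        ]);
        assert_eq!(dump_all(kvs).await.unwrap(), 2);
    }

Note that, as of the end of this series, IteratorState::StartAtBeginning is still todo!(), so a starting key must be supplied (the db.rs TODO about fetching the first key when start_key is None remains open).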