Skip to content

Commit

Permalink
Snap Sync: Account Range
Browse files Browse the repository at this point in the history
  • Loading branch information
fmoletta committed Nov 27, 2024
1 parent 64835f3 commit d42c670
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 75 deletions.
7 changes: 4 additions & 3 deletions crates/networking/p2p/kademlia.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{
discv4::{time_now_unix, FindNodeRequest}, peer_channels::PeerChannels, types::Node,
discv4::{time_now_unix, FindNodeRequest},
peer_channels::PeerChannels,
types::Node,
};
use ethrex_core::{H256, H512, U256};
use sha3::{Digest, Keccak256};
Expand Down Expand Up @@ -280,14 +282,13 @@ impl KademliaTable {
pub async fn get_peer_channels(&self) -> PeerChannels {
loop {
if let Some(channels) = self.get_peer().and_then(|peer| peer.channels) {
return channels
return channels;
}
info!("[Sync] No peers available, retrying in 10 sec");
// This is the unlikely case where we just started the node and don't have peers, wait a bit and try again
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
}

}

/// Computes the distance between two nodes according to the discv4 protocol
Expand Down
96 changes: 82 additions & 14 deletions crates/networking/p2p/peer_channels.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,44 @@
use std::{sync::Arc, time::Duration};

use ethrex_core::{
types::{BlockBody, BlockHeader},
types::{AccountState, BlockBody, BlockHeader},
H256,
};
use tokio::sync::{Mutex, mpsc};
use ethrex_rlp::encode::RLPEncode;
use ethrex_trie::verify_range;
use tokio::sync::{mpsc, Mutex};

use crate::{
rlpx::eth::blocks::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
}, RLPxMessage
rlpx::{
eth::blocks::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
},
snap::{AccountRange, GetAccountRange},
},
snap::encodable_to_proof,
RLPxMessage,
};

pub const PEER_REPLY_TIMOUT: Duration = Duration::from_secs(45);
pub const MAX_MESSAGES_IN_PEER_CHANNEL: usize = 25;
pub const MAX_RESPONSE_BYTES: u64 = 500; // TODO: Set
pub const HASH_MAX: H256 = H256([0xFF; 32]);

#[derive(Debug, Clone)]
/// Holds the respective sender and receiver ends of the communication channels bewteen the peer data and its active connection
pub struct PeerChannels {
sender: mpsc::Sender<RLPxMessage>,
pub receiver: Arc<Mutex<mpsc::Receiver<RLPxMessage>>>,
receiver: Arc<Mutex<mpsc::Receiver<RLPxMessage>>>,
}


impl PeerChannels {
/// Sets up the communication channels for the peer
/// Returns the channel endpoints to send to the active connection's listen loop
pub fn create() -> (Self, mpsc::Sender<RLPxMessage>, mpsc::Receiver<RLPxMessage>) {
let (sender, connection_receiver) = mpsc::channel::<RLPxMessage>(MAX_MESSAGES_IN_PEER_CHANNEL);
let (connection_sender, receiver) = mpsc::channel::<RLPxMessage>(MAX_MESSAGES_IN_PEER_CHANNEL);
pub(crate) fn create() -> (Self, mpsc::Sender<RLPxMessage>, mpsc::Receiver<RLPxMessage>) {
let (sender, connection_receiver) =
mpsc::channel::<RLPxMessage>(MAX_MESSAGES_IN_PEER_CHANNEL);
let (connection_sender, receiver) =
mpsc::channel::<RLPxMessage>(MAX_MESSAGES_IN_PEER_CHANNEL);
(
Self {
sender,
Expand All @@ -39,8 +49,8 @@ impl PeerChannels {
)
}

/// Requests block headers from the peer
/// Returns the response message or None if:
/// Requests block headers from the peer, starting from the `start` block hash towards newer blocks
/// Returns the block headers or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
/// - The response was empty or not valid
Expand Down Expand Up @@ -74,8 +84,8 @@ impl PeerChannels {
(!block_headers.is_empty()).then_some(block_headers)
}

/// Requests block headers from the peer
/// Returns the response message or None if:
/// Requests block headers from the peer given their block hashes
/// Returns the block headers or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
/// - The response was empty or not valid
Expand Down Expand Up @@ -105,4 +115,62 @@ impl PeerChannels {
.ok()??;
(!block_bodies.is_empty()).then_some(block_bodies)
}

/// Requests an account range from the peer given the state trie's root and the starting hash (the limit hash will be the maximum value of H256)
/// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie
/// Returns the response message or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
/// - The response was empty or not valid
pub async fn request_account_range(
&self,
state_root: H256,
start: H256,
) -> Option<(Vec<H256>, Vec<AccountState>, bool)> {
let request_id = rand::random();
let request = RLPxMessage::GetAccountRange(GetAccountRange {
id: request_id,
root_hash: state_root,
starting_hash: start,
limit_hash: HASH_MAX,
response_bytes: MAX_RESPONSE_BYTES,
});
self.sender.send(request).await.ok()?;
let mut receiver = self.receiver.lock().await;
let (accounts, proof) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::AccountRange(AccountRange {
id,
accounts,
proof,
})) if id == request_id => return Some((accounts, proof)),
// Ignore replies that don't match the expected id (such as late responses)
Some(_) => continue,
None => return None,
}
}
})
.await
.ok()??;
// Unzip & validate response
let proof = encodable_to_proof(&proof);
let (account_hashes, accounts): (Vec<_>, Vec<_>) = accounts
.into_iter()
.map(|unit| (unit.hash, AccountState::from(unit.account)))
.unzip();
let encoded_accounts = accounts
.iter()
.map(|acc| acc.encode_to_vec())
.collect::<Vec<_>>();
let should_continue = verify_range(
state_root,
&start,
&account_hashes,
&encoded_accounts,
&proof,
)
.ok()?;
Some((account_hashes, accounts, should_continue))
}
}
9 changes: 6 additions & 3 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;

use crate::{
peer_channels::PeerChannels, rlpx::{
peer_channels::PeerChannels,
rlpx::{
eth::{
backend,
blocks::{BlockBodies, BlockHeaders},
Expand All @@ -11,10 +12,12 @@ use crate::{
message::Message,
p2p::{self, DisconnectMessage, PingMessage, PongMessage},
utils::id2pubkey,
}, snap::{
},
snap::{
process_account_range_request, process_byte_codes_request, process_storage_ranges_request,
process_trie_nodes_request,
}, MAX_DISC_PACKET_SIZE
},
MAX_DISC_PACKET_SIZE,
};

use super::{
Expand Down
6 changes: 3 additions & 3 deletions crates/networking/p2p/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ pub fn validate_account_range_response(

// Helper method to convert proof to RLP-encodable format
#[inline]
fn proof_to_encodable(proof: Vec<Vec<u8>>) -> Vec<Bytes> {
pub(crate) fn proof_to_encodable(proof: Vec<Vec<u8>>) -> Vec<Bytes> {
proof.into_iter().map(|bytes| Bytes::from(bytes)).collect()
}

// Helper method to obtain proof from RLP-encodable format
#[inline]
fn encodable_to_proof(proof: &Vec<Bytes>) -> Vec<Vec<u8>> {
pub(crate) fn encodable_to_proof(proof: &Vec<Bytes>) -> Vec<Vec<u8>> {
proof.into_iter().map(|bytes| bytes.to_vec()).collect()
}

Expand Down Expand Up @@ -1003,7 +1003,7 @@ mod tests {

// Create a store and load it up with the accounts
let store = Store::new("null", EngineType::InMemory).unwrap();
let mut state_trie = store.new_state_trie_for_test();
let mut state_trie = store.new_state_trie();
for (address, account) in accounts {
let hashed_address = H256::from_str(address).unwrap().as_bytes().to_vec();
let account = AccountState::from(AccountStateSlim::decode(&account).unwrap());
Expand Down
97 changes: 47 additions & 50 deletions crates/networking/p2p/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use ethrex_core::{
types::{validate_block_header, BlockHash, BlockHeader, InvalidBlockHeaderError},
H256,
};
use ethrex_rlp::encode::RLPEncode;
use ethrex_storage::Store;
use ethrex_trie::EMPTY_TRIE_HASH;
use tokio::sync::Mutex;
use tracing::info;

Expand Down Expand Up @@ -40,10 +42,7 @@ impl SyncManager {
let peer = self.peers.lock().await.get_peer_channels().await;
info!("[Sync] Requesting Block Headers from {current_head}");
// Request Block Headers from Peer
if let Some(block_headers) = peer
.request_block_headers(current_head)
.await
{
if let Some(block_headers) = peer.request_block_headers(current_head).await {
// We received the correct message, we can now:
// - Validate the batch of headers received and start downloading their state (Future Iteration)
// - Check if we need to download another batch (aka we don't have the sync_head yet)
Expand Down Expand Up @@ -150,9 +149,7 @@ async fn fetch_blocks_and_receipts(
loop {
let peer = peers.lock().await.get_peer_channels().await;
info!("[Sync] Requesting Block Headers ");
if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone())
.await
{
if let Some(block_bodies) = peer.request_block_bodies(block_hashes.clone()).await {
info!("[SYNC] Received {} Block Bodies", block_bodies.len());
// Track which bodies we have already fetched
let (fetched_hashes, remaining_hashes) = block_hashes.split_at(block_bodies.len());
Expand All @@ -179,48 +176,48 @@ async fn fetch_snap_state(
peers: Arc<Mutex<KademliaTable>>,
store: Store,
) {
// for root_hash in state_roots {
// // Fetch Account Ranges
// let mut account_ranges_request = GetAccountRange {
// id: 7,
// root_hash,
// starting_hash: H256::zero(),
// limit_hash: H256::from_str(
// "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
// )
// .unwrap(),
// response_bytes: 500,
// };
// loop {
// // TODO: Randomize id
// account_ranges_request.id += 1;
// info!("[Sync] Sending Block headers request ");
// // Send a GetBlockBodies request to a peer
// if peers
// .lock()
// .await
// .send_message_to_peer(Message::GetAccountRange(account_ranges_request.clone()))
// .await
// .is_err()
// {
// info!("[Sync] No peers available, retrying in 10 sec");
// // This is the unlikely case where we just started the node and don't have peers, wait a bit and try again
// tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
// continue;
// };
// // Wait for the peer to reply
// match tokio::time::timeout(REPLY_TIMEOUT, reply_receiver.recv()).await {
// Ok(Some(Message::AccountRange(message)))
// if message.id == account_ranges_request.id && !message.accounts.is_empty() =>
// {
// info!("[SYNC] Received {} Accounts", message.accounts.len());
// }
for state_root in state_roots {
fetch_snap_state_inner(state_root, peers.clone(), store.clone()).await
}
}

// // Bad peer response, lets try a different peer
// Ok(Some(_)) => info!("[Sync] Bad peer response"),
// // Reply timeouted/peer shut down, lets try a different peer
// _ => info!("[Sync] Peer response timeout( Snap Account Range)"),
// }
// }
// }
/// Rebuilds a Block's account state by requesting state from peers
async fn fetch_snap_state_inner(state_root: H256, peers: Arc<Mutex<KademliaTable>>, store: Store) {
let mut start_account_hash = H256::zero();
// Start from an empty state trie
// We cannot keep an open trie here so we will track the root between lookups
let mut current_state_root = *EMPTY_TRIE_HASH;
// Fetch Account Ranges
loop {
let peer = peers.lock().await.get_peer_channels().await;
info!("[Sync] Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}");
if let Some((account_hashes, accounts, should_continue)) = peer
.request_account_range(state_root, start_account_hash)
.await
{
// Update starting hash for next batch
if should_continue {
start_account_hash = *account_hashes.last().unwrap();
}

// Update trie
let mut trie = store.open_state_trie(current_state_root);
for (account_hash, account) in account_hashes.iter().zip(accounts.iter()) {
// TODO: Handle
trie.insert(account_hash.0.to_vec(), account.encode_to_vec())
.unwrap();
}
// TODO: Handle
current_state_root = trie.hash().unwrap();

if !should_continue {
// All accounts fetched!
break;
}
}
}
if current_state_root != state_root {
info!("[Sync] State sync failed for hash {state_root}");
}
info!("[Sync] Completed state sync for hash {state_root}");
}
8 changes: 6 additions & 2 deletions crates/storage/store/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,10 +885,14 @@ impl Store {
self.engine.get_payload(payload_id)
}

/// Creates a new state trie with an empty state root, for testing purposes only
pub fn new_state_trie_for_test(&self) -> Trie {
/// Creates a new clean state trie (with an empty root)
pub fn new_state_trie(&self) -> Trie {
self.engine.open_state_trie(*EMPTY_TRIE_HASH)
}

pub fn open_state_trie(&self, state_root: H256) -> Trie {
self.engine.open_state_trie(state_root)
}
}

pub fn hash_address(address: &Address) -> Vec<u8> {
Expand Down

0 comments on commit d42c670

Please sign in to comment.