From abcb558503112627d1406602a94b6d85f47d3679 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 4 Sep 2024 23:50:28 +0200 Subject: [PATCH] Move sync active requests to own modules --- .../src/sync/network_context/requests.rs | 159 +----------------- .../network_context/requests/blobs_by_root.rs | 96 +++++++++++ .../requests/blocks_by_root.rs | 60 +++++++ 3 files changed, 161 insertions(+), 154 deletions(-) create mode 100644 beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs create mode 100644 beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 94eecff42d3..0c2f59d143f 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,18 +1,14 @@ -use beacon_chain::get_block_root; -use lighthouse_network::{ - rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}, - PeerId, -}; -use std::sync::Arc; use strum::IntoStaticStr; -use types::{ - blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, -}; +use types::Hash256; +pub use blobs_by_root::{ActiveBlobsByRootRequest, BlobsByRootSingleBlockRequest}; +pub use blocks_by_root::{ActiveBlocksByRootRequest, BlocksByRootSingleRequest}; pub use data_columns_by_root::{ ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest, }; +mod blobs_by_root; +mod blocks_by_root; mod data_columns_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -25,148 +21,3 @@ pub enum LookupVerifyError { InvalidInclusionProof, DuplicateData, } - -pub struct ActiveBlocksByRootRequest { - request: BlocksByRootSingleRequest, - resolved: bool, - pub(crate) peer_id: PeerId, -} - -impl ActiveBlocksByRootRequest { - pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self { - Self { - request, - resolved: false, - peer_id, - } - } - - /// Append a response to the single chunk request. If the chunk is valid, the request is - /// resolved immediately. - /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - block: Arc>, - ) -> Result>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - - let block_root = get_block_root(&block); - if self.request.0 != block_root { - return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); - } - - // Valid data, blocks by root expects a single response - self.resolved = true; - Ok(block) - } - - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NoResponseReturned) - } - } -} - -#[derive(Debug, Copy, Clone)] -pub struct BlocksByRootSingleRequest(pub Hash256); - -impl BlocksByRootSingleRequest { - pub fn into_request(self, spec: &ChainSpec) -> BlocksByRootRequest { - BlocksByRootRequest::new(vec![self.0], spec) - } -} - -#[derive(Debug, Clone)] -pub struct BlobsByRootSingleBlockRequest { - pub block_root: Hash256, - pub indices: Vec, -} - -impl BlobsByRootSingleBlockRequest { - pub fn into_request(self, spec: &ChainSpec) -> BlobsByRootRequest { - BlobsByRootRequest::new( - self.indices - .into_iter() - .map(|index| BlobIdentifier { - block_root: self.block_root, - index, - }) - .collect(), - spec, - ) - } -} - -pub struct ActiveBlobsByRootRequest { - request: BlobsByRootSingleBlockRequest, - blobs: Vec>>, - resolved: bool, - pub(crate) peer_id: PeerId, -} - -impl ActiveBlobsByRootRequest { - pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self { - Self { - request, - blobs: vec![], - resolved: false, - peer_id, - } - } - - /// Appends a chunk to this multi-item request. If all expected chunks are received, this - /// method returns `Some`, resolving the request before the stream terminator. - /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - blob: Arc>, - ) -> Result>>>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - - let block_root = blob.block_root(); - if self.request.block_root != block_root { - return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); - } - if !blob.verify_blob_sidecar_inclusion_proof() { - return Err(LookupVerifyError::InvalidInclusionProof); - } - if !self.request.indices.contains(&blob.index) { - return Err(LookupVerifyError::UnrequestedIndex(blob.index)); - } - if self.blobs.iter().any(|b| b.index == blob.index) { - return Err(LookupVerifyError::DuplicateData); - } - - self.blobs.push(blob); - if self.blobs.len() >= self.request.indices.len() { - // All expected chunks received, return result early - self.resolved = true; - Ok(Some(std::mem::take(&mut self.blobs))) - } else { - Ok(None) - } - } - - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NotEnoughResponsesReturned { - expected: self.request.indices.len(), - actual: self.blobs.len(), - }) - } - } - - /// Mark request as resolved (= has returned something downstream) while marking this status as - /// true for future calls. - pub fn resolve(&mut self) -> bool { - std::mem::replace(&mut self.resolved, true) - } -} diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs new file mode 100644 index 00000000000..cb2b1a42ec4 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs @@ -0,0 +1,96 @@ +use lighthouse_network::{rpc::methods::BlobsByRootRequest, PeerId}; +use std::sync::Arc; +use types::{blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256}; + +use super::LookupVerifyError; + +#[derive(Debug, Clone)] +pub struct BlobsByRootSingleBlockRequest { + pub block_root: Hash256, + pub indices: Vec, +} + +impl BlobsByRootSingleBlockRequest { + pub fn into_request(self, spec: &ChainSpec) -> BlobsByRootRequest { + BlobsByRootRequest::new( + self.indices + .into_iter() + .map(|index| BlobIdentifier { + block_root: self.block_root, + index, + }) + .collect(), + spec, + ) + } +} + +pub struct ActiveBlobsByRootRequest { + request: BlobsByRootSingleBlockRequest, + blobs: Vec>>, + resolved: bool, + pub(crate) peer_id: PeerId, +} + +impl ActiveBlobsByRootRequest { + pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self { + Self { + request, + blobs: vec![], + resolved: false, + peer_id, + } + } + + /// Appends a chunk to this multi-item request. If all expected chunks are received, this + /// method returns `Some`, resolving the request before the stream terminator. + /// The active request SHOULD be dropped after `add_response` returns an error + pub fn add_response( + &mut self, + blob: Arc>, + ) -> Result>>>, LookupVerifyError> { + if self.resolved { + return Err(LookupVerifyError::TooManyResponses); + } + + let block_root = blob.block_root(); + if self.request.block_root != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + if !blob.verify_blob_sidecar_inclusion_proof() { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if !self.request.indices.contains(&blob.index) { + return Err(LookupVerifyError::UnrequestedIndex(blob.index)); + } + if self.blobs.iter().any(|b| b.index == blob.index) { + return Err(LookupVerifyError::DuplicateData); + } + + self.blobs.push(blob); + if self.blobs.len() >= self.request.indices.len() { + // All expected chunks received, return result early + self.resolved = true; + Ok(Some(std::mem::take(&mut self.blobs))) + } else { + Ok(None) + } + } + + pub fn terminate(self) -> Result<(), LookupVerifyError> { + if self.resolved { + Ok(()) + } else { + Err(LookupVerifyError::NotEnoughResponsesReturned { + expected: self.request.indices.len(), + actual: self.blobs.len(), + }) + } + } + + /// Mark request as resolved (= has returned something downstream) while marking this status as + /// true for future calls. + pub fn resolve(&mut self) -> bool { + std::mem::replace(&mut self.resolved, true) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs new file mode 100644 index 00000000000..a15d4e39353 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs @@ -0,0 +1,60 @@ +use beacon_chain::get_block_root; +use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; +use std::sync::Arc; +use types::{ChainSpec, EthSpec, Hash256, SignedBeaconBlock}; + +use super::LookupVerifyError; + +#[derive(Debug, Copy, Clone)] +pub struct BlocksByRootSingleRequest(pub Hash256); + +impl BlocksByRootSingleRequest { + pub fn into_request(self, spec: &ChainSpec) -> BlocksByRootRequest { + BlocksByRootRequest::new(vec![self.0], spec) + } +} + +pub struct ActiveBlocksByRootRequest { + request: BlocksByRootSingleRequest, + resolved: bool, + pub(crate) peer_id: PeerId, +} + +impl ActiveBlocksByRootRequest { + pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self { + Self { + request, + resolved: false, + peer_id, + } + } + + /// Append a response to the single chunk request. If the chunk is valid, the request is + /// resolved immediately. + /// The active request SHOULD be dropped after `add_response` returns an error + pub fn add_response( + &mut self, + block: Arc>, + ) -> Result>, LookupVerifyError> { + if self.resolved { + return Err(LookupVerifyError::TooManyResponses); + } + + let block_root = get_block_root(&block); + if self.request.0 != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + + // Valid data, blocks by root expects a single response + self.resolved = true; + Ok(block) + } + + pub fn terminate(self) -> Result<(), LookupVerifyError> { + if self.resolved { + Ok(()) + } else { + Err(LookupVerifyError::NoResponseReturned) + } + } +}