diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
index bbd5bfcac9a..4f413ce2a86 100644
--- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs
+++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
@@ -1,10 +1,9 @@
 use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
 use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
-use slog::{crit, debug, Logger};
+use slog::{crit, debug, error, Logger};
 use std::collections::HashMap;
 use std::sync::Arc;
 use store::{DatabaseBlock, ExecutionPayloadDeneb};
-use task_executor::TaskExecutor;
 use tokio::sync::{
     mpsc::{self, UnboundedSender},
     RwLock,
@@ -395,18 +394,18 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     pub fn new(
         beacon_chain: &Arc<BeaconChain<T>>,
         check_caches: CheckCaches,
-    ) -> Result<Self, BeaconChainError> {
+    ) -> Result<Arc<Self>, BeaconChainError> {
         let execution_layer = beacon_chain
             .execution_layer
             .as_ref()
             .ok_or(BeaconChainError::ExecutionLayerMissing)?
             .clone();
 
-        Ok(Self {
+        Ok(Arc::new(Self {
             execution_layer,
             check_caches,
             beacon_chain: beacon_chain.clone(),
-        })
+        }))
     }
 
     fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
@@ -425,30 +424,44 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
         }
     }
 
-    fn load_payloads(&self, block_roots: Vec<Hash256>) -> Vec<(Hash256, LoadResult<T::EthSpec>)> {
-        let mut db_blocks = Vec::new();
-
-        for root in block_roots {
-            if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
-                db_blocks.push((root, Ok(Some(cached_block))));
-                continue;
-            }
-
-            match self.beacon_chain.store.try_get_full_block(&root) {
-                Err(e) => db_blocks.push((root, Err(e.into()))),
-                Ok(opt_block) => db_blocks.push((
-                    root,
-                    Ok(opt_block.map(|db_block| match db_block {
-                        DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)),
-                        DatabaseBlock::Blinded(block) => {
-                            LoadedBeaconBlock::Blinded(Box::new(block))
+    async fn load_payloads(
+        self: &Arc<Self>,
+        block_roots: Vec<Hash256>,
+    ) -> Result<Vec<(Hash256, LoadResult<T::EthSpec>)>, BeaconChainError> {
+        let streamer = self.clone();
+        // Loading from the DB is slow -> spawn a blocking task
+        self.beacon_chain
+            .spawn_blocking_handle(
+                move || {
+                    let mut db_blocks = Vec::new();
+                    for root in block_roots {
+                        if let Some(cached_block) =
+                            streamer.check_caches(root).map(LoadedBeaconBlock::Full)
+                        {
+                            db_blocks.push((root, Ok(Some(cached_block))));
+                            continue;
                         }
-                    })),
-                )),
-            }
-        }
-        db_blocks
+                        match streamer.beacon_chain.store.try_get_full_block(&root) {
+                            Err(e) => db_blocks.push((root, Err(e.into()))),
+                            Ok(opt_block) => db_blocks.push((
+                                root,
+                                Ok(opt_block.map(|db_block| match db_block {
+                                    DatabaseBlock::Full(block) => {
+                                        LoadedBeaconBlock::Full(Arc::new(block))
+                                    }
+                                    DatabaseBlock::Blinded(block) => {
+                                        LoadedBeaconBlock::Blinded(Box::new(block))
+                                    }
+                                })),
+                            )),
+                        }
+                    }
+                    db_blocks
+                },
+                "load_beacon_blocks",
+            )
+            .await
     }
 
     /// Pre-process the loaded blocks into execution engine requests.
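The hunk above moves `load_payloads` onto a blocking task via `BeaconChain::spawn_blocking_handle`, so the slow `try_get_full_block` disk reads no longer run on the async executor's worker threads. Below is a minimal, self-contained sketch of the same pattern using `tokio::task::spawn_blocking` directly; `BlockStore`, `Streamer`, and `load_many` are hypothetical stand-ins, not Lighthouse types.

```rust
use std::sync::Arc;

/// Hypothetical blocking key-value store standing in for the on-disk beacon store.
struct BlockStore;

impl BlockStore {
    fn get(&self, key: u64) -> Option<String> {
        // Imagine a synchronous, potentially slow disk lookup here.
        Some(format!("block-{key}"))
    }
}

struct Streamer {
    store: Arc<BlockStore>,
}

impl Streamer {
    /// Load many entries without blocking the async runtime's worker threads.
    async fn load_many(&self, keys: Vec<u64>) -> Result<Vec<Option<String>>, String> {
        // Clone the handle so the closure owns everything it touches (it must be
        // 'static), mirroring the `let streamer = self.clone();` line in the diff above.
        let store = self.store.clone();
        tokio::task::spawn_blocking(move || keys.into_iter().map(|k| store.get(k)).collect())
            .await
            // A panicked or cancelled blocking task surfaces as an ordinary error
            // instead of unwinding the caller, much as the streamer now returns a
            // BeaconChainError from load_payloads.
            .map_err(|e| format!("blocking task failed: {e}"))
    }
}

// Assumes the `tokio` crate with the `rt-multi-thread` and `macros` features.
#[tokio::main]
async fn main() {
    let streamer = Streamer {
        store: Arc::new(BlockStore),
    };
    let blocks = streamer.load_many(vec![1, 2, 3]).await.unwrap();
    println!("{blocks:?}");
}
```

The design point is the ownership transfer: the blocking closure gets its own clone of the shared handle, so it can safely outlive the caller's borrow while the `.await` simply waits for the result.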
@@ -549,7 +562,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
 
     // used when the execution engine doesn't support the payload bodies methods
     async fn stream_blocks_fallback(
-        &self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
         sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
     ) {
@@ -575,7 +588,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     }
 
     async fn stream_blocks(
-        &self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
         sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
     ) {
@@ -584,7 +597,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
         let mut n_sent = 0usize;
         let mut engine_requests = 0usize;
 
-        let payloads = self.load_payloads(block_roots);
+        let payloads = match self.load_payloads(block_roots).await {
+            Ok(payloads) => payloads,
+            Err(e) => {
+                error!(
+                    self.beacon_chain.log,
+                    "BeaconBlockStreamer: Failed to load payloads";
+                    "error" => ?e
+                );
+                return;
+            }
+        };
         let requests = self.get_requests(payloads).await;
 
         for (root, request) in requests {
@@ -624,7 +647,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     }
 
     pub async fn stream(
-        self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
         sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
     ) {
@@ -650,9 +673,8 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     }
 
     pub fn launch_stream(
-        self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
-        executor: &TaskExecutor,
     ) -> impl Stream<Item = (Hash256, Arc<BlockResult<T::EthSpec>>)> {
         let (block_tx, block_rx) = mpsc::unbounded_channel();
         debug!(
@@ -660,6 +682,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
             "Launching a BeaconBlockStreamer";
             "blocks" => block_roots.len(),
         );
+        let executor = self.beacon_chain.task_executor.clone();
         executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender");
         UnboundedReceiverStream::new(block_rx)
     }
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 7c497e74584..b3790024f81 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -1139,7 +1139,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     pub fn get_blocks_checking_caches(
         self: &Arc<Self>,
         block_roots: Vec<Hash256>,
-        executor: &TaskExecutor,
     ) -> Result<
         impl Stream<
             Item = (
@@ -1149,14 +1148,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         >,
         Error,
     > {
-        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
-            .launch_stream(block_roots, executor))
+        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?.launch_stream(block_roots))
     }
 
     pub fn get_blocks(
         self: &Arc<Self>,
         block_roots: Vec<Hash256>,
-        executor: &TaskExecutor,
     ) -> Result<
         impl Stream<
             Item = (
@@ -1166,8 +1163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         >,
         Error,
     > {
-        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
-            .launch_stream(block_roots, executor))
+        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?.launch_stream(block_roots))
     }
 
     pub fn get_blobs_checking_early_attester_cache(
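With the executor argument removed from `get_blocks` and `get_blocks_checking_caches`, the streamer is now handed out as `Arc<Self>` and spawns its own sending task on the executor already stored on the `BeaconChain`. The sketch below shows that shape with hypothetical names (`Streamer`, `launch_stream`, and a `tokio::runtime::Handle` standing in for `TaskExecutor`); it assumes the `tokio` and `tokio-stream` crates and is not the Lighthouse implementation.

```rust
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

struct Streamer {
    // Stand-in for the executor the BeaconChain already owns.
    handle: tokio::runtime::Handle,
}

impl Streamer {
    fn new(handle: tokio::runtime::Handle) -> Arc<Self> {
        Arc::new(Self { handle })
    }

    // `self: Arc<Self>` lets the spawned future own the streamer, satisfying the
    // 'static bound on spawned tasks without borrowing from the caller.
    async fn stream(self: Arc<Self>, roots: Vec<u64>, tx: mpsc::UnboundedSender<u64>) {
        for root in roots {
            let _ = tx.send(root);
        }
    }

    fn launch_stream(self: Arc<Self>, roots: Vec<u64>) -> UnboundedReceiverStream<u64> {
        let (tx, rx) = mpsc::unbounded_channel();
        // The executor comes from the streamer itself, not from a caller-supplied parameter.
        let handle = self.handle.clone();
        handle.spawn(self.stream(roots, tx));
        UnboundedReceiverStream::new(rx)
    }
}

#[tokio::main]
async fn main() {
    use tokio_stream::StreamExt;
    let streamer = Streamer::new(tokio::runtime::Handle::current());
    let mut stream = streamer.launch_stream(vec![1, 2, 3]);
    while let Some(root) = stream.next().await {
        println!("got block root {root}");
    }
}
```

Callers only see the returned stream; where the work runs is an internal detail of the streamer, which is what lets the `executor` parameter disappear from the `BeaconChain` API above.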
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index 27b9e676da6..f10646c7414 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -509,9 +508,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     ) -> Result<(), Error> {
         let processor = self.clone();
         let process_fn = async move {
-            let executor = processor.executor.clone();
             processor
-                .handle_blocks_by_range_request(executor, peer_id, request_id, request)
+                .handle_blocks_by_range_request(peer_id, request_id, request)
                 .await;
         };
 
@@ -530,9 +529,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     ) -> Result<(), Error> {
         let processor = self.clone();
         let process_fn = async move {
-            let executor = processor.executor.clone();
             processor
-                .handle_blocks_by_root_request(executor, peer_id, request_id, request)
+                .handle_blocks_by_root_request(peer_id, request_id, request)
                 .await;
         };
 
diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
index fb813723920..1e72dc42578 100644
--- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -11,7 +11,6 @@ use slog::{debug, error, warn};
 use slot_clock::SlotClock;
 use std::collections::{hash_map::Entry, HashMap};
 use std::sync::Arc;
-use task_executor::TaskExecutor;
 use tokio_stream::StreamExt;
 use types::blob_sidecar::BlobIdentifier;
 use types::{Epoch, EthSpec, ForkName, Hash256, Slot};
@@ -129,7 +128,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /// Handle a `BlocksByRoot` request from the peer.
     pub async fn handle_blocks_by_root_request(
         self: Arc<Self>,
-        executor: TaskExecutor,
         peer_id: PeerId,
         request_id: PeerRequestId,
         request: BlocksByRootRequest,
@@ -138,7 +136,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
             peer_id,
             request_id,
             self.clone()
-                .handle_blocks_by_root_request_inner(executor, peer_id, request_id, request)
+                .handle_blocks_by_root_request_inner(peer_id, request_id, request)
                 .await,
             Response::BlocksByRoot,
         );
@@ -147,15 +145,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /// Handle a `BlocksByRoot` request from the peer.
     pub async fn handle_blocks_by_root_request_inner(
         self: Arc<Self>,
-        executor: TaskExecutor,
         peer_id: PeerId,
         request_id: PeerRequestId,
         request: BlocksByRootRequest,
     ) -> Result<(), (RPCResponseErrorCode, &'static str)> {
+        let log_results = |peer_id, requested_blocks, send_block_count| {
+            debug!(
+                self.log,
+                "BlocksByRoot outgoing response processed";
+                "peer" => %peer_id,
+                "requested" => requested_blocks,
+                "returned" => %send_block_count
+            );
+        };
+
         let requested_blocks = request.block_roots().len();
         let mut block_stream = match self
             .chain
-            .get_blocks_checking_caches(request.block_roots().to_vec(), &executor)
+            .get_blocks_checking_caches(request.block_roots().to_vec())
         {
             Ok(block_stream) => block_stream,
             Err(e) => {
@@ -193,6 +200,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                         "block_root" => ?root,
                         "reason" => "execution layer not synced",
                     );
+                    log_results(peer_id, requested_blocks, send_block_count);
                     return Err((
                         RPCResponseErrorCode::ResourceUnavailable,
                         "Execution layer not synced",
@@ -209,13 +217,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 }
             }
         }
-        debug!(
-            self.log,
-            "Received BlocksByRoot Request";
-            "peer" => %peer_id,
-            "requested" => requested_blocks,
-            "returned" => %send_block_count
-        );
+        log_results(peer_id, requested_blocks, send_block_count);
         Ok(())
     }
 
@@ -302,7 +304,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         }
         debug!(
             self.log,
-            "Received BlobsByRoot Request";
+            "BlobsByRoot outgoing response processed";
            "peer" => %peer_id,
             "request_root" => %requested_root,
             "request_indices" => ?requested_indices,
@@ -395,7 +397,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /// Handle a `BlocksByRange` request from the peer.
     pub async fn handle_blocks_by_range_request(
         self: Arc<Self>,
-        executor: TaskExecutor,
         peer_id: PeerId,
         request_id: PeerRequestId,
         req: BlocksByRangeRequest,
@@ -404,7 +405,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
             peer_id,
             request_id,
             self.clone()
-                .handle_blocks_by_range_request_inner(executor, peer_id, request_id, req)
+                .handle_blocks_by_range_request_inner(peer_id, request_id, req)
                 .await,
             Response::BlocksByRange,
         );
@@ -413,7 +414,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     /// Handle a `BlocksByRange` request from the peer.
     pub async fn handle_blocks_by_range_request_inner(
         self: Arc<Self>,
-        executor: TaskExecutor,
         peer_id: PeerId,
         request_id: PeerRequestId,
         req: BlocksByRangeRequest,
@@ -506,7 +506,37 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         // remove all skip slots
         let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
 
-        let mut block_stream = match self.chain.get_blocks(block_roots, &executor) {
+        let current_slot = self
+            .chain
+            .slot()
+            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
+
+        let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| {
+            if blocks_sent < (*req.count() as usize) {
+                debug!(
+                    self.log,
+                    "BlocksByRange outgoing response processed";
+                    "peer" => %peer_id,
+                    "msg" => "Failed to return all requested blocks",
+                    "start_slot" => req.start_slot(),
+                    "current_slot" => current_slot,
+                    "requested" => req.count(),
+                    "returned" => blocks_sent
+                );
+            } else {
+                debug!(
+                    self.log,
+                    "BlocksByRange outgoing response processed";
+                    "peer" => %peer_id,
+                    "start_slot" => req.start_slot(),
+                    "current_slot" => current_slot,
+                    "requested" => req.count(),
+                    "returned" => blocks_sent
+                );
+            }
+        };
+
+        let mut block_stream = match self.chain.get_blocks(block_roots) {
             Ok(block_stream) => block_stream,
             Err(e) => {
                 error!(self.log, "Error getting block stream"; "error" => ?e);
@@ -516,7 +546,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
 
         // Fetching blocks is async because it may have to hit the execution layer for payloads.
         let mut blocks_sent = 0;
-
         while let Some((root, result)) = block_stream.next().await {
             match result.as_ref() {
                 Ok(Some(block)) => {
@@ -541,6 +570,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                         "peer" => %peer_id,
                         "request_root" => ?root
                     );
+                    log_results(req, peer_id, blocks_sent);
                    return Err((RPCResponseErrorCode::ServerError, "Database inconsistency"));
                 }
                 Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
@@ -550,6 +580,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                         "block_root" => ?root,
                         "reason" => "execution layer not synced",
                     );
+                    log_results(req, peer_id, blocks_sent);
                     // send the stream terminator
                     return Err((
                         RPCResponseErrorCode::ResourceUnavailable,
@@ -577,41 +608,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                             "error" => ?e
                         );
                     }
-
+                    log_results(req, peer_id, blocks_sent);
                     // send the stream terminator
                     return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks"));
                 }
             }
         }
 
-        let current_slot = self
-            .chain
-            .slot()
-            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
-
-        if blocks_sent < (*req.count() as usize) {
-            debug!(
-                self.log,
-                "BlocksByRange outgoing response processed";
-                "peer" => %peer_id,
-                "msg" => "Failed to return all requested blocks",
-                "start_slot" => req.start_slot(),
-                "current_slot" => current_slot,
-                "requested" => req.count(),
-                "returned" => blocks_sent
-            );
-        } else {
-            debug!(
-                self.log,
-                "BlocksByRange outgoing response processed";
-                "peer" => %peer_id,
-                "start_slot" => req.start_slot(),
-                "current_slot" => current_slot,
-                "requested" => req.count(),
-                "returned" => blocks_sent
-            );
-        }
-
+        log_results(req, peer_id, blocks_sent);
         Ok(())
     }
 
@@ -754,9 +758,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
             }
         };
 
+        let current_slot = self
+            .chain
+            .slot()
+            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
+
+        let log_results = |peer_id, req: BlobsByRangeRequest, blobs_sent| {
+            debug!(
+                self.log,
+                "BlobsByRange outgoing response processed";
+                "peer" => %peer_id,
+                "start_slot" => req.start_slot,
+                "current_slot" => current_slot,
+                "requested" => req.count,
+                "returned" => blobs_sent
+            );
+        };
+
         // remove all skip slots
         let block_roots = block_roots.into_iter().flatten();
-
         let mut blobs_sent = 0;
 
         for root in block_roots {
@@ -780,6 +800,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                         "block_root" => ?root,
                         "error" => ?e
                     );
+                    log_results(peer_id, req, blobs_sent);
+
                     return Err((
                         RPCResponseErrorCode::ServerError,
                         "No blobs and failed fetching corresponding block",
@@ -787,21 +809,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 }
             }
         }
-
-        let current_slot = self
-            .chain
-            .slot()
-            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
-
-        debug!(
-            self.log,
-            "BlobsByRange Response processed";
-            "peer" => %peer_id,
-            "start_slot" => req.start_slot,
-            "current_slot" => current_slot,
-            "requested" => req.count,
-            "returned" => blobs_sent
-        );
+        log_results(peer_id, req, blobs_sent);
 
         Ok(())
     }
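The `log_results` closures introduced in `rpc_methods.rs` exist so that every exit path, including the early error returns, emits the same "outgoing response processed" line. Here is a small, hypothetical sketch of that pattern with plain `println!` in place of `slog`; the names are illustrative only.

```rust
// Demonstrates defining the summary log once and invoking it on every return path.
fn respond(requested: &[u64]) -> Result<(), &'static str> {
    let mut sent = 0usize;

    // One place defines the log line; every return path below reuses it.
    let log_results = |sent: usize| {
        println!(
            "BlocksByRange outgoing response processed: requested={} returned={}",
            requested.len(),
            sent
        );
    };

    for root in requested {
        if *root == 0 {
            // Early exit still reports how far we got before failing.
            log_results(sent);
            return Err("bad block root");
        }
        sent += 1;
    }

    log_results(sent);
    Ok(())
}

fn main() {
    let _ = respond(&[1, 2, 3]);
    let _ = respond(&[1, 0, 3]);
}
```

Passing the running count as an argument keeps the closure to a shared borrow of its environment, so it can be called both inside the loop before an early `return` and again after the loop completes.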