Skip to content

Commit

Permalink
Restore Log on Error & Spawn Blocking in Streamer (#5585)
Browse files Browse the repository at this point in the history
* Restore Logging in Error Cases

* Use Spawn Blocking for Loading Blocks in Streamer

* Merge remote-tracking branch 'upstream/unstable' into request_logging_spawn_blocking

* Address Sean's Comments

* save a clone
  • Loading branch information
ethDreamer authored Apr 16, 2024
1 parent e5b8d12 commit f689898
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 106 deletions.
89 changes: 56 additions & 33 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
use slog::{crit, debug, Logger};
use slog::{crit, debug, error, Logger};
use std::collections::HashMap;
use std::sync::Arc;
use store::{DatabaseBlock, ExecutionPayloadDeneb};
use task_executor::TaskExecutor;
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
Expand Down Expand Up @@ -395,18 +394,18 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
pub fn new(
beacon_chain: &Arc<BeaconChain<T>>,
check_caches: CheckCaches,
) -> Result<Self, BeaconChainError> {
) -> Result<Arc<Self>, BeaconChainError> {
let execution_layer = beacon_chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?
.clone();

Ok(Self {
Ok(Arc::new(Self {
execution_layer,
check_caches,
beacon_chain: beacon_chain.clone(),
})
}))
}

fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
Expand All @@ -425,30 +424,44 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}
}

fn load_payloads(&self, block_roots: Vec<Hash256>) -> Vec<(Hash256, LoadResult<T::EthSpec>)> {
let mut db_blocks = Vec::new();

for root in block_roots {
if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}

match self.beacon_chain.store.try_get_full_block(&root) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Ok(opt_block.map(|db_block| match db_block {
DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)),
DatabaseBlock::Blinded(block) => {
LoadedBeaconBlock::Blinded(Box::new(block))
async fn load_payloads(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
) -> Result<Vec<(Hash256, LoadResult<T::EthSpec>)>, BeaconChainError> {
let streamer = self.clone();
// Loading from the DB is slow -> spawn a blocking task
self.beacon_chain
.spawn_blocking_handle(
move || {
let mut db_blocks = Vec::new();
for root in block_roots {
if let Some(cached_block) =
streamer.check_caches(root).map(LoadedBeaconBlock::Full)
{
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}
})),
)),
}
}

db_blocks
match streamer.beacon_chain.store.try_get_full_block(&root) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Ok(opt_block.map(|db_block| match db_block {
DatabaseBlock::Full(block) => {
LoadedBeaconBlock::Full(Arc::new(block))
}
DatabaseBlock::Blinded(block) => {
LoadedBeaconBlock::Blinded(Box::new(block))
}
})),
)),
}
}
db_blocks
},
"load_beacon_blocks",
)
.await
}

/// Pre-process the loaded blocks into execution engine requests.
Expand Down Expand Up @@ -549,7 +562,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {

// used when the execution engine doesn't support the payload bodies methods
async fn stream_blocks_fallback(
&self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -575,7 +588,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

async fn stream_blocks(
&self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -584,7 +597,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
let mut n_sent = 0usize;
let mut engine_requests = 0usize;

let payloads = self.load_payloads(block_roots);
let payloads = match self.load_payloads(block_roots).await {
Ok(payloads) => payloads,
Err(e) => {
error!(
self.beacon_chain.log,
"BeaconBlockStreamer: Failed to load payloads";
"error" => ?e
);
return;
}
};
let requests = self.get_requests(payloads).await;

for (root, request) in requests {
Expand Down Expand Up @@ -624,7 +647,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

pub async fn stream(
self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -650,16 +673,16 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

pub fn launch_stream(
self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> impl Stream<Item = (Hash256, Arc<BlockResult<T::EthSpec>>)> {
let (block_tx, block_rx) = mpsc::unbounded_channel();
debug!(
self.beacon_chain.log,
"Launching a BeaconBlockStreamer";
"blocks" => block_roots.len(),
);
let executor = self.beacon_chain.task_executor.clone();
executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender");
UnboundedReceiverStream::new(block_rx)
}
Expand Down
8 changes: 2 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_blocks_checking_caches(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Expand All @@ -1149,14 +1148,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
.launch_stream(block_roots, executor))
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?.launch_stream(block_roots))
}

pub fn get_blocks(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Expand All @@ -1166,8 +1163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
.launch_stream(block_roots, executor))
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?.launch_stream(block_roots))
}

pub fn get_blobs_checking_early_attester_cache(
Expand Down
6 changes: 2 additions & 4 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
let executor = processor.executor.clone();
processor
.handle_blocks_by_range_request(executor, peer_id, request_id, request)
.handle_blocks_by_range_request(peer_id, request_id, request)
.await;
};

Expand All @@ -530,9 +529,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
let executor = processor.executor.clone();
processor
.handle_blocks_by_root_request(executor, peer_id, request_id, request)
.handle_blocks_by_root_request(peer_id, request_id, request)
.await;
};

Expand Down
Loading

0 comments on commit f689898

Please sign in to comment.