diff --git a/account_manager/src/validator/create.rs b/account_manager/src/validator/create.rs
index 8da32531a80..93b041c61c4 100644
--- a/account_manager/src/validator/create.rs
+++ b/account_manager/src/validator/create.rs
@@ -62,7 +62,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                     "The path where the validator keystore passwords will be stored. \
                     Defaults to ~/.lighthouse/{network}/secrets",
                 )
-                .conflicts_with("datadir")
                 .takes_value(true),
         )
         .arg(
diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
index bbd5bfcac9a..4f413ce2a86 100644
--- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs
+++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
@@ -1,10 +1,9 @@
 use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
 use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
-use slog::{crit, debug, Logger};
+use slog::{crit, debug, error, Logger};
 use std::collections::HashMap;
 use std::sync::Arc;
 use store::{DatabaseBlock, ExecutionPayloadDeneb};
-use task_executor::TaskExecutor;
 use tokio::sync::{
     mpsc::{self, UnboundedSender},
     RwLock,
@@ -395,18 +394,18 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     pub fn new(
         beacon_chain: &Arc<BeaconChain<T>>,
         check_caches: CheckCaches,
-    ) -> Result<Self, BeaconChainError> {
+    ) -> Result<Arc<Self>, BeaconChainError> {
         let execution_layer = beacon_chain
             .execution_layer
             .as_ref()
             .ok_or(BeaconChainError::ExecutionLayerMissing)?
             .clone();

-        Ok(Self {
+        Ok(Arc::new(Self {
             execution_layer,
             check_caches,
             beacon_chain: beacon_chain.clone(),
-        })
+        }))
     }

     fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
@@ -425,30 +424,44 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
         }
     }

-    fn load_payloads(&self, block_roots: Vec<Hash256>) -> Vec<(Hash256, LoadResult<T::EthSpec>)> {
-        let mut db_blocks = Vec::new();
-
-        for root in block_roots {
-            if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
-                db_blocks.push((root, Ok(Some(cached_block))));
-                continue;
-            }
-
-            match self.beacon_chain.store.try_get_full_block(&root) {
-                Err(e) => db_blocks.push((root, Err(e.into()))),
-                Ok(opt_block) => db_blocks.push((
-                    root,
-                    Ok(opt_block.map(|db_block| match db_block {
-                        DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)),
-                        DatabaseBlock::Blinded(block) => {
-                            LoadedBeaconBlock::Blinded(Box::new(block))
+    async fn load_payloads(
+        self: &Arc<Self>,
+        block_roots: Vec<Hash256>,
+    ) -> Result<Vec<(Hash256, LoadResult<T::EthSpec>)>, BeaconChainError> {
+        let streamer = self.clone();
+        // Loading from the DB is slow -> spawn a blocking task
+        self.beacon_chain
+            .spawn_blocking_handle(
+                move || {
+                    let mut db_blocks = Vec::new();
+                    for root in block_roots {
+                        if let Some(cached_block) =
+                            streamer.check_caches(root).map(LoadedBeaconBlock::Full)
+                        {
+                            db_blocks.push((root, Ok(Some(cached_block))));
+                            continue;
                         }
-                    })),
-                )),
-            }
-        }
-        db_blocks
+                        match streamer.beacon_chain.store.try_get_full_block(&root) {
+                            Err(e) => db_blocks.push((root, Err(e.into()))),
+                            Ok(opt_block) => db_blocks.push((
+                                root,
+                                Ok(opt_block.map(|db_block| match db_block {
+                                    DatabaseBlock::Full(block) => {
+                                        LoadedBeaconBlock::Full(Arc::new(block))
+                                    }
+                                    DatabaseBlock::Blinded(block) => {
+                                        LoadedBeaconBlock::Blinded(Box::new(block))
+                                    }
+                                })),
+                            )),
+                        }
+                    }
+                    db_blocks
+                },
+                "load_beacon_blocks",
+            )
+            .await
     }

     /// Pre-process the loaded blocks into execution engine requests.
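The hunk above moves the slow, synchronous database reads onto a dedicated blocking thread via `spawn_blocking_handle`, so the async executor's workers are never stalled on disk I/O. A free-standing sketch of the same pattern using plain `tokio` (the `Store` type and `try_get_block` helper are illustrative stand-ins, not Lighthouse APIs):

```rust
use std::sync::Arc;

/// Stand-in for a store with a slow, synchronous read path.
struct Store;

impl Store {
    fn try_get_block(&self, root: u64) -> Option<u64> {
        // Imagine a blocking disk read here.
        Some(root)
    }
}

async fn load_payloads(
    store: Arc<Store>,
    roots: Vec<u64>,
) -> Result<Vec<(u64, Option<u64>)>, tokio::task::JoinError> {
    // Loading from the DB is slow -> spawn a blocking task so async worker
    // threads keep making progress while the reads run elsewhere.
    tokio::task::spawn_blocking(move || {
        roots
            .into_iter()
            .map(|root| (root, store.try_get_block(root)))
            .collect()
    })
    .await
}

#[tokio::main]
async fn main() {
    let store = Arc::new(Store);
    let blocks = load_payloads(store, vec![1, 2, 3]).await.unwrap();
    assert_eq!(blocks.len(), 3);
}
```

`spawn_blocking_handle` additionally ties the task to Lighthouse's `TaskExecutor` for shutdown handling, which is also why the streamer now hands out `Arc<Self>` rather than `&self`: the spawned closure must own its data to satisfy `'static`.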
@@ -549,7 +562,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     // used when the execution engine doesn't support the payload bodies methods
     async fn stream_blocks_fallback(
-        &self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
         sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
     ) {
@@ -575,7 +588,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     }

     async fn stream_blocks(
-        &self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
         sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
     ) {
@@ -584,7 +597,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
         let mut n_sent = 0usize;
         let mut engine_requests = 0usize;

-        let payloads = self.load_payloads(block_roots);
+        let payloads = match self.load_payloads(block_roots).await {
+            Ok(payloads) => payloads,
+            Err(e) => {
+                error!(
+                    self.beacon_chain.log,
+                    "BeaconBlockStreamer: Failed to load payloads";
+                    "error" => ?e
+                );
+                return;
+            }
+        };

         let requests = self.get_requests(payloads).await;

         for (root, request) in requests {
@@ -624,7 +647,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     }

     pub async fn stream(
-        self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
         sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
     ) {
@@ -650,9 +673,8 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
     }

     pub fn launch_stream(
-        self,
+        self: Arc<Self>,
         block_roots: Vec<Hash256>,
-        executor: &TaskExecutor,
     ) -> impl Stream<Item = (Hash256, Arc<BlockResult<T::EthSpec>>)> {
         let (block_tx, block_rx) = mpsc::unbounded_channel();
         debug!(
@@ -660,6 +682,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
             "Launching a BeaconBlockStreamer";
             "blocks" => block_roots.len(),
         );
+        let executor = self.beacon_chain.task_executor.clone();
         executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender");
         UnboundedReceiverStream::new(block_rx)
     }
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 7c497e74584..b3790024f81 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -1139,7 +1139,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     pub fn get_blocks_checking_caches(
         self: &Arc<Self>,
         block_roots: Vec<Hash256>,
-        executor: &TaskExecutor,
     ) -> Result<
         impl Stream<
             Item = (
                 Hash256,
                 Arc<BlockResult<T::EthSpec>>,
             ),
         >,
         Error,
     > {
-        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
-            .launch_stream(block_roots, executor))
+        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?.launch_stream(block_roots))
     }

     pub fn get_blocks(
         self: &Arc<Self>,
         block_roots: Vec<Hash256>,
-        executor: &TaskExecutor,
     ) -> Result<
         impl Stream<
             Item = (
                 Hash256,
                 Arc<BlockResult<T::EthSpec>>,
             ),
         >,
         Error,
     > {
-        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
-            .launch_stream(block_roots, executor))
+        Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?.launch_stream(block_roots))
     }

     pub fn get_blobs_checking_early_attester_cache(
diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs
index 542262487ae..debc4881a60 100644
--- a/beacon_node/beacon_chain/src/test_utils.rs
+++ b/beacon_node/beacon_chain/src/test_utils.rs
@@ -419,21 +419,17 @@ where
         self
     }

-    pub fn execution_layer_from_urls(mut self, urls: &[&str]) -> Self {
+    pub fn execution_layer_from_url(mut self, url: &str) -> Self {
         assert!(
             self.execution_layer.is_none(),
             "execution layer already defined"
         );

-        let urls: Vec<SensitiveUrl> = urls
-            .iter()
-            .map(|s| SensitiveUrl::parse(s))
-            .collect::<Result<_, _>>()
-            .unwrap();
+        let url = SensitiveUrl::parse(url).ok();

         let config = execution_layer::Config {
-            execution_endpoints: urls,
-            secret_files: vec![],
+            execution_endpoint: url,
+            secret_file: None,
             suggested_fee_recipient: Some(Address::repeat_byte(42)),
             ..Default::default()
         };
diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs
index 9dd72845180..9b83e9cacbc 100644
--- a/beacon_node/beacon_processor/src/lib.rs
+++ b/beacon_node/beacon_processor/src/lib.rs
@@ -571,7 +571,7 @@ pub enum BlockingOrAsync {
 /// queuing specifics.
 pub enum Work<E: EthSpec> {
     GossipAttestation {
-        attestation: GossipAttestationPackage<E>,
+        attestation: Box<GossipAttestationPackage<E>>,
         process_individual: Box<dyn FnOnce(GossipAttestationPackage<E>) + Send + Sync>,
         process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
     },
@@ -583,7 +583,7 @@ pub enum Work<E: EthSpec> {
         process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
     },
     GossipAggregate {
-        aggregate: GossipAggregatePackage<E>,
+        aggregate: Box<GossipAggregatePackage<E>>,
         process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
         process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
     },
@@ -624,8 +624,8 @@ pub enum Work<E: EthSpec> {
     ChainSegment(AsyncFn),
     ChainSegmentBackfill(AsyncFn),
     Status(BlockingFn),
-    BlocksByRangeRequest(BlockingFnWithManualSendOnIdle),
-    BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
+    BlocksByRangeRequest(AsyncFn),
+    BlocksByRootsRequest(AsyncFn),
     BlobsByRangeRequest(BlockingFn),
     BlobsByRootsRequest(BlockingFn),
     GossipBlsToExecutionChange(BlockingFn),
@@ -1015,7 +1015,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                                     process_individual: _,
                                     process_batch,
                                 } => {
-                                    aggregates.push(aggregate);
+                                    aggregates.push(*aggregate);
                                     if process_batch_opt.is_none() {
                                         process_batch_opt = Some(process_batch);
                                     }
@@ -1075,7 +1075,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                                     process_individual: _,
                                     process_batch,
                                 } => {
-                                    attestations.push(attestation);
+                                    attestations.push(*attestation);
                                     if process_batch_opt.is_none() {
                                         process_batch_opt = Some(process_batch);
                                     }
@@ -1445,7 +1445,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         process_individual,
                         process_batch: _,
                     } => task_spawner.spawn_blocking(move || {
-                        process_individual(attestation);
+                        process_individual(*attestation);
                     }),
                     Work::GossipAttestationBatch {
                         attestations,
@@ -1458,7 +1458,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         process_individual,
                         process_batch: _,
                     } => task_spawner.spawn_blocking(move || {
-                        process_individual(aggregate);
+                        process_individual(*aggregate);
                     }),
                     Work::GossipAggregateBatch {
                         aggregates,
@@ -1493,7 +1493,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         task_spawner.spawn_blocking(process_fn)
                     }
                     Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
-                        task_spawner.spawn_blocking_with_manual_send_idle(work)
+                        task_spawner.spawn_async(work)
                     }
                     Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
                     Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
@@ -1555,23 +1555,6 @@ impl TaskSpawner {
             WORKER_TASK_NAME,
         )
     }
-
-    /// Spawn a blocking task, passing the `SendOnDrop` into the task.
-    ///
-    /// ## Notes
-    ///
-    /// Users must ensure the `SendOnDrop` is dropped at the appropriate time!
-    pub fn spawn_blocking_with_manual_send_idle<F>(self, task: F)
-    where
-        F: FnOnce(SendOnDrop) + Send + 'static,
-    {
-        self.executor.spawn_blocking(
-            || {
-                task(self.send_idle_on_drop);
-            },
-            WORKER_TASK_NAME,
-        )
-    }
 }

 /// This struct will send a message on `self.tx` when it is dropped. An error will be logged on
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index 30930318eff..22410976c9d 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -355,14 +355,14 @@ struct Inner<E: EthSpec> {
 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
 pub struct Config {
-    /// Endpoint urls for EL nodes that are running the engine api.
-    pub execution_endpoints: Vec<SensitiveUrl>,
+    /// Endpoint url for EL nodes that are running the engine api.
+    pub execution_endpoint: Option<SensitiveUrl>,
     /// Endpoint urls for services providing the builder api.
     pub builder_url: Option<SensitiveUrl>,
     /// User agent to send with requests to the builder API.
     pub builder_user_agent: Option<String>,
-    /// JWT secrets for the above endpoints running the engine api.
-    pub secret_files: Vec<PathBuf>,
+    /// JWT secret for the above endpoint running the engine api.
+    pub secret_file: Option<PathBuf>,
     /// The default fee recipient to use on the beacon node if none if provided from
     /// the validator client during block preparation.
     pub suggested_fee_recipient: Option<Address>,
@@ -386,10 +386,10 @@ impl<T: EthSpec> ExecutionLayer<T> {
     /// Instantiate `Self` with an Execution engine specified in `Config`, using JSON-RPC via HTTP.
     pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, Error> {
         let Config {
-            execution_endpoints: urls,
+            execution_endpoint: url,
             builder_url,
             builder_user_agent,
-            secret_files,
+            secret_file,
             suggested_fee_recipient,
             jwt_id,
             jwt_version,
@@ -397,16 +397,10 @@ impl<T: EthSpec> ExecutionLayer<T> {
             execution_timeout_multiplier,
         } = config;

-        if urls.len() > 1 {
-            warn!(log, "Only the first execution engine url will be used");
-        }
-        let execution_url = urls.into_iter().next().ok_or(Error::NoEngine)?;
+        let execution_url = url.ok_or(Error::NoEngine)?;

         // Use the default jwt secret path if not provided via cli.
-        let secret_file = secret_files
-            .into_iter()
-            .next()
-            .unwrap_or_else(|| default_datadir.join(DEFAULT_JWT_FILE));
+        let secret_file = secret_file.unwrap_or_else(|| default_datadir.join(DEFAULT_JWT_FILE));

         let jwt_key = if secret_file.exists() {
             // Read secret from file if it already exists
diff --git a/beacon_node/execution_layer/src/test_utils/mock_builder.rs b/beacon_node/execution_layer/src/test_utils/mock_builder.rs
index b12e26a3d6c..756e0b793f8 100644
--- a/beacon_node/execution_layer/src/test_utils/mock_builder.rs
+++ b/beacon_node/execution_layer/src/test_utils/mock_builder.rs
@@ -229,8 +229,8 @@ impl<E: EthSpec> MockBuilder<E> {

         // This EL should not talk to a builder
         let config = Config {
-            execution_endpoints: vec![mock_el_url],
-            secret_files: vec![path],
+            execution_endpoint: Some(mock_el_url),
+            secret_file: Some(path),
             suggested_fee_recipient: None,
             ..Default::default()
         };
diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs
index f76edfa90b7..6717bbc2ab3 100644
--- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs
+++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs
@@ -67,8 +67,8 @@ impl<T: EthSpec> MockExecutionLayer<T> {
         std::fs::write(&path, hex::encode(DEFAULT_JWT_SECRET)).unwrap();

         let config = Config {
-            execution_endpoints: vec![url],
-            secret_files: vec![path],
+            execution_endpoint: Some(url),
+            secret_file: Some(path),
             suggested_fee_recipient: Some(Address::repeat_byte(42)),
             ..Default::default()
         };
diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs
index fdba9f4741c..0ede74ba754 100644
--- a/beacon_node/genesis/src/eth1_genesis_service.rs
+++ b/beacon_node/genesis/src/eth1_genesis_service.rs
@@ -5,7 +5,7 @@ use eth1::{DepositLog, Eth1Block, Service as Eth1Service};
 use slog::{debug, error, info, trace, Logger};
 use state_processing::{
     eth2_genesis_time, initialize_beacon_state_from_eth1, is_valid_genesis_state,
-    per_block_processing::process_operations::process_deposit, process_activations,
+    per_block_processing::process_operations::apply_deposit, process_activations,
 };
 use std::sync::{
     atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -433,7 +433,7 @@ impl Eth1GenesisService {
                 // is reached _prior_ to `MIN_ACTIVE_VALIDATOR_COUNT`. I suspect this won't be the
                 // case for mainnet, so we defer this optimization.
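For reference, the reworked `Config` now carries at most one engine endpoint and one JWT secret, as the mock setups above show. A minimal sketch of building it (the URL and path are placeholders, and the `sensitive_url` crate path is assumed from elsewhere in the tree; `Default` fills the remaining fields per the `#[derive(Default)]` on `Config`):

```rust
use std::path::PathBuf;

use execution_layer::Config;
use sensitive_url::SensitiveUrl;

fn single_endpoint_config() -> Config {
    Config {
        // Exactly one engine-API endpoint; `None` now fails fast at startup
        // with `Error::NoEngine` instead of silently using the first of many.
        execution_endpoint: SensitiveUrl::parse("http://localhost:8551").ok(),
        // One JWT secret; `None` falls back to `<datadir>/DEFAULT_JWT_FILE`.
        secret_file: Some(PathBuf::from("/tmp/jwt.hex")),
        ..Default::default()
    }
}
```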
- process_deposit(&mut state, &deposit, spec, PROOF_VERIFICATION) + apply_deposit(&mut state, &deposit, spec, PROOF_VERIFICATION) .map_err(|e| format!("Error whilst processing deposit: {:?}", e)) })?; diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index cb537f23590..df5bbba99c8 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -163,7 +163,7 @@ struct InboundInfo { /// Protocol of the original request we received from the peer. protocol: Protocol, /// Responses that the peer is still expecting from us. - remaining_chunks: u64, + max_remaining_chunks: u64, /// Useful to timing how long each request took to process. Currently only used by /// BlocksByRange. request_start_time: Instant, @@ -180,7 +180,7 @@ struct OutboundInfo { /// Info over the protocol this substream is handling. proto: Protocol, /// Number of chunks to be seen from the peer's response. - remaining_chunks: Option, + max_remaining_chunks: Option, /// `Id` as given by the application that sent the request. req_id: Id, } @@ -471,7 +471,7 @@ where // Process one more message if one exists. if let Some(message) = info.pending_items.pop_front() { // If this is the last chunk, terminate the stream. - let last_chunk = info.remaining_chunks <= 1; + let last_chunk = info.max_remaining_chunks <= 1; let fut = send_message_to_inbound_substream(substream, message, last_chunk) .boxed(); @@ -537,7 +537,8 @@ where { // The substream is still active, decrement the remaining // chunks expected. - info.remaining_chunks = info.remaining_chunks.saturating_sub(1); + info.max_remaining_chunks = + info.max_remaining_chunks.saturating_sub(1); // If this substream has not ended, we reset the timer. // Each chunk is allowed RESPONSE_TIMEOUT to be sent. @@ -552,7 +553,7 @@ where // Process one more message if one exists. if let Some(message) = info.pending_items.pop_front() { // If this is the last chunk, terminate the stream. 
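The `remaining_chunks` to `max_remaining_chunks` rename in the handler hunks around here encodes that the counter is an upper bound on the chunks still owed, not an exact expectation. The countdown logic reduces to a few lines; a self-contained model (names are illustrative, not the handler's actual types):

```rust
/// Upper bound on the response chunks still owed on a substream.
struct Countdown {
    max_remaining_chunks: u64,
}

impl Countdown {
    /// The chunk being sent now must terminate the stream if at most one
    /// chunk may remain.
    fn is_last_chunk(&self) -> bool {
        self.max_remaining_chunks <= 1
    }

    /// Record one sent chunk; saturating, so a zero bound cannot underflow.
    fn on_chunk_sent(&mut self) {
        self.max_remaining_chunks = self.max_remaining_chunks.saturating_sub(1);
    }
}

fn main() {
    let mut c = Countdown { max_remaining_chunks: 2 };
    assert!(!c.is_last_chunk());
    c.on_chunk_sent();
    assert!(c.is_last_chunk());
}
```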
- let last_chunk = info.remaining_chunks <= 1; + let last_chunk = info.max_remaining_chunks <= 1; let fut = send_message_to_inbound_substream( substream, message, last_chunk, ) @@ -664,15 +665,19 @@ where request, } => match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(response))) => { - if request.expected_responses() > 1 && !response.close_after() { + if request.expect_exactly_one_response() || response.close_after() { + // either this is a single response request or this response closes the + // stream + entry.get_mut().state = OutboundSubstreamState::Closing(substream); + } else { let substream_entry = entry.get_mut(); let delay_key = &substream_entry.delay_key; // chunks left after this one - let remaining_chunks = substream_entry - .remaining_chunks + let max_remaining_chunks = substream_entry + .max_remaining_chunks .map(|count| count.saturating_sub(1)) .unwrap_or_else(|| 0); - if remaining_chunks == 0 { + if max_remaining_chunks == 0 { // this is the last expected message, close the stream as all expected chunks have been received substream_entry.state = OutboundSubstreamState::Closing(substream); } else { @@ -682,14 +687,10 @@ where substream, request, }; - substream_entry.remaining_chunks = Some(remaining_chunks); + substream_entry.max_remaining_chunks = Some(max_remaining_chunks); self.outbound_substreams_delay .reset(delay_key, self.resp_timeout); } - } else { - // either this is a single response request or this response closes the - // stream - entry.get_mut().state = OutboundSubstreamState::Closing(substream); } // Check what type of response we got and report it accordingly @@ -725,7 +726,16 @@ where self.outbound_substreams_delay.remove(delay_key); entry.remove_entry(); // notify the application error - if request.expected_responses() > 1 { + if request.expect_exactly_one_response() { + // return an error, stream should not have closed early. + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Err(HandlerErr::Outbound { + id: request_id, + proto: request.versioned_protocol().protocol(), + error: RPCError::IncompleteStream, + }), + )); + } else { // return an end of stream result return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::Ok(RPCReceived::EndOfStream( @@ -734,16 +744,6 @@ where )), )); } - - // else we return an error, stream should not have closed early. - let outbound_err = HandlerErr::Outbound { - id: request_id, - proto: request.versioned_protocol().protocol(), - error: RPCError::IncompleteStream, - }; - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::Err(outbound_err), - )); } Poll::Pending => { entry.get_mut().state = @@ -880,10 +880,10 @@ where } let (req, substream) = substream; - let expected_responses = req.expected_responses(); + let max_responses = req.max_responses(); // store requests that expect responses - if expected_responses > 0 { + if max_responses > 0 { if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS { // Store the stream and tag the output. 
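The restructured outbound branch above separates two outcomes when the remote closes a substream early: requests that expect exactly one response surface `RPCError::IncompleteStream`, while streamed requests treat the close as a normal end-of-stream. Reduced to its decision rule (the enum and function below are illustrative, not types from this diff):

```rust
/// Outcome reported to the application when a substream closes early.
#[derive(Debug, PartialEq)]
enum EarlyClose {
    /// Streamed request: closing early is just the end of the stream.
    EndOfStream,
    /// Exactly-one-response request: the response never arrived.
    IncompleteStream,
}

fn classify_early_close(expect_exactly_one_response: bool) -> EarlyClose {
    if expect_exactly_one_response {
        EarlyClose::IncompleteStream
    } else {
        EarlyClose::EndOfStream
    }
}

fn main() {
    // Status/Ping/MetaData expect exactly one response (see outbound.rs below).
    assert_eq!(classify_early_close(true), EarlyClose::IncompleteStream);
    // BlocksByRange and friends are streamed.
    assert_eq!(classify_early_close(false), EarlyClose::EndOfStream);
}
```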
let delay_key = self @@ -894,14 +894,13 @@ where self.current_inbound_substream_id, InboundInfo { state: awaiting_stream, - pending_items: VecDeque::with_capacity(std::cmp::min( - expected_responses, - 128, - ) as usize), + pending_items: VecDeque::with_capacity( + std::cmp::min(max_responses, 128) as usize + ), delay_key: Some(delay_key), protocol: req.versioned_protocol().protocol(), request_start_time: Instant::now(), - remaining_chunks: expected_responses, + max_remaining_chunks: max_responses, }, ); } else { @@ -948,8 +947,14 @@ where } // add the stream to substreams if we expect a response, otherwise drop the stream. - let expected_responses = request.expected_responses(); - if expected_responses > 0 { + let max_responses = request.max_responses(); + if max_responses > 0 { + let max_remaining_chunks = if request.expect_exactly_one_response() { + // Currently enforced only for multiple responses + None + } else { + Some(max_responses) + }; // new outbound request. Store the stream and tag the output. let delay_key = self .outbound_substreams_delay @@ -958,12 +963,6 @@ where substream: Box::new(substream), request, }; - let expected_responses = if expected_responses > 1 { - // Currently enforced only for multiple responses - Some(expected_responses) - } else { - None - }; if self .outbound_substreams .insert( @@ -972,7 +971,7 @@ where state: awaiting_stream, delay_key, proto, - remaining_chunks: expected_responses, + max_remaining_chunks, req_id: id, }, ) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 67eea09ea77..1b0486ff771 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -483,27 +483,6 @@ impl RPCCodedResponse { RPCCodedResponse::Error(code, err) } - /// Specifies which response allows for multiple chunks for the stream handler. - pub fn multiple_responses(&self) -> bool { - match self { - RPCCodedResponse::Success(resp) => match resp { - RPCResponse::Status(_) => false, - RPCResponse::BlocksByRange(_) => true, - RPCResponse::BlocksByRoot(_) => true, - RPCResponse::BlobsByRange(_) => true, - RPCResponse::BlobsByRoot(_) => true, - RPCResponse::Pong(_) => false, - RPCResponse::MetaData(_) => false, - RPCResponse::LightClientBootstrap(_) => false, - RPCResponse::LightClientOptimisticUpdate(_) => false, - RPCResponse::LightClientFinalityUpdate(_) => false, - }, - RPCCodedResponse::Error(_, _) => true, - // Stream terminations are part of responses that have chunks - RPCCodedResponse::StreamTermination(_) => true, - } - } - /// Returns true if this response always terminates the stream. pub fn close_after(&self) -> bool { !matches!(self, RPCCodedResponse::Success(_)) diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 09ef6185957..8ea7b84bc95 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -91,8 +91,8 @@ impl OutboundRequest { } /* These functions are used in the handler for stream management */ - /// Number of responses expected for this request. - pub fn expected_responses(&self) -> u64 { + /// Maximum number of responses expected for this request. 
+ pub fn max_responses(&self) -> u64 { match self { OutboundRequest::Status(_) => 1, OutboundRequest::Goodbye(_) => 0, @@ -105,6 +105,19 @@ impl OutboundRequest { } } + pub fn expect_exactly_one_response(&self) -> bool { + match self { + OutboundRequest::Status(_) => true, + OutboundRequest::Goodbye(_) => false, + OutboundRequest::BlocksByRange(_) => false, + OutboundRequest::BlocksByRoot(_) => false, + OutboundRequest::BlobsByRange(_) => false, + OutboundRequest::BlobsByRoot(_) => false, + OutboundRequest::Ping(_) => true, + OutboundRequest::MetaData(_) => true, + } + } + /// Gives the corresponding `SupportedProtocol` to this request. pub fn versioned_protocol(&self) -> SupportedProtocol { match self { diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 6ff72f658a8..f65586087c2 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -654,8 +654,8 @@ pub enum InboundRequest { impl InboundRequest { /* These functions are used in the handler for stream management */ - /// Number of responses expected for this request. - pub fn expected_responses(&self) -> u64 { + /// Maximum number of responses expected for this request. + pub fn max_responses(&self) -> u64 { match self { InboundRequest::Status(_) => 1, InboundRequest::Goodbye(_) => 0, diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 801a4af54b3..b304eb546da 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -228,7 +228,7 @@ impl RPCRateLimiterBuilder { pub trait RateLimiterItem { fn protocol(&self) -> Protocol; - fn expected_responses(&self) -> u64; + fn max_responses(&self) -> u64; } impl RateLimiterItem for super::InboundRequest { @@ -236,8 +236,8 @@ impl RateLimiterItem for super::InboundRequest { self.versioned_protocol().protocol() } - fn expected_responses(&self) -> u64 { - self.expected_responses() + fn max_responses(&self) -> u64 { + self.max_responses() } } @@ -246,8 +246,8 @@ impl RateLimiterItem for super::OutboundRequest { self.versioned_protocol().protocol() } - fn expected_responses(&self) -> u64 { - self.expected_responses() + fn max_responses(&self) -> u64 { + self.max_responses() } } impl RPCRateLimiter { @@ -299,7 +299,7 @@ impl RPCRateLimiter { request: &Item, ) -> Result<(), RateLimitedErr> { let time_since_start = self.init_time.elapsed(); - let tokens = request.expected_responses().max(1); + let tokens = request.max_responses().max(1); let check = |limiter: &mut Limiter| limiter.allows(time_since_start, peer_id, tokens); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 6872a712c9d..f10646c7414 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -102,14 +102,14 @@ impl NetworkBeaconProcessor { self.try_send(BeaconWorkEvent { drop_during_sync: true, work: Work::GossipAttestation { - attestation: GossipAttestationPackage { + attestation: Box::new(GossipAttestationPackage { message_id, peer_id, attestation: Box::new(attestation), subnet_id, should_import, seen_timestamp, - }, + }), process_individual: Box::new(process_individual), process_batch: Box::new(process_batch), }, @@ -148,13 +148,13 @@ impl NetworkBeaconProcessor { self.try_send(BeaconWorkEvent { 
drop_during_sync: true, work: Work::GossipAggregate { - aggregate: GossipAggregatePackage { + aggregate: Box::new(GossipAggregatePackage { message_id, peer_id, aggregate: Box::new(aggregate), beacon_block_root, seen_timestamp, - }, + }), process_individual: Box::new(process_individual), process_batch: Box::new(process_batch), }, @@ -508,20 +508,15 @@ impl NetworkBeaconProcessor { request: BlocksByRangeRequest, ) -> Result<(), Error> { let processor = self.clone(); - let process_fn = move |send_idle_on_drop| { - let executor = processor.executor.clone(); - processor.handle_blocks_by_range_request( - executor, - send_idle_on_drop, - peer_id, - request_id, - request, - ) + let process_fn = async move { + processor + .handle_blocks_by_range_request(peer_id, request_id, request) + .await; }; self.try_send(BeaconWorkEvent { drop_during_sync: false, - work: Work::BlocksByRangeRequest(Box::new(process_fn)), + work: Work::BlocksByRangeRequest(Box::pin(process_fn)), }) } @@ -533,20 +528,15 @@ impl NetworkBeaconProcessor { request: BlocksByRootRequest, ) -> Result<(), Error> { let processor = self.clone(); - let process_fn = move |send_idle_on_drop| { - let executor = processor.executor.clone(); - processor.handle_blocks_by_root_request( - executor, - send_idle_on_drop, - peer_id, - request_id, - request, - ) + let process_fn = async move { + processor + .handle_blocks_by_root_request(peer_id, request_id, request) + .await; }; self.try_send(BeaconWorkEvent { drop_during_sync: false, - work: Work::BlocksByRootsRequest(Box::new(process_fn)), + work: Work::BlocksByRootsRequest(Box::pin(process_fn)), }) } diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 1b4b93adb0c..1e72dc42578 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -3,7 +3,6 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped}; -use beacon_processor::SendOnDrop; use itertools::process_results; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::rpc::*; @@ -12,7 +11,6 @@ use slog::{debug, error, warn}; use slot_clock::SlotClock; use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; -use task_executor::TaskExecutor; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; use types::{Epoch, EthSpec, ForkName, Hash256, Slot}; @@ -128,90 +126,102 @@ impl NetworkBeaconProcessor { } /// Handle a `BlocksByRoot` request from the peer. - pub fn handle_blocks_by_root_request( + pub async fn handle_blocks_by_root_request( self: Arc, - executor: TaskExecutor, - send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.clone() + .handle_blocks_by_root_request_inner(peer_id, request_id, request) + .await, + Response::BlocksByRoot, + ); + } + + /// Handle a `BlocksByRoot` request from the peer. 
+ pub async fn handle_blocks_by_root_request_inner( + self: Arc, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { + let log_results = |peer_id, requested_blocks, send_block_count| { + debug!( + self.log, + "BlocksByRoot outgoing response processed"; + "peer" => %peer_id, + "requested" => requested_blocks, + "returned" => %send_block_count + ); + }; + let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain - .get_blocks_checking_caches(request.block_roots().to_vec(), &executor) + .get_blocks_checking_caches(request.block_roots().to_vec()) { Ok(block_stream) => block_stream, - Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + Err(e) => { + error!(self.log, "Error getting block stream"; "error" => ?e); + return Err(( + RPCResponseErrorCode::ServerError, + "Error getting block stream", + )); + } }; // Fetching blocks is async because it may have to hit the execution layer for payloads. - executor.spawn( - async move { - let mut send_block_count = 0; - let mut send_response = true; - while let Some((root, result)) = block_stream.next().await { - match result.as_ref() { - Ok(Some(block)) => { - self.send_response( - peer_id, - Response::BlocksByRoot(Some(block.clone())), - request_id, - ); - send_block_count += 1; - } - Ok(None) => { - debug!( - self.log, - "Peer requested unknown block"; - "peer" => %peer_id, - "request_root" => ?root - ); - } - Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { - debug!( - self.log, - "Failed to fetch execution payload for blocks by root request"; - "block_root" => ?root, - "reason" => "execution layer not synced", - ); - // send the stream terminator - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Execution layer not synced".into(), - request_id, - ); - send_response = false; - break; - } - Err(e) => { - debug!( - self.log, - "Error fetching block for peer"; - "peer" => %peer_id, - "request_root" => ?root, - "error" => ?e, - ); - } - } + let mut send_block_count = 0; + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { + Ok(Some(block)) => { + self.send_response( + peer_id, + Response::BlocksByRoot(Some(block.clone())), + request_id, + ); + send_block_count += 1; } - debug!( - self.log, - "Received BlocksByRoot Request"; - "peer" => %peer_id, - "requested" => requested_blocks, - "returned" => %send_block_count - ); - - // send stream termination - if send_response { - self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + Ok(None) => { + debug!( + self.log, + "Peer requested unknown block"; + "peer" => %peer_id, + "request_root" => ?root + ); } - drop(send_on_drop); - }, - "load_blocks_by_root_blocks", - ) + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + self.log, + "Failed to fetch execution payload for blocks by root request"; + "block_root" => ?root, + "reason" => "execution layer not synced", + ); + log_results(peer_id, requested_blocks, send_block_count); + return Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + debug!( + self.log, + "Error fetching block for peer"; + "peer" => %peer_id, + "request_root" => ?root, + "error" => ?e, + ); + } + } + } + log_results(peer_id, requested_blocks, send_block_count); + + Ok(()) } + /// Handle a `BlobsByRoot` request from the peer. 
pub fn handle_blobs_by_root_request( self: Arc, @@ -219,10 +229,25 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: BlobsByRootRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.handle_blobs_by_root_request_inner(peer_id, request_id, request), + Response::BlobsByRoot, + ); + } + + /// Handle a `BlobsByRoot` request from the peer. + pub fn handle_blobs_by_root_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRootRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root) else { // No blob ids requested. - return; + return Ok(()); }; let requested_indices = request .blob_ids @@ -231,7 +256,6 @@ impl NetworkBeaconProcessor { .map(|id| id.index) .collect::>(); let mut send_blob_count = 0; - let send_response = true; let mut blob_list_results = HashMap::new(); for id in request.blob_ids.as_slice() { @@ -280,17 +304,14 @@ impl NetworkBeaconProcessor { } debug!( self.log, - "Received BlobsByRoot Request"; + "BlobsByRoot outgoing response processed"; "peer" => %peer_id, "request_root" => %requested_root, "request_indices" => ?requested_indices, "returned" => send_blob_count ); - // send stream termination - if send_response { - self.send_response(peer_id, Response::BlobsByRoot(None), request_id); - } + Ok(()) } /// Handle a `LightClientBootstrap` request from the peer. @@ -300,33 +321,29 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: LightClientBootstrapRequest, ) { - let block_root = request.root; - match self.chain.get_light_client_bootstrap(&block_root) { - Ok(Some((bootstrap, _))) => self.send_response( - peer_id, - Response::LightClientBootstrap(Arc::new(bootstrap)), - request_id, - ), - Ok(None) => self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ), - Err(e) => { - self.send_error_response( - peer_id, + self.terminate_response_single_item( + peer_id, + request_id, + match self.chain.get_light_client_bootstrap(&request.root) { + Ok(Some((bootstrap, _))) => Ok(Arc::new(bootstrap)), + Ok(None) => Err(( RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ); - error!(self.log, "Error getting LightClientBootstrap instance"; - "block_root" => ?block_root, - "peer" => %peer_id, - "error" => ?e - ) - } - }; + "Bootstrap not available", + )), + Err(e) => { + error!(self.log, "Error getting LightClientBootstrap instance"; + "block_root" => ?request.root, + "peer" => %peer_id, + "error" => ?e + ); + Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not available", + )) + } + }, + Response::LightClientBootstrap, + ); } /// Handle a `LightClientOptimisticUpdate` request from the peer. 
@@ -335,25 +352,22 @@ impl NetworkBeaconProcessor { peer_id: PeerId, request_id: PeerRequestId, ) { - let Some(light_client_optimistic_update) = self - .chain - .light_client_server_cache - .get_latest_optimistic_update() - else { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Latest optimistic update not available".into(), - request_id, - ); - return; - }; - - self.send_response( + self.terminate_response_single_item( peer_id, - Response::LightClientOptimisticUpdate(Arc::new(light_client_optimistic_update)), request_id, - ) + match self + .chain + .light_client_server_cache + .get_latest_optimistic_update() + { + Some(update) => Ok(Arc::new(update)), + None => Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Latest optimistic update not available", + )), + }, + Response::LightClientOptimisticUpdate, + ); } /// Handle a `LightClientFinalityUpdate` request from the peer. @@ -362,36 +376,48 @@ impl NetworkBeaconProcessor { peer_id: PeerId, request_id: PeerRequestId, ) { - let Some(light_client_finality_update) = self - .chain - .light_client_server_cache - .get_latest_finality_update() - else { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Latest finality update not available".into(), - request_id, - ); - return; - }; - - self.send_response( + self.terminate_response_single_item( peer_id, - Response::LightClientFinalityUpdate(Arc::new(light_client_finality_update)), request_id, - ) + match self + .chain + .light_client_server_cache + .get_latest_finality_update() + { + Some(update) => Ok(Arc::new(update)), + None => Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Latest finality update not available", + )), + }, + Response::LightClientFinalityUpdate, + ); } /// Handle a `BlocksByRange` request from the peer. - pub fn handle_blocks_by_range_request( + pub async fn handle_blocks_by_range_request( self: Arc, - executor: TaskExecutor, - send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, req: BlocksByRangeRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.clone() + .handle_blocks_by_range_request_inner(peer_id, request_id, req) + .await, + Response::BlocksByRange, + ); + } + + /// Handle a `BlocksByRange` request from the peer. 
+ pub async fn handle_blocks_by_range_request_inner( + self: Arc, + peer_id: PeerId, + request_id: PeerRequestId, + req: BlocksByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, "count" => req.count(), @@ -413,12 +439,10 @@ impl NetworkBeaconProcessor { } }); if *req.count() > max_request_size { - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - format!("Request exceeded max size {max_request_size}"), - request_id, - ); + "Request exceeded max size", + )); } let forwards_block_root_iter = match self @@ -436,25 +460,15 @@ impl NetworkBeaconProcessor { "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); } Err(e) => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database error".into(), - request_id, - ); - return error!(self.log, "Unable to obtain root iter"; + error!(self.log, "Unable to obtain root iter"; "request" => ?req, "peer" => %peer_id, "error" => ?e ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); } }; @@ -480,154 +494,129 @@ impl NetworkBeaconProcessor { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, Err(e) => { - return error!(self.log, "Error during iteration over blocks"; + error!(self.log, "Error during iteration over blocks"; "request" => ?req, "peer" => %peer_id, "error" => ?e - ) + ); + return Err((RPCResponseErrorCode::ServerError, "Iteration error")); } }; // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); - let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { + if blocks_sent < (*req.count() as usize) { + debug!( + self.log, + "BlocksByRange outgoing response processed"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot(), + "current_slot" => current_slot, + "requested" => req.count(), + "returned" => blocks_sent + ); + } else { + debug!( + self.log, + "BlocksByRange outgoing response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot(), + "current_slot" => current_slot, + "requested" => req.count(), + "returned" => blocks_sent + ); + } + }; + + let mut block_stream = match self.chain.get_blocks(block_roots) { Ok(block_stream) => block_stream, - Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + Err(e) => { + error!(self.log, "Error getting block stream"; "error" => ?e); + return Err((RPCResponseErrorCode::ServerError, "Iterator error")); + } }; // Fetching blocks is async because it may have to hit the execution layer for payloads. 
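Because the block roots are collected over a forwards iterator that spans skip slots, a loaded block can fall outside the requested window, hence the slot check in the loop that follows. The predicate on its own (a sketch with plain integers standing in for `Slot`):

```rust
/// Keep only blocks whose slot lies in `[start_slot, start_slot + count)`.
fn in_requested_range(block_slot: u64, start_slot: u64, count: u64) -> bool {
    block_slot >= start_slot && block_slot < start_slot + count
}

fn main() {
    // A block reached through a skip slot may predate the window: excluded.
    assert!(!in_requested_range(9, 10, 5));
    assert!(in_requested_range(10, 10, 5));
    // The end of the window is exclusive.
    assert!(!in_requested_range(15, 10, 5));
}
```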
- executor.spawn( - async move { - let mut blocks_sent = 0; - let mut send_response = true; - - while let Some((root, result)) = block_stream.next().await { - match result.as_ref() { - Ok(Some(block)) => { - // Due to skip slots, blocks could be out of the range, we ensure they - // are in the range before sending - if block.slot() >= *req.start_slot() - && block.slot() < req.start_slot() + req.count() - { - blocks_sent += 1; - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlocksByRange(Some(block.clone())), - id: request_id, - }); - } - } - Ok(None) => { - error!( - self.log, - "Block in the chain is not in the store"; - "request" => ?req, - "peer" => %peer_id, - "request_root" => ?root - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database inconsistency".into(), - request_id, - ); - send_response = false; - break; - } - Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { - debug!( - self.log, - "Failed to fetch execution payload for blocks by range request"; - "block_root" => ?root, - "reason" => "execution layer not synced", - ); - // send the stream terminator - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Execution layer not synced".into(), - request_id, - ); - send_response = false; - break; - } - Err(e) => { - if matches!( - e, - BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, ref boxed_error) - if matches!(**boxed_error, execution_layer::Error::EngineError(_)) - ) { - warn!( - self.log, - "Error rebuilding payload for peer"; - "info" => "this may occur occasionally when the EE is busy", - "block_root" => ?root, - "error" => ?e, - ); - } else { - error!( - self.log, - "Error fetching block for peer"; - "block_root" => ?root, - "error" => ?e - ); - } - - // send the stream terminator - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Failed fetching blocks".into(), - request_id, - ); - send_response = false; - break; - } + let mut blocks_sent = 0; + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { + Ok(Some(block)) => { + // Due to skip slots, blocks could be out of the range, we ensure they + // are in the range before sending + if block.slot() >= *req.start_slot() + && block.slot() < req.start_slot() + req.count() + { + blocks_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlocksByRange(Some(block.clone())), + id: request_id, + }); } } - - let current_slot = self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - - if blocks_sent < (*req.count() as usize) { - debug!( + Ok(None) => { + error!( self.log, - "BlocksByRange outgoing response processed"; + "Block in the chain is not in the store"; + "request" => ?req, "peer" => %peer_id, - "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot(), - "current_slot" => current_slot, - "requested" => req.count(), - "returned" => blocks_sent + "request_root" => ?root ); - } else { + log_results(req, peer_id, blocks_sent); + return Err((RPCResponseErrorCode::ServerError, "Database inconsistency")); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { debug!( self.log, - "BlocksByRange outgoing response processed"; - "peer" => %peer_id, - "start_slot" => req.start_slot(), - "current_slot" => current_slot, - "requested" => req.count(), - "returned" => blocks_sent + "Failed to fetch execution 
payload for blocks by range request"; + "block_root" => ?root, + "reason" => "execution layer not synced", ); + log_results(req, peer_id, blocks_sent); + // send the stream terminator + return Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Execution layer not synced", + )); } - - if send_response { + Err(e) => { + if matches!( + e, + BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, ref boxed_error) + if matches!(**boxed_error, execution_layer::Error::EngineError(_)) + ) { + warn!( + self.log, + "Error rebuilding payload for peer"; + "info" => "this may occur occasionally when the EE is busy", + "block_root" => ?root, + "error" => ?e, + ); + } else { + error!( + self.log, + "Error fetching block for peer"; + "block_root" => ?root, + "error" => ?e + ); + } + log_results(req, peer_id, blocks_sent); // send the stream terminator - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlocksByRange(None), - id: request_id, - }); + return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks")); } + } + } - drop(send_on_drop); - }, - "load_blocks_by_range_blocks", - ); + log_results(req, peer_id, blocks_sent); + Ok(()) } /// Handle a `BlobsByRange` request from the peer. @@ -637,6 +626,21 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, req: BlobsByRangeRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.handle_blobs_by_range_request_inner(peer_id, request_id, req), + Response::BlobsByRange, + ); + } + + /// Handle a `BlobsByRange` request from the peer. + fn handle_blobs_by_range_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + req: BlobsByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received BlobsByRange Request"; "peer_id" => %peer_id, "count" => req.count, @@ -645,12 +649,10 @@ impl NetworkBeaconProcessor { // Should not send more than max request blocks if req.max_blobs_requested::() > self.chain.spec.max_request_blob_sidecars { - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(), - request_id, - ); + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", + )); } let request_start_slot = Slot::from(req.start_slot); @@ -659,13 +661,10 @@ impl NetworkBeaconProcessor { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), None => { debug!(self.log, "Deneb fork is disabled"); - self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - "Deneb fork is disabled".into(), - request_id, - ); - return; + "Deneb fork is disabled", + )); } }; @@ -685,19 +684,15 @@ impl NetworkBeaconProcessor { ); return if data_availability_boundary_slot < oldest_blob_slot { - self.send_error_response( - peer_id, + Err(( RPCResponseErrorCode::ResourceUnavailable, - "blobs pruned within boundary".into(), - request_id, - ) + "blobs pruned within boundary", + )) } else { - self.send_error_response( - peer_id, + Err(( RPCResponseErrorCode::InvalidRequest, - "Req outside availability period".into(), - request_id, - ) + "Req outside availability period", + )) }; } @@ -714,25 +709,15 @@ impl NetworkBeaconProcessor { "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); } Err(e) => 
{
-                self.send_error_response(
-                    peer_id,
-                    RPCResponseErrorCode::ServerError,
-                    "Database error".into(),
-                    request_id,
-                );
-                return error!(self.log, "Unable to obtain root iter";
+                error!(self.log, "Unable to obtain root iter";
                     "request" => ?req,
                     "peer" => %peer_id,
                     "error" => ?e
                 );
+                return Err((RPCResponseErrorCode::ServerError, "Database error"));
             }
         };
@@ -764,19 +749,35 @@
         let block_roots = match maybe_block_roots {
             Ok(block_roots) => block_roots,
             Err(e) => {
-                return error!(self.log, "Error during iteration over blocks";
+                error!(self.log, "Error during iteration over blocks";
                     "request" => ?req,
                     "peer" => %peer_id,
                     "error" => ?e
-                )
+                );
+                return Err((RPCResponseErrorCode::ServerError, "Database error"));
             }
         };

+        let current_slot = self
+            .chain
+            .slot()
+            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
+
+        let log_results = |peer_id, req: BlobsByRangeRequest, blobs_sent| {
+            debug!(
+                self.log,
+                "BlobsByRange outgoing response processed";
+                "peer" => %peer_id,
+                "start_slot" => req.start_slot,
+                "current_slot" => current_slot,
+                "requested" => req.count,
+                "returned" => blobs_sent
+            );
+        };
+
         // remove all skip slots
         let block_roots = block_roots.into_iter().flatten();
-
         let mut blobs_sent = 0;
-        let mut send_response = true;

         for root in block_roots {
             match self.chain.get_blobs(&root) {
@@ -799,40 +800,64 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                         "block_root" => ?root,
                         "error" => ?e
                     );
-                    self.send_error_response(
-                        peer_id,
+                    log_results(peer_id, req, blobs_sent);
+
+                    return Err((
                         RPCResponseErrorCode::ServerError,
-                        "No blobs and failed fetching corresponding block".into(),
-                        request_id,
-                    );
-                    send_response = false;
-                    break;
+                        "No blobs and failed fetching corresponding block",
+                    ));
                 }
             }
         }
+        log_results(peer_id, req, blobs_sent);

-        let current_slot = self
-            .chain
-            .slot()
-            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
+        Ok(())
+    }

-        debug!(
-            self.log,
-            "BlobsByRange Response processed";
-            "peer" => %peer_id,
-            "start_slot" => req.start_slot,
-            "current_slot" => current_slot,
-            "requested" => req.count,
-            "returned" => blobs_sent
-        );
+    /// Helper function to ensure single item protocol always end with either a single chunk or an
+    /// error
+    fn terminate_response_single_item<R, F: Fn(R) -> Response<T::EthSpec>>(
+        &self,
+        peer_id: PeerId,
+        request_id: PeerRequestId,
+        result: Result<R, (RPCResponseErrorCode, &'static str)>,
+        into_response: F,
+    ) {
+        match result {
+            Ok(resp) => {
+                // Not necessary to explicitly send a termination message if this InboundRequest
+                // returns <= 1 for InboundRequest::expected_responses
+                // https://github.com/sigp/lighthouse/blob/3058b96f2560f1da04ada4f9d8ba8e5651794ff6/beacon_node/lighthouse_network/src/rpc/handler.rs#L555-L558
+                self.send_network_message(NetworkMessage::SendResponse {
+                    peer_id,
+                    response: into_response(resp),
+                    id: request_id,
+                });
+            }
+            Err((error_code, reason)) => {
+                self.send_error_response(peer_id, error_code, reason.into(), request_id);
+            }
+        }
+    }

-        if send_response {
-            // send the stream terminator
-            self.send_network_message(NetworkMessage::SendResponse {
+    /// Helper function to ensure streamed protocols with multiple responses always end with either
+    /// a stream termination or an error
+    fn terminate_response_stream<R, F: Fn(Option<R>) -> Response<T::EthSpec>>(
+        &self,
+        peer_id: PeerId,
+        request_id: PeerRequestId,
+        result: Result<(), (RPCResponseErrorCode, &'static str)>,
+        into_response: F,
+    ) {
+        match result {
+            Ok(_) => self.send_network_message(NetworkMessage::SendResponse {
                 peer_id,
-                response: Response::BlobsByRange(None),
+                response:
into_response(None), id: request_id, - }); + }), + Err((error_code, reason)) => { + self.send_error_response(peer_id, error_code, reason.into(), request_id); + } } } } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 2f7a92e91ed..1937fc11cf9 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -497,10 +497,7 @@ impl Router { crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id); return; } - id @ (SyncId::BackFillBlocks { .. } - | SyncId::RangeBlocks { .. } - | SyncId::BackFillBlockAndBlobs { .. } - | SyncId::RangeBlockAndBlobs { .. }) => id, + id @ SyncId::RangeBlockAndBlobs { .. } => id, }, RequestId::Router => { crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id); @@ -559,10 +556,7 @@ impl Router { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ SyncId::SingleBlock { .. } => id, - SyncId::BackFillBlocks { .. } - | SyncId::RangeBlocks { .. } - | SyncId::RangeBlockAndBlobs { .. } - | SyncId::BackFillBlockAndBlobs { .. } => { + SyncId::RangeBlockAndBlobs { .. } => { crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id); return; } @@ -604,10 +598,7 @@ impl Router { crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id); return; } - SyncId::BackFillBlocks { .. } - | SyncId::RangeBlocks { .. } - | SyncId::RangeBlockAndBlobs { .. } - | SyncId::BackFillBlockAndBlobs { .. } => { + SyncId::RangeBlockAndBlobs { .. } => { crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id); return; } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 915aeb82ec4..67fe871accf 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -10,6 +10,7 @@ use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::manager::{BatchProcessResult, Id}; +use crate::sync::network_context::RangeRequestId; use crate::sync::network_context::SyncNetworkContext; use crate::sync::range_sync::{ BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, @@ -961,7 +962,12 @@ impl BackFillSync { ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, is_blob_batch) = batch.to_blocks_by_range_request(); - match network.backfill_blocks_by_range_request(peer, is_blob_batch, request, batch_id) { + match network.blocks_and_blobs_by_range_request( + peer, + is_blob_batch, + request, + RangeRequestId::BackfillSync { batch_id }, + ) { Ok(request_id) => { // inform the batch about the new request if let Err(e) = batch.start_downloading_from_peer(peer, request_id) { diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index a46e7d09376..8f7881eea8a 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -12,7 +12,6 @@ use beacon_chain::data_availability_checker::ChildComponents; use beacon_chain::{get_block_root, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::rpc::BlocksByRootRequest; -use rand::prelude::IteratorRandom; use std::ops::IndexMut; use std::sync::Arc; use std::time::Duration; @@ -104,7 +103,7 @@ pub trait RequestState { cx: &SyncNetworkContext, ) -> 
Result<(), LookupRequestError> { // Check if request is necessary. - if !matches!(self.get_state().state, State::AwaitingDownload) { + if !self.get_state().is_awaiting_download() { return Ok(()); } @@ -112,13 +111,12 @@ pub trait RequestState { let (peer_id, request) = self.build_request(&cx.chain.spec)?; // Update request state. - self.get_state_mut().state = State::Downloading { peer_id }; - self.get_state_mut().req_counter += 1; + let req_counter = self.get_state_mut().on_download_start(peer_id); // Make request let id = SingleLookupReqId { id, - req_counter: self.get_state().req_counter, + req_counter, lookup_type: L::lookup_type(), }; Self::make_request(id, peer_id, request, cx) @@ -130,8 +128,7 @@ pub trait RequestState { let request_state = self.get_state(); if request_state.failed_attempts() >= max_attempts { - let cannot_process = - request_state.failed_processing >= request_state.failed_downloading; + let cannot_process = request_state.more_failed_processing_attempts(); Err(LookupRequestError::TooManyAttempts { cannot_process }) } else { Ok(()) @@ -141,15 +138,9 @@ pub trait RequestState { /// Get the next peer to request. Draws from the set of peers we think should have both the /// block and blob first. If that fails, we draw from the set of peers that may have either. fn get_peer(&mut self) -> Result { - let request_state = self.get_state_mut(); - let peer_id = request_state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .ok_or(LookupRequestError::NoPeers)?; - request_state.used_peers.insert(peer_id); - Ok(peer_id) + self.get_state_mut() + .use_rand_available_peer() + .ok_or(LookupRequestError::NoPeers) } /// Initialize `Self::RequestType`. @@ -169,29 +160,35 @@ pub trait RequestState { fn verify_response( &mut self, expected_block_root: Hash256, + peer_id: PeerId, response: Option, ) -> Result, LookupVerifyError> { - let request_state = self.get_state_mut(); - match request_state.state { - State::AwaitingDownload => { - request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - State::Downloading { peer_id } => { - self.verify_response_inner(expected_block_root, response, peer_id) + let result = match *self.get_state().get_state() { + State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned), + State::Downloading { peer_id: _ } => { + // TODO: We requested a download from Downloading { peer_id }, but the network + // injects a response from a different peer_id. What should we do? The peer_id to + // track for scoring is the one that actually sent the response, not the state's + self.verify_response_inner(expected_block_root, response) } - State::Processing { peer_id: _ } => match response { - Some(_) => { - // We sent the block for processing and received an extra block. - request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - None => { - // This is simply the stream termination and we are already processing the - // block - Ok(None) - } + State::Processing { .. } | State::Processed { .. } => match response { + // We sent the block for processing and received an extra block. 
+ Some(_) => Err(LookupVerifyError::ExtraBlocksReturned), + // This is simply the stream termination and we are already processing the block + None => Ok(None), }, + }; + + match result { + Ok(Some(response)) => { + self.get_state_mut().on_download_success(peer_id); + Ok(Some(response)) + } + Ok(None) => Ok(None), + Err(e) => { + self.get_state_mut().on_download_failure(); + Err(e) + } } } @@ -200,7 +197,6 @@ pub trait RequestState { &mut self, expected_block_root: Hash256, response: Option, - peer_id: PeerId, ) -> Result, LookupVerifyError>; /// A getter for the parent root of the response. Returns an `Option` because we won't know @@ -232,7 +228,7 @@ pub trait RequestState { /// Register a failure to process the block or blob. fn register_failure_downloading(&mut self) { - self.get_state_mut().register_failure_downloading() + self.get_state_mut().on_download_failure() } /* Utility methods */ @@ -274,7 +270,6 @@ impl RequestState for BlockRequestState &mut self, expected_block_root: Hash256, response: Option, - peer_id: PeerId, ) -> Result>>, LookupVerifyError> { match response { Some(block) => { @@ -285,18 +280,13 @@ impl RequestState for BlockRequestState // return an error and drop the block // NOTE: we take this is as a download failure to prevent counting the // attempt as a chain failure, but simply a peer failure. - self.state.register_failure_downloading(); Err(LookupVerifyError::RootMismatch) } else { // Return the block for processing. - self.state.state = State::Processing { peer_id }; Ok(Some(block)) } } - None => { - self.state.register_failure_downloading(); - Err(LookupVerifyError::NoBlockReturned) - } + None => Err(LookupVerifyError::NoBlockReturned), } } @@ -374,25 +364,20 @@ impl RequestState for BlobRequestState, - peer_id: PeerId, ) -> Result>, LookupVerifyError> { match blob { Some(blob) => { let received_id = blob.id(); if !self.requested_ids.contains(&received_id) { - Err(LookupVerifyError::UnrequestedBlobId(received_id)) - } else if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - Err(LookupVerifyError::InvalidInclusionProof) - } else if blob.block_root() != expected_block_root { - Err(LookupVerifyError::UnrequestedHeader) - } else { - Ok(()) + return Err(LookupVerifyError::UnrequestedBlobId(received_id)); + } + if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if blob.block_root() != expected_block_root { + return Err(LookupVerifyError::UnrequestedHeader); } - .map_err(|e| { - self.state.register_failure_downloading(); - e - })?; // State should remain downloading until we receive the stream terminator. self.requested_ids.remove(&received_id); @@ -403,7 +388,6 @@ impl RequestState for BlobRequestState { - self.state.state = State::Processing { peer_id }; let blobs = std::mem::take(&mut self.blob_download_queue); Ok(Some(blobs)) } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 949e1762ac7..a5826bcb3d8 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -45,6 +45,13 @@ pub type DownloadedBlock = (Hash256, RpcBlock); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; +enum Action { + Retry, + ParentUnknown { parent_root: Hash256, slot: Slot }, + Drop, + Continue, +} + pub struct BlockLookups { /// Parent chain lookups being downloaded. 
parent_lookups: SmallVec<[ParentLookup; 3]>, @@ -285,12 +292,16 @@ impl BlockLookups { let mut lookup = self.single_block_lookups.remove(&id.id)?; let request_state = R::request_state_mut(&mut lookup); - if id.req_counter != request_state.get_state().req_counter { + if request_state + .get_state() + .is_current_req_counter(id.req_counter) + { + Some(lookup) + } else { // We don't want to drop the lookup, just ignore the old response. self.single_block_lookups.insert(id.id, lookup); - return None; + None } - Some(lookup) } /// Checks whether a single block lookup is waiting for a parent lookup to complete. This is @@ -374,7 +385,7 @@ impl BlockLookups { let expected_block_root = lookup.block_root(); let request_state = R::request_state_mut(&mut lookup); - match request_state.verify_response(expected_block_root, response) { + match request_state.verify_response(expected_block_root, peer_id, response) { Ok(Some(verified_response)) => { self.handle_verified_response::( seen_timestamp, @@ -415,10 +426,6 @@ impl BlockLookups { let id = lookup.id; let block_root = lookup.block_root(); - R::request_state_mut(lookup) - .get_state_mut() - .component_downloaded = true; - let cached_child = lookup.add_response::(verified_response.clone()); match cached_child { CachedChild::Ok(block) => { @@ -447,10 +454,7 @@ impl BlockLookups { // initial request. if lookup.both_components_downloaded() { lookup.penalize_blob_peer(cx); - lookup - .blob_request_state - .state - .register_failure_downloading(); + lookup.blob_request_state.state.on_download_failure(); } lookup.request_block_and_blobs(cx)?; } @@ -493,13 +497,13 @@ impl BlockLookups { if R::request_state_mut(&mut parent_lookup.current_parent_request) .get_state() - .req_counter - != id.req_counter + .is_current_req_counter(id.req_counter) { + Some(parent_lookup) + } else { self.parent_lookups.push(parent_lookup); - return None; + None } - Some(parent_lookup) } /// Process a response received from a parent lookup request. @@ -559,7 +563,7 @@ impl BlockLookups { cx: &SyncNetworkContext, parent_lookup: &mut ParentLookup, ) -> Result<(), RequestError> { - match parent_lookup.verify_response::(response, &mut self.failed_chains) { + match parent_lookup.verify_response::(peer_id, response, &mut self.failed_chains) { Ok(Some(verified_response)) => { self.handle_verified_response::( seen_timestamp, @@ -640,7 +644,7 @@ impl BlockLookups { RequestError::ChainTooLong => { self.failed_chains.insert(parent_lookup.chain_hash()); // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.all_used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -653,7 +657,7 @@ impl BlockLookups { self.failed_chains.insert(parent_lookup.chain_hash()); } // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.all_used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -661,6 +665,9 @@ impl BlockLookups { // This happens if the peer disconnects while the block is being // processed. 
Drop the request without extra penalty
                }
+                RequestError::BadState(_) => {
+                    // Should never happen
+                }
            }
        }

@@ -668,9 +675,12 @@ impl BlockLookups {
     pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) {
         /* Check disconnection for single lookups */
-        self.single_block_lookups.retain(|_, req| {
+        self.single_block_lookups.retain(|id, req| {
             let should_drop_lookup =
                 req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log);
+            if should_drop_lookup {
+                debug!(self.log, "Dropping lookup after peer disconnected"; "id" => id, "block_root" => %req.block_root());
+            }
             !should_drop_lookup
         });

@@ -770,7 +780,7 @@ impl BlockLookups {
             return;
         };

-        let root = lookup.block_root();
+        let block_root = lookup.block_root();
         let request_state = R::request_state_mut(&mut lookup);

         let peer_id = match request_state.get_state().processing_peer() {
@@ -784,28 +794,49 @@ impl BlockLookups {
             self.log,
             "Block component processed for lookup";
             "response_type" => ?R::response_type(),
-            "block_root" => ?root,
+            "block_root" => ?block_root,
             "result" => ?result,
             "id" => target_id,
         );

-        match result {
-            BlockProcessingResult::Ok(status) => match status {
-                AvailabilityProcessingStatus::Imported(root) => {
-                    trace!(self.log, "Single block processing succeeded"; "block" => %root);
-                }
-                AvailabilityProcessingStatus::MissingComponents(_, _block_root) => {
-                    match self.handle_missing_components::(cx, &mut lookup) {
-                        Ok(()) => {
-                            self.single_block_lookups.insert(target_id, lookup);
-                        }
-                        Err(e) => {
-                            // Drop with an additional error.
-                            warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
-                        }
-                    }
+        let action = match result {
+            BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
+            | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
+                // Successfully imported
+                trace!(self.log, "Single block processing succeeded"; "block" => %block_root);
+                Action::Drop
+            }
+
+            BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
+                _,
+                _block_root,
+            )) => {
+                // `on_processing_success` is called here to ensure the request state is updated prior to checking
+                // if both components have been processed.
+                if R::request_state_mut(&mut lookup)
+                    .get_state_mut()
+                    .on_processing_success()
+                    .is_err()
+                {
+                    warn!(
+                        self.log,
+                        "Single block processing state incorrect";
+                        "action" => "dropping single block request"
+                    );
+                    Action::Drop
+                // If this was the result of a block request, we can't determine if the block peer did anything
+                // wrong. If we already had both a block and blobs response processed, we should penalize the
+                // blobs peer because they did not provide all blobs on the initial request.
+                } else if lookup.both_components_processed() {
+                    lookup.penalize_blob_peer(cx);
+
+                    // Try it again if possible.
+                    lookup.blob_request_state.state.on_processing_failure();
+                    Action::Retry
+                } else {
+                    Action::Continue
                 }
-            },
+            }
             BlockProcessingResult::Ignored => {
                 // Beacon processor signalled to ignore the block processing result.
                 // This implies that the cpu is overloaded. Drop the request.
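// ---------------------------------------------------------------------------
// Reviewer sketch (illustrative only, not part of this patch): the hunk above
// first computes an `Action` from the processing result and only then applies
// it in a single place, replacing the old early-return handlers. A
// self-contained model of that dispatch, with `Hash256`/`Slot` reduced to
// `u64` and the lookup machinery reduced to log lines (all names here are
// hypothetical):
#[allow(dead_code)]
enum ActionSketch {
    Retry,
    ParentUnknown { parent_root: u64, slot: u64 },
    Drop,
    Continue,
}

#[allow(dead_code)]
fn apply_action_sketch(action: ActionSketch) {
    match action {
        // Re-issue the block/blob requests; only drop once retries are exhausted.
        ActionSketch::Retry => println!("retry block and blob requests"),
        // Kick off a parent lookup and keep the current lookup alive.
        ActionSketch::ParentUnknown { parent_root, slot } => {
            println!("search parent {parent_root} at slot {slot}")
        }
        // Remove the lookup entirely and update the sync metrics.
        ActionSketch::Drop => println!("drop lookup and update metrics"),
        // Re-insert the lookup unchanged and wait for further events.
        ActionSketch::Continue => println!("re-insert lookup"),
    }
}
// ---------------------------------------------------------------------------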
@@ -814,127 +845,88 @@ impl BlockLookups { "Single block processing was ignored, cpu might be overloaded"; "action" => "dropping single block request" ); + Action::Drop } BlockProcessingResult::Err(e) => { - match self.handle_single_lookup_block_error(cx, lookup, peer_id, e) { - Ok(Some(lookup)) => { - self.single_block_lookups.insert(target_id, lookup); + let root = lookup.block_root(); + trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); + match e { + BlockError::BeaconChainError(e) => { + // Internal error + error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); + Action::Drop } - Ok(None) => { - // Drop without an additional error. + BlockError::ParentUnknown(block) => { + let slot = block.slot(); + let parent_root = block.parent_root(); + lookup.add_child_components(block.into()); + Action::ParentUnknown { parent_root, slot } } - Err(e) => { - // Drop with an additional error. - warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e); + ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { + // These errors indicate that the execution layer is offline + // and failed to validate the execution payload. Do not downscore peer. + debug!( + self.log, + "Single block lookup failed. Execution layer is offline / unsynced / misconfigured"; + "root" => %root, + "error" => ?e + ); + Action::Drop + } + BlockError::AvailabilityCheck(e) => match e.category() { + AvailabilityCheckErrorCategory::Internal => { + warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); + lookup.block_request_state.state.on_download_failure(); + lookup.blob_request_state.state.on_download_failure(); + Action::Retry + } + AvailabilityCheckErrorCategory::Malicious => { + warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); + lookup.handle_availability_check_failure(cx); + Action::Retry + } + }, + other => { + warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); + if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() { + cx.report_peer( + block_peer, + PeerAction::MidToleranceError, + "single_block_failure", + ); + + lookup.block_request_state.state.on_processing_failure(); + } + Action::Retry } } } }; - } - - /// Handles a `MissingComponents` block processing error. Handles peer scoring and retries. - /// - /// If this was the result of a block request, we can't determined if the block peer did anything - /// wrong. If we already had both a block and blobs response processed, we should penalize the - /// blobs peer because they did not provide all blobs on the initial request. - fn handle_missing_components>( - &self, - cx: &SyncNetworkContext, - lookup: &mut SingleBlockLookup, - ) -> Result<(), LookupRequestError> { - let request_state = R::request_state_mut(lookup); - - request_state.get_state_mut().component_processed = true; - if lookup.both_components_processed() { - lookup.penalize_blob_peer(cx); - - // Try it again if possible. - lookup - .blob_request_state - .state - .register_failure_processing(); - lookup.request_block_and_blobs(cx)?; - } - Ok(()) - } - /// Handles peer scoring and retries related to a `BlockError` in response to a single block - /// or blob lookup processing result. 
- fn handle_single_lookup_block_error( - &mut self, - cx: &mut SyncNetworkContext, - mut lookup: SingleBlockLookup, - peer_id: PeerId, - e: BlockError, - ) -> Result>, LookupRequestError> { - let root = lookup.block_root(); - trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); - match e { - BlockError::BlockIsAlreadyKnown(_) => { - // No error here - return Ok(None); - } - BlockError::BeaconChainError(e) => { - // Internal error - error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); - return Ok(None); + match action { + Action::Retry => { + if let Err(e) = lookup.request_block_and_blobs(cx) { + warn!(self.log, "Single block lookup failed"; "block_root" => %block_root, "error" => ?e); + // Failed with too many retries, drop with noop + self.update_metrics(); + } else { + self.single_block_lookups.insert(target_id, lookup); + } } - BlockError::ParentUnknown(block) => { - let slot = block.slot(); - let parent_root = block.parent_root(); - lookup.add_child_components(block.into()); - lookup.request_block_and_blobs(cx)?; - self.search_parent(slot, root, parent_root, peer_id, cx); + Action::ParentUnknown { parent_root, slot } => { + // TODO: Consider including all peers from the lookup, claiming to know this block, not + // just the one that sent this specific block + self.search_parent(slot, block_root, parent_root, peer_id, cx); + self.single_block_lookups.insert(target_id, lookup); } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - self.log, - "Single block lookup failed. Execution layer is offline / unsynced / misconfigured"; - "root" => %root, - "error" => ?e - ); - return Ok(None); + Action::Drop => { + // drop with noop + self.update_metrics(); } - BlockError::AvailabilityCheck(e) => match e.category() { - AvailabilityCheckErrorCategory::Internal => { - warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); - lookup - .block_request_state - .state - .register_failure_downloading(); - lookup - .blob_request_state - .state - .register_failure_downloading(); - lookup.request_block_and_blobs(cx)? - } - AvailabilityCheckErrorCategory::Malicious => { - warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); - lookup.handle_availability_check_failure(cx); - lookup.request_block_and_blobs(cx)? - } - }, - other => { - warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); - if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() { - cx.report_peer( - block_peer, - PeerAction::MidToleranceError, - "single_block_failure", - ); - - // Try it again if possible. - lookup - .block_request_state - .state - .register_failure_processing(); - lookup.request_block_and_blobs(cx)? 
- } + Action::Continue => { + self.single_block_lookups.insert(target_id, lookup); } } - Ok(Some(lookup)) } pub fn parent_block_processed( @@ -999,7 +991,7 @@ impl BlockLookups { .current_parent_request .blob_request_state .state - .register_failure_processing(); + .on_processing_failure(); match parent_lookup .current_parent_request .request_block_and_blobs(cx) @@ -1255,8 +1247,8 @@ impl BlockLookups { penalty, } => { self.failed_chains.insert(chain_hash); - for peer_source in request.all_peers() { - cx.report_peer(peer_source, penalty, "parent_chain_failure") + for peer_source in request.all_used_peers() { + cx.report_peer(*peer_source, penalty, "parent_chain_failure") } } BatchProcessResult::NonFaultyFailure => { @@ -1375,4 +1367,16 @@ impl BlockLookups { pub fn drop_parent_chain_requests(&mut self) -> usize { self.parent_lookups.drain(..).len() } + + pub fn update_metrics(&self) { + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); + + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_lookups.len() as i64, + ); + } } diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index a3cdfd7b00b..55dd26b661e 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -7,7 +7,6 @@ use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker}; use beacon_chain::BeaconChainTypes; -use itertools::Itertools; use std::collections::VecDeque; use std::sync::Arc; use store::Hash256; @@ -56,6 +55,7 @@ pub enum RequestError { cannot_process: bool, }, NoPeers, + BadState(String), } impl ParentLookup { @@ -175,11 +175,11 @@ impl ParentLookup { self.current_parent_request .block_request_state .state - .register_failure_processing(); + .on_processing_failure(); self.current_parent_request .blob_request_state .state - .register_failure_processing(); + .on_processing_failure(); if let Some(components) = self.current_parent_request.child_components.as_mut() { components.downloaded_block = None; components.downloaded_blobs = <_>::default(); @@ -190,12 +190,14 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_response>( &mut self, + peer_id: PeerId, block: Option, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result, ParentVerifyError> { let expected_block_root = self.current_parent_request.block_root(); let request_state = R::request_state_mut(&mut self.current_parent_request); - let root_and_verified = request_state.verify_response(expected_block_root, block)?; + let root_and_verified = + request_state.verify_response(expected_block_root, peer_id, block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. 
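// ---------------------------------------------------------------------------
// Reviewer sketch (illustrative only, not part of this patch): after this
// change every `verify_response` caller passes the peer that actually
// answered, and the shared wrapper in `common.rs` owns the state transition.
// A self-contained model of that wrapper, with the state and error types
// reduced to plain enums (all names here are hypothetical):
#[allow(dead_code)]
#[derive(Debug)]
enum LookupStateSketch {
    AwaitingDownload,
    Downloading { peer: u64 },
    Processing { peer: u64 },
}

#[allow(dead_code)]
fn on_verify_result<T>(
    state: &mut LookupStateSketch,
    peer: u64,
    result: Result<Option<T>, &'static str>,
) -> Result<Option<T>, &'static str> {
    match result {
        // A verified response moves Downloading -> Processing for the peer
        // that actually sent it (mirrors `on_download_success`).
        Ok(Some(response)) => {
            *state = LookupStateSketch::Processing { peer };
            Ok(Some(response))
        }
        // A bare stream terminator leaves the state untouched.
        Ok(None) => Ok(None),
        // Any verification error counts as a download failure and returns the
        // request to AwaitingDownload for a retry (mirrors `on_download_failure`).
        Err(e) => {
            *state = LookupStateSketch::AwaitingDownload;
            Err(e)
        }
    }
}
// ---------------------------------------------------------------------------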
@@ -221,20 +223,8 @@ impl ParentLookup { self.current_parent_request.add_peers(peers) } - pub fn used_peers(&self) -> impl Iterator + '_ { - self.current_parent_request - .block_request_state - .state - .used_peers - .iter() - .chain( - self.current_parent_request - .blob_request_state - .state - .used_peers - .iter(), - ) - .unique() + pub fn all_used_peers(&self) -> impl Iterator + '_ { + self.current_parent_request.all_used_peers() } } @@ -264,6 +254,7 @@ impl From for RequestError { } E::NoPeers => RequestError::NoPeers, E::SendFailed(msg) => RequestError::SendFailed(msg), + E::BadState(msg) => RequestError::BadState(msg), } } } @@ -291,6 +282,7 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", + RequestError::BadState(_) => "bad_state", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a312f6e970a..15d10c77c24 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -8,7 +8,9 @@ use beacon_chain::data_availability_checker::{ AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs, }; use beacon_chain::BeaconChainTypes; +use itertools::Itertools; use lighthouse_network::PeerAction; +use rand::seq::IteratorRandom; use slog::{trace, Logger}; use std::collections::HashSet; use std::fmt::Debug; @@ -19,13 +21,6 @@ use strum::IntoStaticStr; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::EthSpec; -#[derive(Debug, PartialEq, Eq)] -pub enum State { - AwaitingDownload, - Downloading { peer_id: PeerId }, - Processing { peer_id: PeerId }, -} - #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { RootMismatch, @@ -48,6 +43,7 @@ pub enum LookupRequestError { }, NoPeers, SendFailed(&'static str), + BadState(String), } pub struct SingleBlockLookup { @@ -94,18 +90,16 @@ impl SingleBlockLookup { self.block_request_state.requested_block_root = block_root; self.block_request_state.state.state = State::AwaitingDownload; self.blob_request_state.state.state = State::AwaitingDownload; - self.block_request_state.state.component_downloaded = false; - self.blob_request_state.state.component_downloaded = false; - self.block_request_state.state.component_processed = false; - self.blob_request_state.state.component_processed = false; self.child_components = Some(ChildComponents::empty(block_root)); } - /// Get all unique peers across block and blob requests. - pub fn all_peers(&self) -> HashSet { - let mut all_peers = self.block_request_state.state.used_peers.clone(); - all_peers.extend(self.blob_request_state.state.used_peers.clone()); - all_peers + /// Get all unique used peers across block and blob requests. + pub fn all_used_peers(&self) -> impl Iterator + '_ { + self.block_request_state + .state + .get_used_peers() + .chain(self.blob_request_state.state.get_used_peers()) + .unique() } /// Send the necessary requests for blocks and/or blobs. This will check whether we have @@ -206,14 +200,14 @@ impl SingleBlockLookup { /// Returns true if the block has already been downloaded. 
pub fn both_components_downloaded(&self) -> bool { - self.block_request_state.state.component_downloaded - && self.blob_request_state.state.component_downloaded + self.block_request_state.state.is_downloaded() + && self.blob_request_state.state.is_downloaded() } /// Returns true if the block has already been downloaded. pub fn both_components_processed(&self) -> bool { - self.block_request_state.state.component_processed - && self.blob_request_state.state.component_processed + self.block_request_state.state.is_processed() + && self.blob_request_state.state.is_processed() } /// Checks both the block and blob request states to see if the peer is disconnected. @@ -304,7 +298,7 @@ impl SingleBlockLookup { if let Some(cached_child) = self.child_components.as_mut() { cached_child.clear_blobs(); } - self.blob_request_state.state.register_failure_downloading() + self.blob_request_state.state.on_download_failure() } /// This failure occurs after processing, so register a failure processing, penalize the peer @@ -314,7 +308,7 @@ impl SingleBlockLookup { if let Some(cached_child) = self.child_components.as_mut() { cached_child.clear_blobs(); } - self.blob_request_state.state.register_failure_processing() + self.blob_request_state.state.on_processing_failure() } } @@ -375,29 +369,34 @@ pub enum CachedChild { /// There was an error during consistency checks between block and blobs. Err(AvailabilityCheckError), } + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + AwaitingDownload, + Downloading { peer_id: PeerId }, + Processing { peer_id: PeerId }, + Processed { peer_id: PeerId }, +} + /// Object representing the state of a single block or blob lookup request. #[derive(PartialEq, Eq, Debug)] pub struct SingleLookupRequestState { /// State of this request. - pub state: State, + state: State, /// Peers that should have this block or blob. - pub available_peers: HashSet, + available_peers: HashSet, /// Peers from which we have requested this block. - pub used_peers: HashSet, + used_peers: HashSet, /// How many times have we attempted to process this block or blob. - pub failed_processing: u8, + failed_processing: u8, /// How many times have we attempted to download this block or blob. - pub failed_downloading: u8, - /// Whether or not we have downloaded this block or blob. - pub component_downloaded: bool, - /// Whether or not we have processed this block or blob. - pub component_processed: bool, + failed_downloading: u8, /// Should be incremented everytime this request is retried. The purpose of this is to /// differentiate retries of the same block/blob request within a lookup. We currently penalize /// peers and retry requests prior to receiving the stream terminator. This means responses /// from a prior request may arrive after a new request has been sent, this counter allows /// us to differentiate these two responses. - pub req_counter: u32, + req_counter: u32, } impl SingleLookupRequestState { @@ -413,30 +412,83 @@ impl SingleLookupRequestState { used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, - component_downloaded: false, - component_processed: false, req_counter: 0, } } - /// Registers a failure in processing a block. 
- pub fn register_failure_processing(&mut self) { - self.failed_processing = self.failed_processing.saturating_add(1); - self.state = State::AwaitingDownload; + // TODO: Should not leak the enum state + pub fn get_state(&self) -> &State { + &self.state + } + + pub fn is_current_req_counter(&self, req_counter: u32) -> bool { + self.req_counter == req_counter + } + + pub fn is_awaiting_download(&self) -> bool { + matches!(self.state, State::AwaitingDownload) + } + + pub fn is_downloaded(&self) -> bool { + match self.state { + State::AwaitingDownload => false, + State::Downloading { .. } => false, + State::Processing { .. } => true, + State::Processed { .. } => true, + } + } + + pub fn is_processed(&self) -> bool { + match self.state { + State::AwaitingDownload => false, + State::Downloading { .. } => false, + State::Processing { .. } => false, + State::Processed { .. } => true, + } + } + + pub fn on_download_start(&mut self, peer_id: PeerId) -> u32 { + self.state = State::Downloading { peer_id }; + self.req_counter += 1; + self.req_counter } /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong /// block. - pub fn register_failure_downloading(&mut self) { + pub fn on_download_failure(&mut self) { self.failed_downloading = self.failed_downloading.saturating_add(1); self.state = State::AwaitingDownload; } + pub fn on_download_success(&mut self, peer_id: PeerId) { + self.state = State::Processing { peer_id }; + } + + /// Registers a failure in processing a block. + pub fn on_processing_failure(&mut self) { + self.failed_processing = self.failed_processing.saturating_add(1); + self.state = State::AwaitingDownload; + } + + pub fn on_processing_success(&mut self) -> Result<(), String> { + match &self.state { + State::Processing { peer_id } => { + self.state = State::Processed { peer_id: *peer_id }; + Ok(()) + } + other => Err(format!("not in processing state: {}", other).to_string()), + } + } + /// The total number of failures, whether it be processing or downloading. pub fn failed_attempts(&self) -> u8 { self.failed_processing + self.failed_downloading } + pub fn more_failed_processing_attempts(&self) -> bool { + self.failed_processing >= self.failed_downloading + } + /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`. pub fn add_peer(&mut self, peer_id: &PeerId) { self.available_peers.insert(*peer_id); @@ -448,7 +500,7 @@ impl SingleLookupRequestState { if let State::Downloading { peer_id } = &self.state { if peer_id == dc_peer_id { // Peer disconnected before providing a block - self.register_failure_downloading(); + self.on_download_failure(); return Err(()); } } @@ -459,10 +511,25 @@ impl SingleLookupRequestState { /// returns an error. pub fn processing_peer(&self) -> Result { match &self.state { - State::Processing { peer_id } => Ok(*peer_id), + State::Processing { peer_id } | State::Processed { peer_id } => Ok(*peer_id), other => Err(format!("not in processing state: {}", other).to_string()), } } + + pub fn get_used_peers(&self) -> impl Iterator { + self.used_peers.iter() + } + + /// Selects a random peer from available peers if any, inserts it in used peers and returns it. 
+ pub fn use_rand_available_peer(&mut self) -> Option { + let peer_id = self + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied()?; + self.used_peers.insert(peer_id); + Some(peer_id) + } } impl slog::Value for SingleBlockLookup { @@ -509,6 +576,7 @@ impl slog::Value for SingleLookupRequestState { State::Processing { peer_id } => { serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))? } + State::Processed { .. } => "processed".serialize(record, "state", serializer)?, } serializer.emit_u8("failed_downloads", self.failed_downloading)?; serializer.emit_u8("failed_processing", self.failed_processing)?; @@ -522,6 +590,7 @@ impl std::fmt::Display for State { State::AwaitingDownload => write!(f, "AwaitingDownload"), State::Downloading { .. } => write!(f, "Downloading"), State::Processing { .. } => write!(f, "Processing"), + State::Processed { .. } => write!(f, "Processed"), } } } @@ -608,6 +677,7 @@ mod tests { as RequestState>::verify_response( &mut sl.block_request_state, block.canonical_root(), + peer_id, Some(block.into()), ) .unwrap() @@ -647,7 +717,7 @@ mod tests { &spec, ) .unwrap(); - sl.block_request_state.state.register_failure_downloading(); + sl.block_request_state.state.on_download_failure(); } // Now we receive the block and send it for processing @@ -661,13 +731,14 @@ mod tests { as RequestState>::verify_response( &mut sl.block_request_state, block.canonical_root(), + peer_id, Some(block.into()), ) .unwrap() .unwrap(); // One processing failure maxes the available attempts - sl.block_request_state.state.register_failure_processing(); + sl.block_request_state.state.on_processing_failure(); assert_eq!( as RequestState>::build_request( &mut sl.block_request_state, diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 9c6e2fcf07c..6a3b568c1c4 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -3,7 +3,9 @@ use ssz_types::VariableList; use std::{collections::VecDeque, sync::Arc}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; -#[derive(Debug, Default)] +use super::range_sync::ByRangeRequestType; + +#[derive(Debug)] pub struct BlocksAndBlobsRequestInfo { /// Blocks we have received awaiting for their corresponding sidecar. accumulated_blocks: VecDeque>>, @@ -13,9 +15,25 @@ pub struct BlocksAndBlobsRequestInfo { is_blocks_stream_terminated: bool, /// Whether the individual RPC request for sidecars is finished or not. 
is_sidecars_stream_terminated: bool,
+    /// Used to determine if this accumulator should wait for a sidecars stream termination.
+    request_type: ByRangeRequestType,
 }

 impl BlocksAndBlobsRequestInfo {
+    pub fn new(request_type: ByRangeRequestType) -> Self {
+        Self {
+            accumulated_blocks: <_>::default(),
+            accumulated_sidecars: <_>::default(),
+            is_blocks_stream_terminated: <_>::default(),
+            is_sidecars_stream_terminated: <_>::default(),
+            request_type,
+        }
+    }
+
+    pub fn get_request_type(&self) -> ByRangeRequestType {
+        self.request_type
+    }
+
     pub fn add_block_response(&mut self, block_opt: Option>>) {
         match block_opt {
             Some(block) => self.accumulated_blocks.push_back(block),
@@ -78,6 +96,38 @@ impl BlocksAndBlobsRequestInfo {
     }

     pub fn is_finished(&self) -> bool {
-        self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated
+        let blobs_requested = match self.request_type {
+            ByRangeRequestType::Blocks => false,
+            ByRangeRequestType::BlocksAndBlobs => true,
+        };
+        self.is_blocks_stream_terminated && (!blobs_requested || self.is_sidecars_stream_terminated)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::BlocksAndBlobsRequestInfo;
+    use crate::sync::range_sync::ByRangeRequestType;
+    use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
+    use rand::SeedableRng;
+    use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
+
+    #[test]
+    fn no_blobs_into_responses() {
+        let mut info = BlocksAndBlobsRequestInfo::::new(ByRangeRequestType::Blocks);
+        let mut rng = XorShiftRng::from_seed([42; 16]);
+        let blocks = (0..4)
+            .map(|_| generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng).0)
+            .collect::>();
+
+        // Send all the blocks, then the stream terminator to complete the response.
+        for block in blocks {
+            info.add_block_response(Some(block.into()));
+        }
+        info.add_block_response(None);
+
+        // Assert response is finished and RpcBlocks can be constructed
+        assert!(info.is_finished());
+        info.into_responses().unwrap();
     }
 }
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index ca4d0a2183d..a868a092d3d 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -36,7 +36,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};

 use super::block_lookups::common::LookupType;
 use super::block_lookups::BlockLookups;
-use super::network_context::{BlockOrBlob, SyncNetworkContext};
+use super::network_context::{BlockOrBlob, RangeRequestId, SyncNetworkContext};
 use super::peer_sync_info::{remote_sync_type, PeerSyncType};
 use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
 use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
@@ -44,8 +44,7 @@ use crate::service::NetworkMessage;
 use crate::status::ToStatusMessage;
 use crate::sync::block_lookups::common::{Current, Parent};
 use crate::sync::block_lookups::{BlobRequestState, BlockRequestState};
-use crate::sync::network_context::BlocksAndBlobsByRangeRequest;
-use crate::sync::range_sync::ByRangeRequestType;
+use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
 use beacon_chain::block_verification_types::AsBlock;
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::data_availability_checker::ChildComponents;
@@ -91,12 +90,6 @@ pub enum RequestId {
     SingleBlock { id: SingleLookupReqId },
     /// Request searching for a set of blobs given a hash.
     SingleBlob { id: SingleLookupReqId },
-    /// Request was from the backfill sync algorithm.
- BackFillBlocks { id: Id }, - /// Backfill request that is composed by both a block range request and a blob range request. - BackFillBlockAndBlobs { id: Id }, - /// The request was from a chain in the range sync algorithm. - RangeBlocks { id: Id }, /// Range request that is composed by both a block range request and a blob range request. RangeBlockAndBlobs { id: Id }, } @@ -363,63 +356,27 @@ impl SyncManager { error, ), }, - RequestId::BackFillBlocks { id } => { - if let Some(batch_id) = self - .network - .backfill_request_failed(id, ByRangeRequestType::Blocks) - { - match self - .backfill_sync - .inject_error(&mut self.network, batch_id, &peer_id, id) - { - Ok(_) => {} - Err(_) => self.update_sync_state(), - } - } - } - - RequestId::BackFillBlockAndBlobs { id } => { - if let Some(batch_id) = self - .network - .backfill_request_failed(id, ByRangeRequestType::BlocksAndBlobs) - { - match self - .backfill_sync - .inject_error(&mut self.network, batch_id, &peer_id, id) - { - Ok(_) => {} - Err(_) => self.update_sync_state(), - } - } - } - RequestId::RangeBlocks { id } => { - if let Some((chain_id, batch_id)) = self - .network - .range_sync_request_failed(id, ByRangeRequestType::Blocks) - { - self.range_sync.inject_error( - &mut self.network, - peer_id, - batch_id, - chain_id, - id, - ); - self.update_sync_state() - } - } RequestId::RangeBlockAndBlobs { id } => { - if let Some((chain_id, batch_id)) = self - .network - .range_sync_request_failed(id, ByRangeRequestType::BlocksAndBlobs) - { - self.range_sync.inject_error( - &mut self.network, - peer_id, - batch_id, - chain_id, - id, - ); - self.update_sync_state() + if let Some(sender_id) = self.network.range_request_failed(id) { + match sender_id { + RangeRequestId::RangeSync { chain_id, batch_id } => { + self.range_sync.inject_error( + &mut self.network, + peer_id, + batch_id, + chain_id, + id, + ); + self.update_sync_state(); + } + RangeRequestId::BackfillSync { batch_id } => match self + .backfill_sync + .inject_error(&mut self.network, batch_id, &peer_id, id) + { + Ok(_) => {} + Err(_) => self.update_sync_state(), + }, + } } } } @@ -680,6 +637,7 @@ impl SyncManager { self.handle_unknown_block_root(peer_id, block_root); } SyncMessage::Disconnect(peer_id) => { + debug!(self.log, "Received disconnected message"; "peer_id" => %peer_id); self.peer_disconnect(&peer_id); } SyncMessage::RpcError { @@ -901,49 +859,6 @@ impl SyncManager { RequestId::SingleBlob { .. } => { crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id ); } - RequestId::BackFillBlocks { id } => { - let is_stream_terminator = block.is_none(); - if let Some(batch_id) = self - .network - .backfill_sync_only_blocks_response(id, is_stream_terminator) - { - match self.backfill_sync.on_block_response( - &mut self.network, - batch_id, - &peer_id, - id, - block.map(|b| RpcBlock::new_without_blobs(None, b)), - ) { - Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), - Ok(ProcessResult::Successful) => {} - Err(_error) => { - // The backfill sync has failed, errors are reported - // within. 
- self.update_sync_state(); - } - } - } - } - RequestId::RangeBlocks { id } => { - let is_stream_terminator = block.is_none(); - if let Some((chain_id, batch_id)) = self - .network - .range_sync_block_only_response(id, is_stream_terminator) - { - self.range_sync.blocks_by_range_response( - &mut self.network, - peer_id, - chain_id, - batch_id, - id, - block.map(|b| RpcBlock::new_without_blobs(None, b)), - ); - self.update_sync_state(); - } - } - RequestId::BackFillBlockAndBlobs { id } => { - self.backfill_block_and_blobs_response(id, peer_id, block.into()) - } RequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, block.into()) } @@ -981,15 +896,6 @@ impl SyncManager { &self.network, ), }, - RequestId::BackFillBlocks { id: _ } => { - crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id ); - } - RequestId::RangeBlocks { id: _ } => { - crit!(self.log, "Blob received during range block request"; "peer_id" => %peer_id ); - } - RequestId::BackFillBlockAndBlobs { id } => { - self.backfill_block_and_blobs_response(id, peer_id, blob.into()) - } RequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, blob.into()) } @@ -1004,9 +910,9 @@ impl SyncManager { peer_id: PeerId, block_or_blob: BlockOrBlob, ) { - if let Some((chain_id, resp)) = self + if let Some(resp) = self .network - .range_sync_block_and_blob_response(id, block_or_blob) + .range_block_and_blob_response(id, block_or_blob) { match resp.responses { Ok(blocks) => { @@ -1016,33 +922,52 @@ impl SyncManager { // chain the stream terminator .chain(vec![None]) { - self.range_sync.blocks_by_range_response( - &mut self.network, - peer_id, - chain_id, - resp.batch_id, - id, - block, - ); - self.update_sync_state(); + match resp.sender_id { + RangeRequestId::RangeSync { chain_id, batch_id } => { + self.range_sync.blocks_by_range_response( + &mut self.network, + peer_id, + chain_id, + batch_id, + id, + block, + ); + self.update_sync_state(); + } + RangeRequestId::BackfillSync { batch_id } => { + match self.backfill_sync.on_block_response( + &mut self.network, + batch_id, + &peer_id, + id, + block, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_error) => { + // The backfill sync has failed, errors are reported + // within. + self.update_sync_state(); + } + } + } + } } } Err(e) => { // Re-insert the request so we can retry - let new_req = BlocksAndBlobsByRangeRequest { - chain_id, - batch_id: resp.batch_id, - block_blob_info: <_>::default(), - }; - self.network - .insert_range_blocks_and_blobs_request(id, new_req); + self.network.insert_range_blocks_and_blobs_request( + id, + resp.sender_id, + BlocksAndBlobsRequestInfo::new(resp.request_type), + ); // inform range that the request needs to be treated as failed // With time we will want to downgrade this log warn!( self.log, "Blocks and blobs request for range received invalid data"; "peer_id" => %peer_id, - "batch_id" => resp.batch_id, + "sender_id" => ?resp.sender_id, "error" => e.clone() ); let id = RequestId::RangeBlockAndBlobs { id }; @@ -1056,69 +981,6 @@ impl SyncManager { } } } - - /// Handles receiving a response for a Backfill sync request that should have both blocks and - /// blobs. 
- fn backfill_block_and_blobs_response( - &mut self, - id: Id, - peer_id: PeerId, - block_or_blob: BlockOrBlob, - ) { - if let Some(resp) = self - .network - .backfill_sync_block_and_blob_response(id, block_or_blob) - { - match resp.responses { - Ok(blocks) => { - for block in blocks - .into_iter() - .map(Some) - // chain the stream terminator - .chain(vec![None]) - { - match self.backfill_sync.on_block_response( - &mut self.network, - resp.batch_id, - &peer_id, - id, - block, - ) { - Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), - Ok(ProcessResult::Successful) => {} - Err(_error) => { - // The backfill sync has failed, errors are reported - // within. - self.update_sync_state(); - } - } - } - } - Err(e) => { - // Re-insert the request so we can retry - self.network.insert_backfill_blocks_and_blobs_requests( - id, - resp.batch_id, - <_>::default(), - ); - - // inform backfill that the request needs to be treated as failed - // With time we will want to downgrade this log - warn!( - self.log, "Blocks and blobs request for backfill received invalid data"; - "peer_id" => %peer_id, "batch_id" => resp.batch_id, "error" => e.clone() - ); - let id = RequestId::BackFillBlockAndBlobs { id }; - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "block_blob_faulty_backfill_batch", - ); - self.inject_error(peer_id, id, RPCError::InvalidData(e)) - } - } - } - } } impl From>> diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 29effb7bce0..96f8de46fb7 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -21,14 +21,20 @@ use tokio::sync::mpsc; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; pub struct BlocksAndBlobsByRangeResponse { - pub batch_id: BatchId, + pub sender_id: RangeRequestId, pub responses: Result>, String>, + pub request_type: ByRangeRequestType, } -pub struct BlocksAndBlobsByRangeRequest { - pub chain_id: ChainId, - pub batch_id: BatchId, - pub block_blob_info: BlocksAndBlobsRequestInfo, +#[derive(Debug, Clone, Copy)] +pub enum RangeRequestId { + RangeSync { + chain_id: ChainId, + batch_id: BatchId, + }, + BackfillSync { + batch_id: BatchId, + }, } /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. @@ -39,18 +45,9 @@ pub struct SyncNetworkContext { /// A sequential ID for all RPC requests. request_id: Id, - /// BlocksByRange requests made by the range syncing algorithm. - range_requests: FnvHashMap, - - /// BlocksByRange requests made by backfill syncing. - backfill_requests: FnvHashMap, - - /// BlocksByRange requests paired with BlobsByRange requests made by the range. - range_blocks_and_blobs_requests: FnvHashMap>, - - /// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync. - backfill_blocks_and_blobs_requests: - FnvHashMap)>, + /// BlocksByRange requests paired with BlobsByRange + range_blocks_and_blobs_requests: + FnvHashMap)>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. 
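// ---------------------------------------------------------------------------
// Reviewer sketch (illustrative only, not part of this patch): the four
// request maps collapse into one map whose value carries a `RangeRequestId`,
// so a finished batch is routed back to its origin without separate
// bookkeeping per sync type. A self-contained model of that dispatch, with
// ids reduced to `u64` and the handlers reduced to log lines (all names here
// are hypothetical):
#[derive(Debug, Clone, Copy)]
enum RangeRequestIdSketch {
    RangeSync { chain_id: u64, batch_id: u64 },
    BackfillSync { batch_id: u64 },
}

#[allow(dead_code)]
fn route_finished_batch(sender_id: RangeRequestIdSketch) {
    match sender_id {
        // Forward range sync needs both the syncing chain and the batch.
        RangeRequestIdSketch::RangeSync { chain_id, batch_id } => {
            println!("deliver batch {batch_id} to range-sync chain {chain_id}")
        }
        // Backfill sync only needs the batch id.
        RangeRequestIdSketch::BackfillSync { batch_id } => {
            println!("deliver batch {batch_id} to backfill sync")
        }
    }
}
// ---------------------------------------------------------------------------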
@@ -94,10 +91,7 @@ impl SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, - range_requests: FnvHashMap::default(), - backfill_requests: FnvHashMap::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), - backfill_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, log, @@ -148,266 +142,85 @@ impl SyncNetworkContext { peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, - chain_id: ChainId, - batch_id: BatchId, ) -> Result { - match batch_type { - ByRangeRequestType::Blocks => { - trace!( - self.log, - "Sending BlocksByRange request"; - "method" => "BlocksByRange", - "count" => request.count(), - "peer" => %peer_id, - ); - let request = Request::BlocksByRange(request); - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeBlocks { id }); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request, - request_id, - })?; - self.range_requests.insert(id, (chain_id, batch_id)); - Ok(id) - } - ByRangeRequestType::BlocksAndBlobs => { - debug!( - self.log, - "Sending BlocksByRange and BlobsByRange requests"; - "method" => "Mixed by range request", - "count" => request.count(), - "peer" => %peer_id, - ); - - // create the shared request id. This is fine since the rpc handles substream ids. - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }); - - // Create the blob request based on the blob request. - let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { + let id = self.next_id(); + trace!( + self.log, + "Sending BlocksByRange request"; + "method" => "BlocksByRange", + "count" => request.count(), + "peer" => %peer_id, + ); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::BlocksByRange(request.clone()), + request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + })?; + + if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { + debug!( + self.log, + "Sending BlobsByRange requests"; + "method" => "BlobsByRange", + "count" => request.count(), + "peer" => %peer_id, + ); + + // Create the blob request based on the blocks request. + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRange(BlobsByRangeRequest { start_slot: *request.start_slot(), count: *request.count(), - }); - let blocks_request = Request::BlocksByRange(request); - - // Send both requests. Make sure both can be sent. 
- self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blocks_request, - request_id, - })?; - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blobs_request, - request_id, - })?; - let block_blob_info = BlocksAndBlobsRequestInfo::default(); - self.range_blocks_and_blobs_requests.insert( - id, - BlocksAndBlobsByRangeRequest { - chain_id, - batch_id, - block_blob_info, - }, - ); - Ok(id) - } + }), + request_id: RequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + })?; } + + Ok(id) } - /// A blocks by range request sent by the backfill sync algorithm - pub fn backfill_blocks_by_range_request( + /// A blocks by range request sent by the range sync algorithm + pub fn blocks_and_blobs_by_range_request( &mut self, peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, - batch_id: BatchId, + sender_id: RangeRequestId, ) -> Result { - match batch_type { - ByRangeRequestType::Blocks => { - trace!( - self.log, - "Sending backfill BlocksByRange request"; - "method" => "BlocksByRange", - "count" => request.count(), - "peer" => %peer_id, - ); - let request = Request::BlocksByRange(request); - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillBlocks { id }); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request, - request_id, - })?; - self.backfill_requests.insert(id, batch_id); - Ok(id) - } - ByRangeRequestType::BlocksAndBlobs => { - debug!( - self.log, - "Sending backfill BlocksByRange and BlobsByRange requests"; - "method" => "Mixed by range request", - "count" => request.count(), - "peer" => %peer_id, - ); - - // create the shared request id. This is fine since the rpc handles substream ids. - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillBlockAndBlobs { id }); - - // Create the blob request based on the blob request. - let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - }); - let blocks_request = Request::BlocksByRange(request); - - // Send both requests. Make sure both can be sent. - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blocks_request, - request_id, - })?; - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blobs_request, - request_id, - })?; - let block_blob_info = BlocksAndBlobsRequestInfo::default(); - self.backfill_blocks_and_blobs_requests - .insert(id, (batch_id, block_blob_info)); - Ok(id) - } - } - } - - /// Response for a request that is only for blocks. - pub fn range_sync_block_only_response( - &mut self, - request_id: Id, - is_stream_terminator: bool, - ) -> Option<(ChainId, BatchId)> { - if is_stream_terminator { - self.range_requests.remove(&request_id) - } else { - self.range_requests.get(&request_id).copied() - } - } - - /// Received a blocks by range response for a request that couples blocks and blobs. 
- pub fn range_sync_block_and_blob_response( - &mut self, - request_id: Id, - block_or_blob: BlockOrBlob, - ) -> Option<(ChainId, BlocksAndBlobsByRangeResponse)> { - match self.range_blocks_and_blobs_requests.entry(request_id) { - Entry::Occupied(mut entry) => { - let req = entry.get_mut(); - let info = &mut req.block_blob_info; - match block_or_blob { - BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), - } - if info.is_finished() { - // If the request is finished, dequeue everything - let BlocksAndBlobsByRangeRequest { - chain_id, - batch_id, - block_blob_info, - } = entry.remove(); - Some(( - chain_id, - BlocksAndBlobsByRangeResponse { - batch_id, - responses: block_blob_info.into_responses(), - }, - )) - } else { - None - } - } - Entry::Vacant(_) => None, - } + let id = self.blocks_by_range_request(peer_id, batch_type, request)?; + self.range_blocks_and_blobs_requests + .insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type))); + Ok(id) } - pub fn range_sync_request_failed( - &mut self, - request_id: Id, - batch_type: ByRangeRequestType, - ) -> Option<(ChainId, BatchId)> { - let req = match batch_type { - ByRangeRequestType::BlocksAndBlobs => self - .range_blocks_and_blobs_requests - .remove(&request_id) - .map(|req| (req.chain_id, req.batch_id)), - ByRangeRequestType::Blocks => self.range_requests.remove(&request_id), - }; - if let Some(req) = req { + pub fn range_request_failed(&mut self, request_id: Id) -> Option { + let sender_id = self + .range_blocks_and_blobs_requests + .remove(&request_id) + .map(|(sender_id, _info)| sender_id); + if let Some(sender_id) = sender_id { debug!( self.log, - "Range sync request failed"; + "Sync range request failed"; "request_id" => request_id, - "batch_type" => ?batch_type, - "chain_id" => ?req.0, - "batch_id" => ?req.1 + "sender_id" => ?sender_id ); - Some(req) + Some(sender_id) } else { - debug!(self.log, "Range sync request failed"; "request_id" => request_id, "batch_type" => ?batch_type); + debug!(self.log, "Sync range request failed"; "request_id" => request_id); None } } - pub fn backfill_request_failed( - &mut self, - request_id: Id, - batch_type: ByRangeRequestType, - ) -> Option { - let batch_id = match batch_type { - ByRangeRequestType::BlocksAndBlobs => self - .backfill_blocks_and_blobs_requests - .remove(&request_id) - .map(|(batch_id, _info)| batch_id), - ByRangeRequestType::Blocks => self.backfill_requests.remove(&request_id), - }; - if let Some(batch_id) = batch_id { - debug!( - self.log, - "Backfill sync request failed"; - "request_id" => request_id, - "batch_type" => ?batch_type, - "batch_id" => ?batch_id - ); - Some(batch_id) - } else { - debug!(self.log, "Backfill sync request failed"; "request_id" => request_id, "batch_type" => ?batch_type); - None - } - } - - /// Response for a request that is only for blocks. - pub fn backfill_sync_only_blocks_response( - &mut self, - request_id: Id, - is_stream_terminator: bool, - ) -> Option { - if is_stream_terminator { - self.backfill_requests.remove(&request_id) - } else { - self.backfill_requests.get(&request_id).copied() - } - } - /// Received a blocks by range or blobs by range response for a request that couples blocks ' /// and blobs. 
- pub fn backfill_sync_block_and_blob_response( + pub fn range_block_and_blob_response( &mut self, request_id: Id, block_or_blob: BlockOrBlob, ) -> Option> { - match self.backfill_blocks_and_blobs_requests.entry(request_id) { + match self.range_blocks_and_blobs_requests.entry(request_id) { Entry::Occupied(mut entry) => { let (_, info) = entry.get_mut(); match block_or_blob { @@ -416,12 +229,12 @@ impl SyncNetworkContext { } if info.is_finished() { // If the request is finished, dequeue everything - let (batch_id, info) = entry.remove(); - - let responses = info.into_responses(); + let (sender_id, info) = entry.remove(); + let request_type = info.get_request_type(); Some(BlocksAndBlobsByRangeResponse { - batch_id, - responses, + sender_id, + request_type, + responses: info.into_responses(), }) } else { None @@ -586,18 +399,10 @@ impl SyncNetworkContext { pub fn insert_range_blocks_and_blobs_request( &mut self, id: Id, - request: BlocksAndBlobsByRangeRequest, - ) { - self.range_blocks_and_blobs_requests.insert(id, request); - } - - pub fn insert_backfill_blocks_and_blobs_requests( - &mut self, - id: Id, - batch_id: BatchId, - request: BlocksAndBlobsRequestInfo, + sender_id: RangeRequestId, + info: BlocksAndBlobsRequestInfo, ) { - self.backfill_blocks_and_blobs_requests - .insert(id, (batch_id, request)); + self.range_blocks_and_blobs_requests + .insert(id, (sender_id, info)); } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 5a77340e3b5..c60cdb2cc9f 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,5 +1,6 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::network_context::RangeRequestId; use crate::sync::{ manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult, }; @@ -905,7 +906,15 @@ impl SyncingChain { ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); - match network.blocks_by_range_request(peer, batch_type, request, self.id, batch_id) { + match network.blocks_and_blobs_by_range_request( + peer, + batch_type, + request, + RangeRequestId::RangeSync { + chain_id: self.id, + batch_id, + }, + ) { Ok(request_id) => { // inform the batch about the new request batch.start_downloading_from_peer(peer, request_id)?; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index a159eb45410..c8e82666840 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -384,7 +384,7 @@ mod tests { use crate::NetworkMessage; use super::*; - use crate::sync::network_context::BlockOrBlob; + use crate::sync::network_context::{BlockOrBlob, RangeRequestId}; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::parking_lot::RwLock; @@ -548,6 +548,51 @@ mod tests { (block_req_id, blob_req_id) } + fn complete_range_block_and_blobs_response( + &mut self, + block_req: RequestId, + blob_req_opt: Option, + ) -> (ChainId, BatchId, Id) { + if blob_req_opt.is_some() { + match block_req { + RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => { + let _ = self + .cx + .range_block_and_blob_response(id, BlockOrBlob::Block(None)); + let response = self + .cx + 
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 5a77340e3b5..c60cdb2cc9f 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -1,5 +1,6 @@
 use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
 use crate::network_beacon_processor::ChainSegmentProcessId;
+use crate::sync::network_context::RangeRequestId;
 use crate::sync::{
     manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
 };
@@ -905,7 +906,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     ) -> ProcessingResult {
         if let Some(batch) = self.batches.get_mut(&batch_id) {
             let (request, batch_type) = batch.to_blocks_by_range_request();
-            match network.blocks_by_range_request(peer, batch_type, request, self.id, batch_id) {
+            match network.blocks_and_blobs_by_range_request(
+                peer,
+                batch_type,
+                request,
+                RangeRequestId::RangeSync {
+                    chain_id: self.id,
+                    batch_id,
+                },
+            ) {
                 Ok(request_id) => {
                     // inform the batch about the new request
                     batch.start_downloading_from_peer(peer, request_id)?;
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index a159eb45410..c8e82666840 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -384,7 +384,7 @@ mod tests {
     use crate::NetworkMessage;
 
     use super::*;
-    use crate::sync::network_context::BlockOrBlob;
+    use crate::sync::network_context::{BlockOrBlob, RangeRequestId};
     use beacon_chain::builder::Witness;
     use beacon_chain::eth1_chain::CachingEth1Backend;
     use beacon_chain::parking_lot::RwLock;
@@ -548,6 +548,51 @@
         (block_req_id, blob_req_id)
     }
 
+    fn complete_range_block_and_blobs_response(
+        &mut self,
+        block_req: RequestId,
+        blob_req_opt: Option<RequestId>,
+    ) -> (ChainId, BatchId, Id) {
+        if blob_req_opt.is_some() {
+            match block_req {
+                RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
+                    let _ = self
+                        .cx
+                        .range_block_and_blob_response(id, BlockOrBlob::Block(None));
+                    let response = self
+                        .cx
+                        .range_block_and_blob_response(id, BlockOrBlob::Blob(None))
+                        .unwrap();
+                    let (chain_id, batch_id) =
+                        TestRig::unwrap_range_request_id(response.sender_id);
+                    (chain_id, batch_id, id)
+                }
+                other => panic!("unexpected request {:?}", other),
+            }
+        } else {
+            match block_req {
+                RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
+                    let response = self
+                        .cx
+                        .range_block_and_blob_response(id, BlockOrBlob::Block(None))
+                        .unwrap();
+                    let (chain_id, batch_id) =
+                        TestRig::unwrap_range_request_id(response.sender_id);
+                    (chain_id, batch_id, id)
+                }
+                other => panic!("unexpected request {:?}", other),
+            }
+        }
+    }
+
+    fn unwrap_range_request_id(sender_id: RangeRequestId) -> (ChainId, BatchId) {
+        if let RangeRequestId::RangeSync { chain_id, batch_id } = sender_id {
+            (chain_id, batch_id)
+        } else {
+            panic!("expected RangeSync request: {:?}", sender_id)
+        }
+    }
+
     /// Produce a head peer
     fn head_peer(
         &self,
@@ -744,29 +789,8 @@
         range.add_peer(&mut rig.cx, local_info, peer1, head_info);
         let (block_req, blob_req_opt) = rig.grab_request(&peer1, fork);
 
-        let (chain1, batch1, id1) = if blob_req_opt.is_some() {
-            match block_req {
-                RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
-                    let _ = rig
-                        .cx
-                        .range_sync_block_and_blob_response(id, BlockOrBlob::Block(None));
-                    let (chain1, response) = rig
-                        .cx
-                        .range_sync_block_and_blob_response(id, BlockOrBlob::Blob(None))
-                        .unwrap();
-                    (chain1, response.batch_id, id)
-                }
-                other => panic!("unexpected request {:?}", other),
-            }
-        } else {
-            match block_req {
-                RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
-                    let (chain, batch) = rig.cx.range_sync_block_only_response(id, true).unwrap();
-                    (chain, batch, id)
-                }
-                other => panic!("unexpected request {:?}", other),
-            }
-        };
+        let (chain1, batch1, id1) =
+            rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
 
         // make the ee offline
         rig.cx.update_execution_engine_state(EngineState::Offline);
@@ -782,29 +806,8 @@
         range.add_peer(&mut rig.cx, local_info, peer2, finalized_info);
         let (block_req, blob_req_opt) = rig.grab_request(&peer2, fork);
 
-        let (chain2, batch2, id2) = if blob_req_opt.is_some() {
-            match block_req {
-                RequestId::Sync(crate::sync::manager::RequestId::RangeBlockAndBlobs { id }) => {
-                    let _ = rig
-                        .cx
-                        .range_sync_block_and_blob_response(id, BlockOrBlob::Block(None));
-                    let (chain2, response) = rig
-                        .cx
-                        .range_sync_block_and_blob_response(id, BlockOrBlob::Blob(None))
-                        .unwrap();
-                    (chain2, response.batch_id, id)
-                }
-                other => panic!("unexpected request {:?}", other),
-            }
-        } else {
-            match block_req {
-                RequestId::Sync(crate::sync::manager::RequestId::RangeBlocks { id }) => {
-                    let (chain, batch) = rig.cx.range_sync_block_only_response(id, true).unwrap();
-                    (chain, batch, id)
-                }
-                other => panic!("unexpected request {:?}", other),
-            }
-        };
+        let (chain2, batch2, id2) =
+            rig.complete_range_block_and_blobs_response(block_req, blob_req_opt);
 
         // send the response to the request
         range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, None);
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 5a27b148c99..fd2cf473cb3 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -348,8 +348,8 @@ pub fn get_config<E: EthSpec>(
     }
 
     // Set config values from parsed values.
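The `config.rs` change above sets the shape of the whole execution-layer cleanup in this PR: with multi-endpoint support gone, `Vec` fields become `Option`s and "first element wins" logic disappears. A rough before/after sketch, with a hypothetical trimmed-down struct and a plain `String` standing in for `SensitiveUrl`:

```rust
use std::path::PathBuf;

// Hypothetical mirror of execution_layer::Config; not the real struct.
#[derive(Default)]
struct ElConfig {
    // Before: execution_endpoints: Vec<SensitiveUrl>, secret_files: Vec<PathBuf>
    execution_endpoint: Option<String>,
    secret_file: Option<PathBuf>,
}

fn main() {
    let mut el_config = ElConfig::default();
    el_config.execution_endpoint = Some("http://localhost:8551/".into());
    el_config.secret_file = Some(PathBuf::from("/secrets/jwt.hex"));

    // Callers now express "exactly zero or one" in the type instead of
    // asserting on vector length, as the updated tests further down do
    // with is_some().
    assert!(el_config.execution_endpoint.is_some());
    assert_eq!(el_config.secret_file, Some(PathBuf::from("/secrets/jwt.hex")));
}
```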
-    el_config.secret_files = vec![secret_file.clone()];
-    el_config.execution_endpoints = vec![execution_endpoint.clone()];
+    el_config.secret_file = Some(secret_file.clone());
+    el_config.execution_endpoint = Some(execution_endpoint.clone());
     el_config.suggested_fee_recipient =
         clap_utils::parse_optional(cli_args, "suggested-fee-recipient")?;
     el_config.jwt_id = clap_utils::parse_optional(cli_args, "execution-jwt-id")?;
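The new book note above describes the `voting_keystore_password_path` workflow. Purely as an illustration of the precedence it documents (the inline password wins; otherwise the file is read and trimmed), here is a hypothetical resolver; `resolve_keystore_password` is not Lighthouse's actual loading code:

```rust
use std::{fs, io, path::Path};

fn resolve_keystore_password(
    inline: Option<&str>,
    path: Option<&Path>,
) -> io::Result<Option<String>> {
    match (inline, path) {
        // Per the note above: if both are supplied, the path is ignored.
        (Some(pw), _) => Ok(Some(pw.to_string())),
        (None, Some(p)) => {
            // Password files typically end with a newline; trim it off.
            Ok(Some(fs::read_to_string(p)?.trim_end().to_string()))
        }
        (None, None) => Ok(None),
    }
}

fn main() -> io::Result<()> {
    let file = std::env::temp_dir().join("keystore-password-demo.txt");
    fs::write(&file, "correct horse battery staple\n")?;
    let pw = resolve_keystore_password(None, Some(&file))?;
    assert_eq!(pw.as_deref(), Some("correct horse battery staple"));
    Ok(())
}
```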
diff --git a/book/src/help_vc.md b/book/src/help_vc.md
index fb963f87cc5..1b7e7f2b0af 100644
--- a/book/src/help_vc.md
+++ b/book/src/help_vc.md
@@ -218,7 +218,7 @@ OPTIONS:
         The directory which contains the validator keystores, deposit data for each validator along with the common
         slashing protection database and the validator_definitions.yml
     --web3-signer-keep-alive-timeout <MILLIS>
-        Keep-alive timeout for each web3signer connection. Set to 'null' to never timeout [default: 90000]
+        Keep-alive timeout for each web3signer connection. Set to 'null' to never timeout [default: 20000]
 
     --web3-signer-max-idle-connections
         Maximum number of idle connections to maintain per web3signer host. Default is unlimited.
diff --git a/book/src/validator-management.md b/book/src/validator-management.md
index df7c2ac4760..bc6aba3c4f9 100644
--- a/book/src/validator-management.md
+++ b/book/src/validator-management.md
@@ -59,7 +59,9 @@ Each permitted field of the file is listed below for reference:
 - `voting_keystore_password`: The password to the EIP-2335 keystore.
 
 > **Note**: Either `voting_keystore_password_path` or `voting_keystore_password` *must* be
-> supplied. If both are supplied, `voting_keystore_password_path` is ignored.
+> supplied. If both are supplied, `voting_keystore_password_path` is ignored.
+
+> If you do not wish to store `voting_keystore_password` in the `validator_definitions.yml` file, add the `voting_keystore_password_path` field instead and point it to a file containing the password. That file can live on, e.g., a mounted portable drive, so that no password is stored on the validating node.
 
 ## Populating the `validator_definitions.yml` file
 
@@ -75,6 +77,7 @@ recap:
 
 ### Automatic validator discovery
 
+
 When the `--disable-auto-discover` flag is **not** provided, the validator client
 will search the `validator-dir` for validators and add any *new* validators to the
 `validator_definitions.yml` with `enabled: true`.
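The `initiate_validator_exit` hunk above only renames the churn getter, but it is the one place the exit queue interacts with the churn limit: exits accumulate at the earliest eligible epoch until its churn is exhausted, then spill into the next epoch. A toy model of just that rule, with plain integers standing in for `Epoch` and the exit cache:

```rust
/// Returns the epoch a new exit is assigned to, given how many exits are
/// already queued at the candidate epoch.
fn assign_exit_epoch(exit_queue_epoch: u64, exit_queue_churn: u64, churn_limit: u64) -> u64 {
    if exit_queue_churn >= churn_limit {
        // Mirrors `exit_queue_epoch.safe_add_assign(1)` in the hunk above.
        exit_queue_epoch + 1
    } else {
        exit_queue_epoch
    }
}

fn main() {
    // With a churn limit of 4, a full epoch spills the exit into the next one.
    assert_eq!(assign_exit_epoch(100, 4, 4), 101);
    // Under the limit, the validator exits at the queued epoch.
    assert_eq!(assign_exit_epoch(100, 3, 4), 100);
}
```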
diff --git a/consensus/state_processing/src/common/initiate_validator_exit.rs b/consensus/state_processing/src/common/initiate_validator_exit.rs
index c527807df89..84656d9c890 100644
--- a/consensus/state_processing/src/common/initiate_validator_exit.rs
+++ b/consensus/state_processing/src/common/initiate_validator_exit.rs
@@ -26,7 +26,7 @@ pub fn initiate_validator_exit<E: EthSpec>(
         .map_or(delayed_epoch, |epoch| max(epoch, delayed_epoch));
     let exit_queue_churn = state.exit_cache().get_churn_at(exit_queue_epoch)?;
 
-    if exit_queue_churn >= state.get_churn_limit(spec)? {
+    if exit_queue_churn >= state.get_validator_churn_limit(spec)? {
         exit_queue_epoch.safe_add_assign(1)?;
     }
diff --git a/consensus/state_processing/src/genesis.rs b/consensus/state_processing/src/genesis.rs
index b225923b418..036ab23498c 100644
--- a/consensus/state_processing/src/genesis.rs
+++ b/consensus/state_processing/src/genesis.rs
@@ -1,5 +1,5 @@
 use super::per_block_processing::{
-    errors::BlockProcessingError, process_operations::process_deposit,
+    errors::BlockProcessingError, process_operations::apply_deposit,
 };
 use crate::common::DepositDataTree;
 use crate::upgrade::{
@@ -37,7 +37,7 @@ pub fn initialize_beacon_state_from_eth1<E: EthSpec>(
             .push_leaf(deposit.data.tree_hash_root())
             .map_err(BlockProcessingError::MerkleTreeError)?;
         state.eth1_data_mut().deposit_root = deposit_tree.root();
-        process_deposit(&mut state, deposit, spec, true)?;
+        apply_deposit(&mut state, deposit, spec, true)?;
     }
 
     process_activations(&mut state, spec)?;
diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs
index af9b7938132..63b7c9e01fb 100644
--- a/consensus/state_processing/src/per_block_processing/process_operations.rs
+++ b/consensus/state_processing/src/per_block_processing/process_operations.rs
@@ -371,14 +371,14 @@ pub fn process_deposits<E: EthSpec>(
 
     // Update the state in series.
     for deposit in deposits {
-        process_deposit(state, deposit, spec, false)?;
+        apply_deposit(state, deposit, spec, false)?;
     }
 
     Ok(())
 }
 
 /// Process a single deposit, optionally verifying its merkle proof.
-pub fn process_deposit<E: EthSpec>(
+pub fn apply_deposit<E: EthSpec>(
     state: &mut BeaconState<E>,
     deposit: &Deposit,
     spec: &ChainSpec,
diff --git a/consensus/state_processing/src/per_epoch_processing/single_pass.rs b/consensus/state_processing/src/per_epoch_processing/single_pass.rs
index 9319d2941b5..380484046c3 100644
--- a/consensus/state_processing/src/per_epoch_processing/single_pass.rs
+++ b/consensus/state_processing/src/per_epoch_processing/single_pass.rs
@@ -120,7 +120,7 @@ pub fn process_epoch_single_pass<E: EthSpec>(
     let next_epoch = state.next_epoch()?;
     let is_in_inactivity_leak = state.is_in_inactivity_leak(previous_epoch, spec)?;
     let total_active_balance = state.get_total_active_balance()?;
-    let churn_limit = state.get_churn_limit(spec)?;
+    let churn_limit = state.get_validator_churn_limit(spec)?;
     let activation_churn_limit = state.get_activation_churn_limit(spec)?;
     let finalized_checkpoint = state.finalized_checkpoint();
     let fork_name = state.fork_name_unchecked();
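Both methods in the `beacon_state.rs` hunk below implement spec formulas: the (renamed) validator churn limit is `max(MIN_PER_EPOCH_CHURN_LIMIT, active_validator_count / CHURN_LIMIT_QUOTIENT)`, and from Deneb onward the activation churn limit is additionally capped. A self-contained sketch using mainnet constants (the constants come from the consensus spec, not from this diff):

```rust
const MIN_PER_EPOCH_CHURN_LIMIT: u64 = 4;
const CHURN_LIMIT_QUOTIENT: u64 = 65_536;
const MAX_PER_EPOCH_ACTIVATION_CHURN_LIMIT: u64 = 8; // EIP-7514, applies from Deneb

fn validator_churn_limit(active_validator_count: u64) -> u64 {
    std::cmp::max(
        MIN_PER_EPOCH_CHURN_LIMIT,
        active_validator_count / CHURN_LIMIT_QUOTIENT,
    )
}

fn activation_churn_limit(active_validator_count: u64) -> u64 {
    std::cmp::min(
        MAX_PER_EPOCH_ACTIVATION_CHURN_LIMIT,
        validator_churn_limit(active_validator_count),
    )
}

fn main() {
    // Small validator sets are floored at the minimum...
    assert_eq!(validator_churn_limit(10_000), 4);
    // ...while large ones scale with the active set size.
    assert_eq!(validator_churn_limit(1_000_000), 15);
    // From Deneb, activations are additionally capped per epoch.
    assert_eq!(activation_churn_limit(1_000_000), 8);
}
```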
.flag("web3-signer-keep-alive-timeout", Some("1000")) + .run() + .with_config(|config| { + assert_eq!( + config.web3_signer_keep_alive_timeout, + Some(Duration::from_secs(1)) + ); + }); +} diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 8f782c7e4e0..0103f7074b5 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -121,11 +121,11 @@ impl TestRig { let ee_a = { let execution_engine = ExecutionEngine::new(generic_engine.clone()); - let urls = vec![execution_engine.http_auth_url()]; + let url = Some(execution_engine.http_auth_url()); let config = execution_layer::Config { - execution_endpoints: urls, - secret_files: vec![], + execution_endpoint: url, + secret_file: None, suggested_fee_recipient: Some(Address::repeat_byte(42)), default_datadir: execution_engine.datadir(), ..Default::default() @@ -140,11 +140,11 @@ impl TestRig { let ee_b = { let execution_engine = ExecutionEngine::new(generic_engine); - let urls = vec![execution_engine.http_auth_url()]; + let url = Some(execution_engine.http_auth_url()); let config = execution_layer::Config { - execution_endpoints: urls, - secret_files: vec![], + execution_endpoint: url, + secret_file: None, suggested_fee_recipient: fee_recipient, default_datadir: execution_engine.datadir(), ..Default::default() diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 16a265212e5..991b621f27d 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -73,7 +73,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { key. Defaults to ~/.lighthouse/{network}/secrets.", ) .takes_value(true) - .conflicts_with("datadir") ) .arg( Arg::with_name("init-slashing-protection") @@ -391,7 +390,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("web3-signer-keep-alive-timeout") .long("web3-signer-keep-alive-timeout") .value_name("MILLIS") - .default_value("90000") + .default_value("20000") .help("Keep-alive timeout for each web3signer connection. Set to 'null' to never \ timeout") .takes_value(true), diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index ae59829a3e6..5bd32fced2a 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -18,6 +18,7 @@ use std::time::Duration; use types::{Address, GRAFFITI_BYTES_LEN}; pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/"; +pub const DEFAULT_WEB3SIGNER_KEEP_ALIVE: Option = Some(Duration::from_secs(20)); /// Stores the core configuration for this validator instance. #[derive(Clone, Serialize, Deserialize)] @@ -133,7 +134,7 @@ impl Default for Config { builder_boost_factor: None, prefer_builder_proposals: false, distributed: false, - web3_signer_keep_alive_timeout: Some(Duration::from_secs(90)), + web3_signer_keep_alive_timeout: DEFAULT_WEB3SIGNER_KEEP_ALIVE, web3_signer_max_idle_connections: None, } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 32a0eadbef4..268c25cdf7d 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -3,7 +3,6 @@ mod beacon_node_fallback; mod block_service; mod check_synced; mod cli; -mod config; mod duties_service; mod graffiti_file; mod http_metrics; @@ -14,6 +13,7 @@ mod preparation_service; mod signing_method; mod sync_committee_service; +pub mod config; mod doppelganger_service; pub mod http_api; pub mod initialized_validators;