diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 1b4b93adb0c..2a00484803a 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -212,6 +212,7 @@ impl NetworkBeaconProcessor { "load_blocks_by_root_blocks", ) } + /// Handle a `BlobsByRoot` request from the peer. pub fn handle_blobs_by_root_request( self: Arc, @@ -219,10 +220,25 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: BlobsByRootRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.handle_blobs_by_root_request_inner(peer_id, request_id, request), + Response::BlobsByRoot, + ); + } + + /// Handle a `BlobsByRoot` request from the peer. + pub fn handle_blobs_by_root_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlobsByRootRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root) else { // No blob ids requested. - return; + return Ok(()); }; let requested_indices = request .blob_ids @@ -231,7 +247,6 @@ impl NetworkBeaconProcessor { .map(|id| id.index) .collect::>(); let mut send_blob_count = 0; - let send_response = true; let mut blob_list_results = HashMap::new(); for id in request.blob_ids.as_slice() { @@ -287,10 +302,7 @@ impl NetworkBeaconProcessor { "returned" => send_blob_count ); - // send stream termination - if send_response { - self.send_response(peer_id, Response::BlobsByRoot(None), request_id); - } + Ok(()) } /// Handle a `LightClientBootstrap` request from the peer. @@ -300,33 +312,29 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: LightClientBootstrapRequest, ) { - let block_root = request.root; - match self.chain.get_light_client_bootstrap(&block_root) { - Ok(Some((bootstrap, _))) => self.send_response( - peer_id, - Response::LightClientBootstrap(Arc::new(bootstrap)), - request_id, - ), - Ok(None) => self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ), - Err(e) => { - self.send_error_response( - peer_id, + self.terminate_response_single_item( + peer_id, + request_id, + match self.chain.get_light_client_bootstrap(&request.root) { + Ok(Some((bootstrap, _))) => Ok(Arc::new(bootstrap)), + Ok(None) => Err(( RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ); - error!(self.log, "Error getting LightClientBootstrap instance"; - "block_root" => ?block_root, - "peer" => %peer_id, - "error" => ?e - ) - } - }; + "Bootstrap not available", + )), + Err(e) => { + error!(self.log, "Error getting LightClientBootstrap instance"; + "block_root" => ?request.root, + "peer" => %peer_id, + "error" => ?e + ); + Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not available", + )) + } + }, + Response::LightClientBootstrap, + ); } /// Handle a `LightClientOptimisticUpdate` request from the peer. @@ -335,25 +343,22 @@ impl NetworkBeaconProcessor { peer_id: PeerId, request_id: PeerRequestId, ) { - let Some(light_client_optimistic_update) = self - .chain - .light_client_server_cache - .get_latest_optimistic_update() - else { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Latest optimistic update not available".into(), - request_id, - ); - return; - }; - - self.send_response( + self.terminate_response_single_item( peer_id, - Response::LightClientOptimisticUpdate(Arc::new(light_client_optimistic_update)), request_id, - ) + match self + .chain + .light_client_server_cache + .get_latest_optimistic_update() + { + Some(update) => Ok(Arc::new(update)), + None => Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Latest optimistic update not available", + )), + }, + Response::LightClientOptimisticUpdate, + ); } /// Handle a `LightClientFinalityUpdate` request from the peer. @@ -362,25 +367,22 @@ impl NetworkBeaconProcessor { peer_id: PeerId, request_id: PeerRequestId, ) { - let Some(light_client_finality_update) = self - .chain - .light_client_server_cache - .get_latest_finality_update() - else { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Latest finality update not available".into(), - request_id, - ); - return; - }; - - self.send_response( + self.terminate_response_single_item( peer_id, - Response::LightClientFinalityUpdate(Arc::new(light_client_finality_update)), request_id, - ) + match self + .chain + .light_client_server_cache + .get_latest_finality_update() + { + Some(update) => Ok(Arc::new(update)), + None => Err(( + RPCResponseErrorCode::ResourceUnavailable, + "Latest finality update not available", + )), + }, + Response::LightClientFinalityUpdate, + ); } /// Handle a `BlocksByRange` request from the peer. @@ -637,6 +639,21 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, req: BlobsByRangeRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.handle_blobs_by_range_request_inner(peer_id, request_id, req), + Response::BlobsByRange, + ); + } + + /// Handle a `BlobsByRange` request from the peer. + fn handle_blobs_by_range_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + req: BlobsByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received BlobsByRange Request"; "peer_id" => %peer_id, "count" => req.count, @@ -645,12 +662,10 @@ impl NetworkBeaconProcessor { // Should not send more than max request blocks if req.max_blobs_requested::() > self.chain.spec.max_request_blob_sidecars { - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(), - request_id, - ); + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", + )); } let request_start_slot = Slot::from(req.start_slot); @@ -659,13 +674,10 @@ impl NetworkBeaconProcessor { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), None => { debug!(self.log, "Deneb fork is disabled"); - self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - "Deneb fork is disabled".into(), - request_id, - ); - return; + "Deneb fork is disabled", + )); } }; @@ -685,19 +697,15 @@ impl NetworkBeaconProcessor { ); return if data_availability_boundary_slot < oldest_blob_slot { - self.send_error_response( - peer_id, + Err(( RPCResponseErrorCode::ResourceUnavailable, - "blobs pruned within boundary".into(), - request_id, - ) + "blobs pruned within boundary", + )) } else { - self.send_error_response( - peer_id, + Err(( RPCResponseErrorCode::InvalidRequest, - "Req outside availability period".into(), - request_id, - ) + "Req outside availability period", + )) }; } @@ -714,25 +722,15 @@ impl NetworkBeaconProcessor { "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); } Err(e) => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database error".into(), - request_id, - ); - return error!(self.log, "Unable to obtain root iter"; + error!(self.log, "Unable to obtain root iter"; "request" => ?req, "peer" => %peer_id, "error" => ?e ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); } }; @@ -764,11 +762,12 @@ impl NetworkBeaconProcessor { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, Err(e) => { - return error!(self.log, "Error during iteration over blocks"; + error!(self.log, "Error during iteration over blocks"; "request" => ?req, "peer" => %peer_id, "error" => ?e - ) + ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); } }; @@ -776,7 +775,6 @@ impl NetworkBeaconProcessor { let block_roots = block_roots.into_iter().flatten(); let mut blobs_sent = 0; - let mut send_response = true; for root in block_roots { match self.chain.get_blobs(&root) { @@ -799,14 +797,10 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "error" => ?e ); - self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::ServerError, - "No blobs and failed fetching corresponding block".into(), - request_id, - ); - send_response = false; - break; + "No blobs and failed fetching corresponding block", + )); } } } @@ -826,13 +820,53 @@ impl NetworkBeaconProcessor { "returned" => blobs_sent ); - if send_response { - // send the stream terminator - self.send_network_message(NetworkMessage::SendResponse { + Ok(()) + } + + /// Helper function to ensure single item protocol always end with either a single chunk or an + /// error + fn terminate_response_single_item Response>( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + result: Result, + into_response: F, + ) { + match result { + Ok(resp) => { + // Not necessary to explicitly send a termination message if this InboundRequest + // returns <= 1 for InboundRequest::expected_responses + // https://github.com/sigp/lighthouse/blob/3058b96f2560f1da04ada4f9d8ba8e5651794ff6/beacon_node/lighthouse_network/src/rpc/handler.rs#L555-L558 + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: into_response(resp), + id: request_id, + }); + } + Err((error_code, reason)) => { + self.send_error_response(peer_id, error_code, reason.into(), request_id); + } + } + } + + /// Helper function to ensure streamed protocols with multiple responses always end with either + /// a stream termination or an error + fn terminate_response_stream) -> Response>( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + result: Result<(), (RPCResponseErrorCode, &'static str)>, + into_response: F, + ) { + match result { + Ok(_) => self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlobsByRange(None), + response: into_response(None), id: request_id, - }); + }), + Err((error_code, reason)) => { + self.send_error_response(peer_id, error_code, reason.into(), request_id); + } } } }