diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 81c8f662ee9..fb813723920 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -134,13 +134,37 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: BlocksByRootRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.clone() + .handle_blocks_by_root_request_inner(executor, peer_id, request_id, request) + .await, + Response::BlocksByRoot, + ); + } + + /// Handle a `BlocksByRoot` request from the peer. + pub async fn handle_blocks_by_root_request_inner( + self: Arc, + executor: TaskExecutor, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain .get_blocks_checking_caches(request.block_roots().to_vec(), &executor) { Ok(block_stream) => block_stream, - Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + Err(e) => { + error!(self.log, "Error getting block stream"; "error" => ?e); + return Err(( + RPCResponseErrorCode::ServerError, + "Error getting block stream", + )); + } }; // Fetching blocks is async because it may have to hit the execution layer for payloads. let mut send_block_count = 0; @@ -169,13 +193,10 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); - // send the stream terminator - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::ResourceUnavailable, - "Execution layer not synced".into(), - request_id, - ); + "Execution layer not synced", + )); } Err(e) => { debug!( @@ -196,8 +217,7 @@ impl NetworkBeaconProcessor { "returned" => %send_block_count ); - // send stream termination - self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + Ok(()) } /// Handle a `BlobsByRoot` request from the peer. @@ -380,6 +400,24 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, req: BlocksByRangeRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.clone() + .handle_blocks_by_range_request_inner(executor, peer_id, request_id, req) + .await, + Response::BlocksByRange, + ); + } + + /// Handle a `BlocksByRange` request from the peer. + pub async fn handle_blocks_by_range_request_inner( + self: Arc, + executor: TaskExecutor, + peer_id: PeerId, + request_id: PeerRequestId, + req: BlocksByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, "count" => req.count(), @@ -401,12 +439,10 @@ impl NetworkBeaconProcessor { } }); if *req.count() > max_request_size { - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - format!("Request exceeded max size {max_request_size}"), - request_id, - ); + "Request exceeded max size", + )); } let forwards_block_root_iter = match self @@ -424,25 +460,15 @@ impl NetworkBeaconProcessor { "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); } Err(e) => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database error".into(), - request_id, - ); - return error!(self.log, "Unable to obtain root iter"; + error!(self.log, "Unable to obtain root iter"; "request" => ?req, "peer" => %peer_id, "error" => ?e ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); } }; @@ -468,11 +494,12 @@ impl NetworkBeaconProcessor { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, Err(e) => { - return error!(self.log, "Error during iteration over blocks"; + error!(self.log, "Error during iteration over blocks"; "request" => ?req, "peer" => %peer_id, "error" => ?e - ) + ); + return Err((RPCResponseErrorCode::ServerError, "Iteration error")); } }; @@ -481,7 +508,10 @@ impl NetworkBeaconProcessor { let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { Ok(block_stream) => block_stream, - Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + Err(e) => { + error!(self.log, "Error getting block stream"; "error" => ?e); + return Err((RPCResponseErrorCode::ServerError, "Iterator error")); + } }; // Fetching blocks is async because it may have to hit the execution layer for payloads. @@ -511,12 +541,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database inconsistency".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { debug!( @@ -526,12 +551,10 @@ impl NetworkBeaconProcessor { "reason" => "execution layer not synced", ); // send the stream terminator - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::ResourceUnavailable, - "Execution layer not synced".into(), - request_id, - ); + "Execution layer not synced", + )); } Err(e) => { if matches!( @@ -556,12 +579,7 @@ impl NetworkBeaconProcessor { } // send the stream terminator - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Failed fetching blocks".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks")); } } } @@ -594,12 +612,7 @@ impl NetworkBeaconProcessor { ); } - // send the stream terminator - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlocksByRange(None), - id: request_id, - }); + Ok(()) } /// Handle a `BlobsByRange` request from the peer.