Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure proper ReqResp blocks_by_* response stream termination #5582

Merged
merged 2 commits into from
Apr 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 66 additions & 53 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,37 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: PeerRequestId,
request: BlocksByRootRequest,
) {
self.terminate_response_stream(
peer_id,
request_id,
self.clone()
.handle_blocks_by_root_request_inner(executor, peer_id, request_id, request)
.await,
Response::BlocksByRoot,
);
}

/// Handle a `BlocksByRoot` request from the peer.
pub async fn handle_blocks_by_root_request_inner(
self: Arc<Self>,
executor: TaskExecutor,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlocksByRootRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
let requested_blocks = request.block_roots().len();
let mut block_stream = match self
.chain
.get_blocks_checking_caches(request.block_roots().to_vec(), &executor)
{
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
Err(e) => {
error!(self.log, "Error getting block stream"; "error" => ?e);
return Err((
RPCResponseErrorCode::ServerError,
"Error getting block stream",
));
}
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
let mut send_block_count = 0;
Expand Down Expand Up @@ -169,13 +193,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => ?root,
"reason" => "execution layer not synced",
);
// send the stream terminator
return self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
"Execution layer not synced",
));
}
Err(e) => {
debug!(
Expand All @@ -196,8 +217,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"returned" => %send_block_count
);

// send stream termination
self.send_response(peer_id, Response::BlocksByRoot(None), request_id);
Ok(())
}

/// Handle a `BlobsByRoot` request from the peer.
Expand Down Expand Up @@ -380,6 +400,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: PeerRequestId,
req: BlocksByRangeRequest,
) {
self.terminate_response_stream(
peer_id,
request_id,
self.clone()
.handle_blocks_by_range_request_inner(executor, peer_id, request_id, req)
.await,
Response::BlocksByRange,
);
}

/// Handle a `BlocksByRange` request from the peer.
pub async fn handle_blocks_by_range_request_inner(
self: Arc<Self>,
executor: TaskExecutor,
peer_id: PeerId,
request_id: PeerRequestId,
req: BlocksByRangeRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
debug!(self.log, "Received BlocksByRange Request";
"peer_id" => %peer_id,
"count" => req.count(),
Expand All @@ -401,12 +439,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
});
if *req.count() > max_request_size {
return self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::InvalidRequest,
format!("Request exceeded max size {max_request_size}"),
request_id,
);
"Request exceeded max size",
));
}

let forwards_block_root_iter = match self
Expand All @@ -424,25 +460,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database error".into(),
request_id,
);
return error!(self.log, "Unable to obtain root iter";
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RPCResponseErrorCode::ServerError, "Database error"));
}
};

Expand All @@ -468,11 +494,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
return error!(self.log, "Error during iteration over blocks";
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
)
);
return Err((RPCResponseErrorCode::ServerError, "Iteration error"));
}
};

Expand All @@ -481,7 +508,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

let mut block_stream = match self.chain.get_blocks(block_roots, &executor) {
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
Err(e) => {
error!(self.log, "Error getting block stream"; "error" => ?e);
return Err((RPCResponseErrorCode::ServerError, "Iterator error"));
}
};

// Fetching blocks is async because it may have to hit the execution layer for payloads.
Expand Down Expand Up @@ -511,12 +541,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"peer" => %peer_id,
"request_root" => ?root
);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database inconsistency".into(),
request_id,
);
return Err((RPCResponseErrorCode::ServerError, "Database inconsistency"));
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
Expand All @@ -526,12 +551,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"reason" => "execution layer not synced",
);
// send the stream terminator
return self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
"Execution layer not synced",
));
}
Err(e) => {
if matches!(
Expand All @@ -556,12 +579,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

// send the stream terminator
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Failed fetching blocks".into(),
request_id,
);
return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks"));
}
}
}
Expand Down Expand Up @@ -594,12 +612,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
}

// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(None),
id: request_id,
});
Ok(())
}

/// Handle a `BlobsByRange` request from the peer.
Expand Down
Loading