diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index e57e846c336..d8a10397832 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -22,11 +22,6 @@ pub struct SingleLookupReqId { pub req_id: Id, } -/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly. -/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id. -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct DataColumnsByRootRequestId(pub Id); - /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum SyncRequestId { @@ -35,11 +30,19 @@ pub enum SyncRequestId { /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. - DataColumnsByRoot(DataColumnsByRootRequestId, DataColumnsByRootRequester), + DataColumnsByRoot(DataColumnsByRootRequestId), /// Range request that is composed by both a block range request and a blob range request. RangeBlockAndBlobs { id: Id }, } +/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. +/// Wrapping this particular req_id, ensures not mixing this request with a custody req_id. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct DataColumnsByRootRequestId { + pub id: Id, + pub requester: DataColumnsByRootRequester, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Sampling(SamplingId), @@ -173,8 +176,9 @@ impl slog::Value for RequestId { } } +// This custom impl reduces log boilerplate not printing `DataColumnsByRootRequestId` on each id log impl std::fmt::Display for DataColumnsByRootRequestId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) + write!(f, "{} {:?}", self.id, self.requester) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index ae9f96a3484..7192faa12dc 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -25,8 +25,8 @@ use beacon_chain::{ use beacon_processor::WorkEvent; use lighthouse_network::rpc::{RPCError, RequestType, RpcErrorResponse}; use lighthouse_network::service::api_types::{ - AppRequestId, DataColumnsByRootRequester, Id, SamplingRequester, SingleLookupReqId, - SyncRequestId, + AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingRequester, + SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::SyncState; use lighthouse_network::NetworkConfig; @@ -745,10 +745,10 @@ impl TestRig { let first_dc = data_columns.first().unwrap(); let block_root = first_dc.block_root(); let sampling_request_id = match id.0 { - SyncRequestId::DataColumnsByRoot( - _, - _requester @ DataColumnsByRootRequester::Sampling(sampling_id), - ) => sampling_id.sampling_request_id, + SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId { + requester: DataColumnsByRootRequester::Sampling(sampling_id), + .. 
+ }) => sampling_id.sampling_request_id, _ => unreachable!(), }; self.complete_data_columns_by_root_request(id, data_columns); @@ -773,14 +773,15 @@ impl TestRig { data_columns: Vec>>, missing_components: bool, ) { - let lookup_id = - if let SyncRequestId::DataColumnsByRoot(_, DataColumnsByRootRequester::Custody(id)) = - ids.first().unwrap().0 - { - id.requester.0.lookup_id - } else { - panic!("not a custody requester") - }; + let lookup_id = if let SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId { + requester: DataColumnsByRootRequester::Custody(id), + .. + }) = ids.first().unwrap().0 + { + id.requester.0.lookup_id + } else { + panic!("not a custody requester") + }; let first_column = data_columns.first().cloned().unwrap(); @@ -1189,6 +1190,7 @@ impl TestRig { penalty_msg, expect_penalty_msg, "Unexpected penalty msg for {peer_id}" ); + self.log(&format!("Found expected penalty {penalty_msg}")); } pub fn expect_single_penalty(&mut self, peer_id: PeerId, expect_penalty_msg: &'static str) { @@ -1416,7 +1418,7 @@ fn test_single_block_lookup_empty_response() { // The peer does not have the block. It should be penalized. r.single_lookup_block_response(id, peer_id, None); - r.expect_penalty(peer_id, "NoResponseReturned"); + r.expect_penalty(peer_id, "NotEnoughResponsesReturned"); // it should be retried let id = r.expect_block_lookup_request(block_root); // Send the right block this time. @@ -2160,7 +2162,7 @@ fn sampling_batch_requests_not_enough_responses_returned() { r.assert_sampling_request_ongoing(block_root, &column_indexes); // Split the indexes to simulate the case where the supernode doesn't have the requested column. - let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) = + let (column_indexes_supernode_does_not_have, column_indexes_to_complete) = column_indexes.split_at(1); // Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs. @@ -2176,7 +2178,7 @@ fn sampling_batch_requests_not_enough_responses_returned() { // The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses. r.log_sampling_requests(block_root, &column_indexes); - r.assert_sampling_request_nopeers(block_root, &column_indexes); + r.assert_sampling_request_nopeers(block_root, column_indexes_supernode_does_not_have); // The sampling request stalls. r.expect_empty_network(); @@ -2721,11 +2723,6 @@ mod deneb_only { self.blobs.pop().expect("blobs"); self } - fn invalidate_blobs_too_many(mut self) -> Self { - let first_blob = self.blobs.first().expect("blob").clone(); - self.blobs.push(first_blob); - self - } fn expect_block_process(mut self) -> Self { self.rig.expect_block_process(ResponseType::Block); self @@ -2814,21 +2811,6 @@ mod deneb_only { .expect_no_block_request(); } - #[test] - fn single_block_response_then_too_many_blobs_response_attestation() { - let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { - return; - }; - tester - .block_response_triggering_process() - .invalidate_blobs_too_many() - .blobs_response() - .expect_penalty("TooManyResponses") - // Network context returns "download success" because the request has enough blobs + it - // downscores the peer for returning too many. 
- .expect_no_block_request(); - } - // Test peer returning block that has unknown parent, and a new lookup is created #[test] fn parent_block_unknown_parent() { @@ -2869,7 +2851,7 @@ mod deneb_only { }; tester .empty_block_response() - .expect_penalty("NoResponseReturned") + .expect_penalty("NotEnoughResponsesReturned") .expect_block_request() .expect_no_blobs_request() .block_response_and_expect_blob_request() diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ef01763d4df..882f199b52d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -472,13 +472,9 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } - SyncRequestId::DataColumnsByRoot(req_id, requester) => self - .on_data_columns_by_root_response( - req_id, - requester, - peer_id, - RpcEvent::RPCError(error), - ), + SyncRequestId::DataColumnsByRoot(req_id) => { + self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) + } SyncRequestId::RangeBlockAndBlobs { id } => { if let Some(sender_id) = self.network.range_request_failed(id) { match sender_id { @@ -1104,10 +1100,9 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - SyncRequestId::DataColumnsByRoot(req_id, requester) => { + SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response( req_id, - requester, peer_id, match data_column { Some(data_column) => RpcEvent::Response(data_column, seen_timestamp), @@ -1149,7 +1144,6 @@ impl SyncManager { fn on_data_columns_by_root_response( &mut self, req_id: DataColumnsByRootRequestId, - requester: DataColumnsByRootRequester, peer_id: PeerId, data_column: RpcEvent>>, ) { @@ -1157,7 +1151,7 @@ impl SyncManager { self.network .on_data_columns_by_root_response(req_id, peer_id, data_column) { - match requester { + match req_id.requester { DataColumnsByRootRequester::Sampling(id) => { if let Some((requester, result)) = self.sampling diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 025003eef74..eb42e697cd2 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,6 @@ //! channel and stores a global RPC ID to perform requests. use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; -use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; @@ -30,8 +29,11 @@ use lighthouse_network::service::api_types::{ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use rand::seq::SliceRandom; use rand::thread_rng; -use requests::ActiveDataColumnsByRootRequest; pub use requests::LookupVerifyError; +use requests::{ + ActiveRequests, BlobsByRootRequestItems, BlocksByRootRequestItems, + DataColumnsByRootRequestItems, +}; use slog::{debug, error, warn}; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -180,18 +182,17 @@ pub struct SyncNetworkContext { request_id: Id, /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups. 
- blocks_by_root_requests: FnvHashMap, - + blocks_by_root_requests: + ActiveRequests>, /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. - blobs_by_root_requests: FnvHashMap>, + blobs_by_root_requests: ActiveRequests>, + /// A mapping of active DataColumnsByRoot requests + data_columns_by_root_requests: + ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, - /// A mapping of active DataColumnsByRoot requests - data_columns_by_root_requests: - FnvHashMap>, - /// BlocksByRange requests paired with BlobsByRange range_block_components_requests: FnvHashMap)>, @@ -239,9 +240,9 @@ impl SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, - blocks_by_root_requests: <_>::default(), - blobs_by_root_requests: <_>::default(), - data_columns_by_root_requests: <_>::default(), + blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), + blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), + data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), custody_by_root_requests: <_>::default(), range_block_components_requests: FnvHashMap::default(), network_beacon_processor, @@ -270,34 +271,19 @@ impl SyncNetworkContext { let failed_block_ids = self .blocks_by_root_requests - .iter() - .filter_map(|(id, request)| { - if request.peer_id == *peer_id { - Some(SyncRequestId::SingleBlock { id: *id }) - } else { - None - } - }); + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SingleBlock { id: *id }); let failed_blob_ids = self .blobs_by_root_requests - .iter() - .filter_map(|(id, request)| { - if request.peer_id == *peer_id { - Some(SyncRequestId::SingleBlob { id: *id }) - } else { - None - } - }); - let failed_data_column_by_root_ids = - self.data_columns_by_root_requests - .iter() - .filter_map(|(req_id, request)| { - if request.peer_id == *peer_id { - Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester)) - } else { - None - } - }); + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SingleBlob { id: *id }); + let failed_data_column_by_root_ids = self + .data_columns_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::DataColumnsByRoot(*req_id)); failed_range_ids .chain(failed_block_ids) @@ -616,8 +602,14 @@ impl SyncNetworkContext { }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; - self.blocks_by_root_requests - .insert(id, ActiveBlocksByRootRequest::new(request, peer_id)); + self.blocks_by_root_requests.insert( + id, + peer_id, + // true = enforce max_requests as returned for blocks_by_root. We always request a single + // block and the peer must have it. + true, + BlocksByRootRequestItems::new(request), + ); Ok(LookupRequestResult::RequestSent(req_id)) } @@ -677,8 +669,15 @@ impl SyncNetworkContext { }) .map_err(|_| RpcRequestSendError::NetworkSendError)?; - self.blobs_by_root_requests - .insert(id, ActiveBlobsByRootRequest::new(request, peer_id)); + self.blobs_by_root_requests.insert( + id, + peer_id, + // true = enforce max_requests are returned for blobs_by_root. We only issue requests for + // blocks after we know the block has data, and only request peers after they claim to + // have imported the block+blobs. 
+ true, + BlobsByRootRequestItems::new(request), + ); Ok(LookupRequestResult::RequestSent(req_id)) } @@ -689,8 +688,12 @@ impl SyncNetworkContext { requester: DataColumnsByRootRequester, peer_id: PeerId, request: DataColumnsByRootSingleBlockRequest, + expect_max_responses: bool, ) -> Result, &'static str> { - let req_id = DataColumnsByRootRequestId(self.next_id()); + let req_id = DataColumnsByRootRequestId { + id: self.next_id(), + requester, + }; debug!( self.log, "Sending DataColumnsByRoot Request"; @@ -705,12 +708,14 @@ impl SyncNetworkContext { self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)), - request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)), + request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id)), })?; self.data_columns_by_root_requests.insert( req_id, - ActiveDataColumnsByRootRequest::new(request, peer_id, requester), + peer_id, + expect_max_responses, + DataColumnsByRootRequestItems::new(request), ); Ok(LookupRequestResult::RequestSent(req_id)) @@ -916,142 +921,74 @@ impl SyncNetworkContext { // Request handlers - pub fn on_single_block_response( + pub(crate) fn on_single_block_response( &mut self, - request_id: SingleLookupReqId, + id: SingleLookupReqId, peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>> { - let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { - metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]); - return None; - }; - - let resp = match rpc_event { - RpcEvent::Response(block, seen_timestamp) => { - match request.get_mut().add_response(block) { - Ok(block) => Ok((block, seen_timestamp)), - Err(e) => { - // The request must be dropped after receiving an error. - request.remove(); - Err(e.into()) - } + let response = self.blocks_by_root_requests.on_response(id, rpc_event); + let response = response.map(|res| { + res.and_then(|(mut blocks, seen_timestamp)| { + // Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the + // response count to at most 1. + match blocks.pop() { + Some(block) => Ok((block, seen_timestamp)), + // Should never happen, `blocks_by_root_requests` enforces that we receive at least + // 1 chunk. 
+ None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), } - } - RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, - Err(e) => Err(e.into()), - }, - RpcEvent::RPCError(e) => { - request.remove(); - Err(e.into()) - } - }; - - if let Err(RpcResponseError::VerifyError(e)) = &resp { + }) + }); + if let Some(Err(RpcResponseError::VerifyError(e))) = &response { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } - Some(resp) + response } - pub fn on_single_blob_response( + pub(crate) fn on_single_blob_response( &mut self, - request_id: SingleLookupReqId, + id: SingleLookupReqId, peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>> { - let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { - metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]); - return None; - }; - - let resp = match rpc_event { - RpcEvent::Response(blob, seen_timestamp) => { - let request = request.get_mut(); - match request.add_response(blob) { - Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, seen_timestamp)) - .map_err(|e| (e.into(), request.resolve())), - Ok(None) => return None, - Err(e) => Err((e.into(), request.resolve())), - } - } - RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, - // (err, false = not resolved) because terminate returns Ok() if resolved - Err(e) => Err((e.into(), false)), - }, - RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())), - }; - - match resp { - Ok(resp) => Some(Ok(resp)), - // Track if this request has already returned some value downstream. Ensure that - // downstream code only receives a single Result per request. If the serving peer does - // multiple penalizable actions per request, downscore and return None. This allows to - // catch if a peer is returning more blobs than requested or if the excess blobs are - // invalid. 
- Err((e, resolved)) => { - if let RpcResponseError::VerifyError(e) = &e { - self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } - if resolved { - None - } else { - Some(Err(e)) - } - } + let response = self.blobs_by_root_requests.on_response(id, rpc_event); + let response = response.map(|res| { + res.and_then( + |(blobs, seen_timestamp)| match to_fixed_blob_sidecar_list(blobs) { + Ok(blobs) => Ok((blobs, seen_timestamp)), + Err(e) => Err(e.into()), + }, + ) + }); + if let Some(Err(RpcResponseError::VerifyError(e))) = &response { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } + response } #[allow(clippy::type_complexity)] - pub fn on_data_columns_by_root_response( + pub(crate) fn on_data_columns_by_root_response( &mut self, id: DataColumnsByRootRequestId, - _peer_id: PeerId, + peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>>> { - let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else { - return None; - }; - - let resp = match rpc_event { - RpcEvent::Response(data_column, seen_timestamp) => { - let request = request.get_mut(); - match request.add_response(data_column) { - Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)), - Ok(None) => return None, - Err(e) => Err((e.into(), request.resolve())), - } - } - RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, - // (err, false = not resolved) because terminate returns Ok() if resolved - Err(e) => Err((e.into(), false)), - }, - RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())), - }; + let resp = self + .data_columns_by_root_requests + .on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } - match resp { - Ok(resp) => Some(Ok(resp)), - // Track if this request has already returned some value downstream. Ensure that - // downstream code only receives a single Result per request. If the serving peer does - // multiple penalizable actions per request, downscore and return None. This allows to - // catch if a peer is returning more columns than requested or if the excess blobs are - // invalid. - Err((e, resolved)) => { - if let RpcResponseError::VerifyError(_e) = &e { - // TODO(das): this is a bug, we should not penalise peer in this case. - // confirm this can be removed. - // self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } - if resolved { - None - } else { - Some(Err(e)) - } - } + fn report_rpc_response_errors( + &mut self, + resp: Option>, + peer_id: PeerId, + ) -> Option> { + if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } + resp } /// Insert a downloaded column into an active custody request. Then make progress on the diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 6736bfb82f0..e4bce3dafcd 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -283,6 +283,10 @@ impl ActiveCustodyRequest { block_root: self.block_root, indices: indices.clone(), }, + // true = enforce max_requests are returned data_columns_by_root. 
We only issue requests + // for blocks after we know the block has data, and only request peers after they claim to + // have imported the block+columns and claim to be custodians + true, ) .map_err(Error::SendFailed)?; diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 0c2f59d143f..b9214bafcd7 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,23 +1,187 @@ +use std::{collections::hash_map::Entry, hash::Hash}; + +use beacon_chain::validator_monitor::timestamp_now; +use fnv::FnvHashMap; +use lighthouse_network::PeerId; use strum::IntoStaticStr; use types::Hash256; -pub use blobs_by_root::{ActiveBlobsByRootRequest, BlobsByRootSingleBlockRequest}; -pub use blocks_by_root::{ActiveBlocksByRootRequest, BlocksByRootSingleRequest}; +pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest}; +pub use blocks_by_root::{BlocksByRootRequestItems, BlocksByRootSingleRequest}; pub use data_columns_by_root::{ - ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest, + DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +use crate::metrics; + +use super::{RpcEvent, RpcResponseResult}; + mod blobs_by_root; mod blocks_by_root; mod data_columns_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { - NoResponseReturned, - NotEnoughResponsesReturned { expected: usize, actual: usize }, + NotEnoughResponsesReturned { actual: usize }, TooManyResponses, UnrequestedBlockRoot(Hash256), UnrequestedIndex(u64), InvalidInclusionProof, DuplicateData, } + +/// Collection of active requests of a single ReqResp method, i.e. `blocks_by_root` +pub struct ActiveRequests { + requests: FnvHashMap>, + name: &'static str, +} + +/// Stateful container for a single active ReqResp request +struct ActiveRequest { + state: State, + peer_id: PeerId, + // Error if the request terminates before receiving max expected responses + expect_max_responses: bool, +} + +enum State { + Active(T), + CompletedEarly, + Errored, +} + +impl ActiveRequests { + pub fn new(name: &'static str) -> Self { + Self { + requests: <_>::default(), + name, + } + } + + pub fn insert(&mut self, id: K, peer_id: PeerId, expect_max_responses: bool, items: T) { + self.requests.insert( + id, + ActiveRequest { + state: State::Active(items), + peer_id, + expect_max_responses, + }, + ); + } + + /// Handle an `RpcEvent` for a specific request index by `id`. + /// + /// Lighthouse ReqResp protocol API promises to send 0 or more `RpcEvent::Response` chunks, + /// and EITHER a single `RpcEvent::RPCError` or RpcEvent::StreamTermination. + /// + /// Downstream code expects to receive a single `Result` value per request ID. However, + /// `add_item` may convert ReqResp success chunks into errors. This function handles the + /// multiple errors / stream termination internally ensuring that a single `Some` is + /// returned. + pub fn on_response( + &mut self, + id: K, + rpc_event: RpcEvent, + ) -> Option>> { + let Entry::Occupied(mut entry) = self.requests.entry(id) else { + metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &[self.name]); + return None; + }; + + match rpc_event { + // Handler of a success ReqResp chunk. Adds the item to the request accumulator. + // `ActiveRequestItems` validates the item before appending to its internal state. 
+            RpcEvent::Response(item, seen_timestamp) => {
+                let request = &mut entry.get_mut();
+                match &mut request.state {
+                    State::Active(items) => {
+                        match items.add(item) {
+                            // Received all the items we are expecting; return early, but keep the
+                            // request struct to handle the stream termination gracefully.
+                            Ok(true) => {
+                                let items = items.consume();
+                                request.state = State::CompletedEarly;
+                                Some(Ok((items, seen_timestamp)))
+                            }
+                            // Received an item, but we are still expecting more
+                            Ok(false) => None,
+                            // Received an invalid item
+                            Err(e) => {
+                                request.state = State::Errored;
+                                Some(Err(e.into()))
+                            }
+                        }
+                    }
+                    // Should never happen: the ReqResp network behaviour enforces a max count of
+                    // chunks. When `max_remaining_chunks <= 1` the inbound stream is terminated in
+                    // `rpc/handler.rs`. Handling this case adds complexity for no gain. Even if an
+                    // attacker could abuse this, there's no gain in sending garbage chunks that
+                    // will be ignored anyway.
+                    State::CompletedEarly => None,
+                    // Ignore items after errors. We may want to penalize repeated invalid chunks
+                    // for the same response, but that's an optimization to ban peers sending
+                    // invalid data faster, which we choose not to adopt for now.
+                    State::Errored => None,
+                }
+            }
+            RpcEvent::StreamTermination => {
+                // After stream termination we must forget about this request; there will be no more
+                // messages coming from the network
+                let request = entry.remove();
+                match request.state {
+                    // Received a stream termination in a valid sequence, consume items
+                    State::Active(mut items) => {
+                        if request.expect_max_responses {
+                            Some(Err(LookupVerifyError::NotEnoughResponsesReturned {
+                                actual: items.consume().len(),
+                            }
+                            .into()))
+                        } else {
+                            Some(Ok((items.consume(), timestamp_now())))
+                        }
+                    }
+                    // Items already returned, ignore stream termination
+                    State::CompletedEarly => None,
+                    // Returned an error earlier, ignore stream termination
+                    State::Errored => None,
+                }
+            }
+            RpcEvent::RPCError(e) => {
+                // After an Error event from the network we must forget about this request as this
+                // may be the last message for this request.
+                match entry.remove().state {
+                    // Received an error while the request is still active, propagate the error.
+                    State::Active(_) => Some(Err(e.into())),
+                    // Received an error after completing the request, ignore the error. This is okay
+                    // because the network has already registered a downscore event if necessary for
+                    // this message.
+                    State::CompletedEarly => None,
+                    // Received a network error after a validity error. Okay to ignore, see above.
+                    State::Errored => None,
+                }
+            }
+        }
+    }
+
+    pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {
+        self.requests
+            .iter()
+            .filter(|(_, request)| &request.peer_id == peer_id)
+            .map(|(id, _)| id)
+            .collect()
+    }
+
+    pub fn len(&self) -> usize {
+        self.requests.len()
+    }
+}
+
+pub trait ActiveRequestItems {
+    type Item;
+
+    /// Add a new item into the accumulator. Returns true if all expected items have been received.
+    fn add(&mut self, item: Self::Item) -> Result<bool, LookupVerifyError>;
+
+    /// Return all accumulated items, consuming them.
+ fn consume(&mut self) -> Vec; +} diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs index cb2b1a42ec4..fefb27a5efc 100644 --- a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs @@ -1,8 +1,8 @@ -use lighthouse_network::{rpc::methods::BlobsByRootRequest, PeerId}; +use lighthouse_network::rpc::methods::BlobsByRootRequest; use std::sync::Arc; use types::{blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256}; -use super::LookupVerifyError; +use super::{ActiveRequestItems, LookupVerifyError}; #[derive(Debug, Clone)] pub struct BlobsByRootSingleBlockRequest { @@ -25,34 +25,27 @@ impl BlobsByRootSingleBlockRequest { } } -pub struct ActiveBlobsByRootRequest { +pub struct BlobsByRootRequestItems { request: BlobsByRootSingleBlockRequest, - blobs: Vec>>, - resolved: bool, - pub(crate) peer_id: PeerId, + items: Vec>>, } -impl ActiveBlobsByRootRequest { - pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self { +impl BlobsByRootRequestItems { + pub fn new(request: BlobsByRootSingleBlockRequest) -> Self { Self { request, - blobs: vec![], - resolved: false, - peer_id, + items: vec![], } } +} + +impl ActiveRequestItems for BlobsByRootRequestItems { + type Item = Arc>; /// Appends a chunk to this multi-item request. If all expected chunks are received, this /// method returns `Some`, resolving the request before the stream terminator. /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - blob: Arc>, - ) -> Result>>>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - + fn add(&mut self, blob: Self::Item) -> Result { let block_root = blob.block_root(); if self.request.block_root != block_root { return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); @@ -63,34 +56,16 @@ impl ActiveBlobsByRootRequest { if !self.request.indices.contains(&blob.index) { return Err(LookupVerifyError::UnrequestedIndex(blob.index)); } - if self.blobs.iter().any(|b| b.index == blob.index) { + if self.items.iter().any(|b| b.index == blob.index) { return Err(LookupVerifyError::DuplicateData); } - self.blobs.push(blob); - if self.blobs.len() >= self.request.indices.len() { - // All expected chunks received, return result early - self.resolved = true; - Ok(Some(std::mem::take(&mut self.blobs))) - } else { - Ok(None) - } - } + self.items.push(blob); - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NotEnoughResponsesReturned { - expected: self.request.indices.len(), - actual: self.blobs.len(), - }) - } + Ok(self.items.len() >= self.request.indices.len()) } - /// Mark request as resolved (= has returned something downstream) while marking this status as - /// true for future calls. 
- pub fn resolve(&mut self) -> bool { - std::mem::replace(&mut self.resolved, true) + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) } } diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs index a15d4e39353..f3cdcbe714f 100644 --- a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs @@ -1,9 +1,9 @@ use beacon_chain::get_block_root; -use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; +use lighthouse_network::rpc::BlocksByRootRequest; use std::sync::Arc; use types::{ChainSpec, EthSpec, Hash256, SignedBeaconBlock}; -use super::LookupVerifyError; +use super::{ActiveRequestItems, LookupVerifyError}; #[derive(Debug, Copy, Clone)] pub struct BlocksByRootSingleRequest(pub Hash256); @@ -14,47 +14,38 @@ impl BlocksByRootSingleRequest { } } -pub struct ActiveBlocksByRootRequest { +pub struct BlocksByRootRequestItems { request: BlocksByRootSingleRequest, - resolved: bool, - pub(crate) peer_id: PeerId, + items: Vec>>, } -impl ActiveBlocksByRootRequest { - pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self { +impl BlocksByRootRequestItems { + pub fn new(request: BlocksByRootSingleRequest) -> Self { Self { request, - resolved: false, - peer_id, + items: vec![], } } +} + +impl ActiveRequestItems for BlocksByRootRequestItems { + type Item = Arc>; /// Append a response to the single chunk request. If the chunk is valid, the request is /// resolved immediately. /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - block: Arc>, - ) -> Result>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - + fn add(&mut self, block: Self::Item) -> Result { let block_root = get_block_root(&block); if self.request.0 != block_root { return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } - // Valid data, blocks by root expects a single response - self.resolved = true; - Ok(block) + self.items.push(block); + // Always returns true, blocks by root expects a single response + Ok(true) } - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NoResponseReturned) - } + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) } } diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index a42ae7ca41f..1b8d46ff072 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -1,9 +1,8 @@ -use lighthouse_network::service::api_types::DataColumnsByRootRequester; -use lighthouse_network::{rpc::methods::DataColumnsByRootRequest, PeerId}; +use lighthouse_network::rpc::methods::DataColumnsByRootRequest; use std::sync::Arc; use types::{ChainSpec, DataColumnIdentifier, DataColumnSidecar, EthSpec, Hash256}; -use super::LookupVerifyError; +use super::{ActiveRequestItems, LookupVerifyError}; #[derive(Debug, Clone)] pub struct DataColumnsByRootSingleBlockRequest { @@ -26,40 +25,27 @@ impl DataColumnsByRootSingleBlockRequest { } } -pub struct ActiveDataColumnsByRootRequest { +pub struct DataColumnsByRootRequestItems { request: DataColumnsByRootSingleBlockRequest, items: 
Vec>>, - resolved: bool, - pub(crate) peer_id: PeerId, - pub(crate) requester: DataColumnsByRootRequester, } -impl ActiveDataColumnsByRootRequest { - pub fn new( - request: DataColumnsByRootSingleBlockRequest, - peer_id: PeerId, - requester: DataColumnsByRootRequester, - ) -> Self { +impl DataColumnsByRootRequestItems { + pub fn new(request: DataColumnsByRootSingleBlockRequest) -> Self { Self { request, items: vec![], - resolved: false, - peer_id, - requester, } } +} + +impl ActiveRequestItems for DataColumnsByRootRequestItems { + type Item = Arc>; /// Appends a chunk to this multi-item request. If all expected chunks are received, this /// method returns `Some`, resolving the request before the stream terminator. /// The active request SHOULD be dropped after `add_response` returns an error - pub fn add_response( - &mut self, - data_column: Arc>, - ) -> Result>>>, LookupVerifyError> { - if self.resolved { - return Err(LookupVerifyError::TooManyResponses); - } - + fn add(&mut self, data_column: Self::Item) -> Result { let block_root = data_column.block_root(); if self.request.block_root != block_root { return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); @@ -75,29 +61,11 @@ impl ActiveDataColumnsByRootRequest { } self.items.push(data_column); - if self.items.len() >= self.request.indices.len() { - // All expected chunks received, return result early - self.resolved = true; - Ok(Some(std::mem::take(&mut self.items))) - } else { - Ok(None) - } - } - pub fn terminate(self) -> Result<(), LookupVerifyError> { - if self.resolved { - Ok(()) - } else { - Err(LookupVerifyError::NotEnoughResponsesReturned { - expected: self.request.indices.len(), - actual: self.items.len(), - }) - } + Ok(self.items.len() >= self.request.indices.len()) } - /// Mark request as resolved (= has returned something downstream) while marking this status as - /// true for future calls. - pub fn resolve(&mut self) -> bool { - std::mem::replace(&mut self.resolved, true) + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) } } diff --git a/beacon_node/network/src/sync/peer_sampling.rs b/beacon_node/network/src/sync/peer_sampling.rs index decabfd2164..7e725f5df5c 100644 --- a/beacon_node/network/src/sync/peer_sampling.rs +++ b/beacon_node/network/src/sync/peer_sampling.rs @@ -88,7 +88,11 @@ impl Sampling { } }; - debug!(self.log, "Created new sample request"; "id" => ?id); + debug!(self.log, + "Created new sample request"; + "id" => ?id, + "column_selection" => ?request.column_selection() + ); // TOOD(das): If a node has very little peers, continue_sampling() will attempt to find enough // to sample here, immediately failing the sampling request. There should be some grace @@ -239,6 +243,15 @@ impl ActiveSamplingRequest { self.column_requests.get(index).map(|req| req.status()) } + /// Return the current ordered list of columns that this requests has to sample to succeed + pub(crate) fn column_selection(&self) -> Vec { + self.column_shuffle + .iter() + .take(REQUIRED_SUCCESSES[0]) + .copied() + .collect() + } + /// Insert a downloaded column into an active sampling request. Then make progress on the /// entire request. /// @@ -531,6 +544,10 @@ impl ActiveSamplingRequest { block_root: self.block_root, indices: column_indexes.clone(), }, + // false = We issue request to custodians who may or may not have received the + // samples yet. We don't any signal (like an attestation or status messages that the + // custodian has received data). 
+                false,
             )
             .map_err(SamplingError::SendFailed)?;
         self.column_indexes_by_sampling_request
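
Note: the refactor above replaces the per-method `ActiveBlocksByRootRequest` / `ActiveBlobsByRootRequest` / `ActiveDataColumnsByRootRequest` structs with a single `ActiveRequests<K, T>` container plus an `ActiveRequestItems` accumulator trait. The standalone sketch below is not part of the patch; it mirrors the trait's contract to show the shape a hypothetical new by-root accumulator would take (the error type, `DummyByRootItems`, and the `u64` stand-in item are illustrative assumptions): `add` validates and stores one chunk and reports completion, `consume` drains the buffer.

```rust
// Standalone sketch, assuming a trait shaped like the one introduced in requests.rs.
#[derive(Debug, PartialEq)]
enum VerifyError {
    UnrequestedIndex(u64),
    DuplicateData,
}

trait ActiveRequestItems {
    type Item;

    /// Add a new item into the accumulator. Returns true once every expected item has arrived.
    fn add(&mut self, item: Self::Item) -> Result<bool, VerifyError>;

    /// Return all accumulated items, consuming them.
    fn consume(&mut self) -> Vec<Self::Item>;
}

/// Hypothetical accumulator for a request that expects exactly one item per requested index.
struct DummyByRootItems {
    requested_indices: Vec<u64>,
    items: Vec<u64>, // stand-in for the Arc'd sidecar types in the real implementations
}

impl ActiveRequestItems for DummyByRootItems {
    type Item = u64;

    fn add(&mut self, index: Self::Item) -> Result<bool, VerifyError> {
        // Reject chunks that were never requested, and duplicates, like the real impls do.
        if !self.requested_indices.contains(&index) {
            return Err(VerifyError::UnrequestedIndex(index));
        }
        if self.items.contains(&index) {
            return Err(VerifyError::DuplicateData);
        }
        self.items.push(index);
        Ok(self.items.len() >= self.requested_indices.len())
    }

    fn consume(&mut self) -> Vec<Self::Item> {
        std::mem::take(&mut self.items)
    }
}

fn main() {
    let mut items = DummyByRootItems {
        requested_indices: vec![0, 1],
        items: vec![],
    };
    assert_eq!(items.add(0), Ok(false)); // still waiting for index 1
    assert_eq!(items.add(1), Ok(true)); // all expected items received
    assert_eq!(items.add(1), Err(VerifyError::DuplicateData));
    assert_eq!(items.consume(), vec![0, 1]);
}
```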
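`ActiveRequests::on_response` collapses the ReqResp event sequence (zero or more response chunks, then either a stream termination or an RPC error) into at most one `Result` per request id. Whether an early stream termination counts as `NotEnoughResponsesReturned` or as a successful partial response is controlled by the `expect_max_responses` flag: block, blob, and custody-column requests pass `true`, peer sampling passes `false`. The self-contained sketch below is not Lighthouse code; `Event`, `Outcome`, and the transitions are simplified assumptions mirroring the `State::{Active, CompletedEarly, Errored}` machine in requests.rs.

```rust
// Minimal model of the per-request state machine, under the assumptions stated above.
#[derive(Clone, Copy)]
enum Event {
    Chunk { completes: bool, valid: bool },
    StreamTermination,
    RpcError,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Items,     // Some(Ok(items)) handed downstream
    NotEnough, // Some(Err(NotEnoughResponsesReturned { .. }))
    Error,     // Some(Err(..)) for an invalid chunk or an RPC error
}

#[derive(Clone, Copy, PartialEq)]
enum State {
    Active,
    CompletedEarly, // items already returned, entry kept until the stream terminates
    Errored,        // a result or error was already produced, later events are swallowed
}

fn on_event(state: &mut State, event: Event, expect_max_responses: bool) -> Option<Outcome> {
    match (event, *state) {
        // Invalid chunk: report it once, then ignore everything else for this request.
        (Event::Chunk { valid: false, .. }, State::Active) => {
            *state = State::Errored;
            Some(Outcome::Error)
        }
        // Last expected chunk: return the items early, keep the entry for the terminator.
        (Event::Chunk { completes: true, .. }, State::Active) => {
            *state = State::CompletedEarly;
            Some(Outcome::Items)
        }
        // Valid chunk, still expecting more.
        (Event::Chunk { .. }, State::Active) => None,
        // Stream ended before all expected chunks arrived: a fault only if we demanded them all.
        (Event::StreamTermination, State::Active) => {
            *state = State::Errored; // entry.remove() in the real code
            if expect_max_responses {
                Some(Outcome::NotEnough)
            } else {
                Some(Outcome::Items)
            }
        }
        (Event::RpcError, State::Active) => {
            *state = State::Errored; // entry.remove() in the real code
            Some(Outcome::Error)
        }
        // Downstream code must see at most one result per request id: ignore anything that
        // arrives after the request completed early or errored.
        _ => None,
    }
}

fn main() {
    // Sampling-style request (expect_max_responses = false): early termination still succeeds.
    let mut s = State::Active;
    assert_eq!(on_event(&mut s, Event::Chunk { completes: false, valid: true }, false), None);
    assert_eq!(on_event(&mut s, Event::StreamTermination, false), Some(Outcome::Items));

    // Lookup-style request (expect_max_responses = true): the same sequence is a peer fault.
    let mut s = State::Active;
    assert_eq!(on_event(&mut s, Event::Chunk { completes: false, valid: true }, true), None);
    assert_eq!(on_event(&mut s, Event::StreamTermination, true), Some(Outcome::NotEnough));
    assert_eq!(on_event(&mut s, Event::RpcError, true), None); // already resolved, ignored
}
```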
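Since `DataColumnsByRootRequestId` now carries its requester, `SyncRequestId::DataColumnsByRoot` holds a single value and the requester no longer has to be threaded separately through `on_data_columns_by_root_response` or the failed-request iterators, while the custom `Display` keeps log output compact. A minimal sketch of that shape with stand-in types (unit-variant requesters and a `u32` id, not the Lighthouse definitions):

```rust
// Standalone sketch of the id-plus-requester shape, under the stated assumptions.
use std::fmt;

type Id = u32;

#[derive(Debug, Clone, Copy)]
enum DataColumnsByRootRequester {
    Sampling,
    Custody,
}

#[derive(Debug, Clone, Copy)]
struct DataColumnsByRootRequestId {
    id: Id,
    requester: DataColumnsByRootRequester,
}

impl fmt::Display for DataColumnsByRootRequestId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same shape as the PR's impl: the numeric id plus the requester, without the struct name.
        write!(f, "{} {:?}", self.id, self.requester)
    }
}

enum SyncRequestId {
    DataColumnsByRoot(DataColumnsByRootRequestId),
}

fn main() {
    let sampling = DataColumnsByRootRequestId { id: 42, requester: DataColumnsByRootRequester::Sampling };
    let custody = DataColumnsByRootRequestId { id: 43, requester: DataColumnsByRootRequester::Custody };
    // One payload per variant; the requester travels inside the id.
    let _ids = [
        SyncRequestId::DataColumnsByRoot(sampling),
        SyncRequestId::DataColumnsByRoot(custody),
    ];
    assert_eq!(sampling.to_string(), "42 Sampling");
    assert_eq!(custody.to_string(), "43 Custody");
}
```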