diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 60126818b60..1deb50237db 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -7,7 +7,6 @@ use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::manager::{Id, SingleLookupReqId}; -use crate::sync::network_context::LookupFailure; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; @@ -325,12 +324,7 @@ impl BlockLookups { response: RpcProcessingResult, cx: &mut SyncNetworkContext, ) -> Result { - // Downscore peer even if lookup is not known - // Only downscore lookup verify errors. RPC errors are downscored in the network handler. - if let Err(LookupFailure::LookupVerifyError(e)) = &response { - // Note: the error is displayed in full debug form on the match below - cx.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } + // Note: no need to downscore peers here, already downscored on network context let response_type = R::response_type(); let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { @@ -459,23 +453,16 @@ impl BlockLookups { // if both components have been processed. request_state.on_processing_success()?; - // If this was the result of a block request, we can't determined if the block peer did anything - // wrong. If we already had both a block and blobs response processed, we should penalize the - // blobs peer because they did not provide all blobs on the initial request. if lookup.both_components_processed() { - if let Some(blob_peer) = lookup - .blob_request_state - .state - .on_post_process_validation_failure()? - { - cx.report_peer( - blob_peer, - PeerAction::MidToleranceError, - "sent_incomplete_blobs", - ); - } + // We don't request for other block components until being sure that the block has + // data. If we request blobs / columns to a peer we are sure those must exist. + // Therefore if all components are processed and we still receive `MissingComponents` + // it indicates an internal bug. + return Err(LookupRequestError::MissingComponentsAfterAllProcessed); + } else { + // Continue request, potentially request blobs + Action::Retry } - Action::Retry } BlockProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 6804798dc93..b6c2825fabc 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -39,6 +39,9 @@ pub enum LookupRequestError { BadState(String), /// Lookup failed for some other reason and should be dropped Failed, + /// Received MissingComponents when all components have been processed. This should never + /// happen, and indicates some internal bug + MissingComponentsAfterAllProcessed, /// Attempted to retrieve a not known lookup id UnknownLookup, /// Received a download result for a different request id than the in-flight request. @@ -158,7 +161,7 @@ impl SingleBlockLookup { } /// Potentially makes progress on this request if it's in a progress-able state - pub fn continue_request>( + fn continue_request>( &mut self, cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { @@ -285,10 +288,8 @@ pub enum State { AwaitingProcess(DownloadResult), /// Request is processing, sent by lookup sync Processing(DownloadResult), - /// Request is processed: - /// - `Processed(Some)` if lookup sync downloaded and sent to process this request - /// - `Processed(None)` if another source (i.e. gossip) sent this component for processing - Processed(Option), + /// Request is processed + Processed, } /// Object representing the state of a single block or blob lookup request. @@ -463,8 +464,8 @@ impl SingleLookupRequestState { pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> { match &self.state { - State::Processing(result) => { - self.state = State::Processed(Some(result.peer_id)); + State::Processing(_) => { + self.state = State::Processed; Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -473,27 +474,11 @@ impl SingleLookupRequestState { } } - pub fn on_post_process_validation_failure( - &mut self, - ) -> Result, LookupRequestError> { - match &self.state { - State::Processed(peer_id) => { - let peer_id = *peer_id; - self.failed_processing = self.failed_processing.saturating_add(1); - self.state = State::AwaitingDownload; - Ok(peer_id) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on_post_process_validation_failure expected Processed got {other}" - ))), - } - } - /// Mark a request as complete without any download or processing pub fn on_completed_request(&mut self) -> Result<(), LookupRequestError> { match &self.state { State::AwaitingDownload => { - self.state = State::Processed(None); + self.state = State::Processed; Ok(()) } other => Err(LookupRequestError::BadState(format!( diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 619db0469ba..2a59c24d580 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -287,6 +287,11 @@ impl TestRig { self.expect_no_active_single_lookups(); } + fn expect_no_active_lookups_empty_network(&mut self) { + self.expect_no_active_lookups(); + self.expect_empty_network(); + } + fn new_connected_peer(&mut self) -> PeerId { let peer_id = PeerId::random(); self.network_globals @@ -461,14 +466,16 @@ impl TestRig { ); } - fn complete_single_lookup_block_valid(&mut self, block: SignedBeaconBlock, import: bool) { + fn complete_lookup_block_download(&mut self, block: SignedBeaconBlock) { let block_root = block.canonical_root(); - let block_slot = block.slot(); let id = self.expect_block_lookup_request(block_root); self.expect_empty_network(); let peer_id = self.new_connected_peer(); self.single_lookup_block_response(id, peer_id, Some(block.into())); self.single_lookup_block_response(id, peer_id, None); + } + + fn complete_lookup_block_import_valid(&mut self, block_root: Hash256, import: bool) { self.expect_block_process(ResponseType::Block); let id = self.find_single_lookup_for(block_root); self.single_block_component_processed( @@ -477,12 +484,19 @@ impl TestRig { BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)) } else { BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - block_slot, block_root, + Slot::new(0), + block_root, )) }, ) } + fn complete_single_lookup_block_valid(&mut self, block: SignedBeaconBlock, import: bool) { + let block_root = block.canonical_root(); + self.complete_lookup_block_download(block); + self.complete_lookup_block_import_valid(block_root, import) + } + fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) { self.send_sync_message(SyncMessage::RpcError { peer_id, @@ -676,26 +690,6 @@ impl TestRig { .unwrap_or_else(|e| panic!("Expected blob parent request for {for_block:?}: {e}")) } - fn expect_lookup_request_block_and_blobs(&mut self, block_root: Hash256) -> SingleLookupReqId { - let id = self.expect_block_lookup_request(block_root); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if self.after_deneb() { - let _ = self.expect_blob_lookup_request(block_root); - } - id - } - - fn expect_parent_request_block_and_blobs(&mut self, block_root: Hash256) -> SingleLookupReqId { - let id = self.expect_block_parent_request(block_root); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if self.after_deneb() { - let _ = self.expect_blob_parent_request(block_root); - } - id - } - #[track_caller] fn expect_block_process(&mut self, response_type: ResponseType) { match response_type { @@ -932,7 +926,7 @@ fn test_single_block_lookup_happy_path() { let block_root = block.canonical_root(); // Trigger the request rig.trigger_unknown_block_from_attestation(block_root, peer_id); - let id = rig.expect_lookup_request_block_and_blobs(block_root); + let id = rig.expect_block_lookup_request(block_root); // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. @@ -962,7 +956,7 @@ fn test_single_block_lookup_empty_response() { // Trigger the request r.trigger_unknown_block_from_attestation(block_root, peer_id); - let id = r.expect_lookup_request_block_and_blobs(block_root); + let id = r.expect_block_lookup_request(block_root); // The peer does not have the block. It should be penalized. r.single_lookup_block_response(id, peer_id, None); @@ -985,7 +979,7 @@ fn test_single_block_lookup_wrong_response() { // Trigger the request rig.trigger_unknown_block_from_attestation(block_hash, peer_id); - let id = rig.expect_lookup_request_block_and_blobs(block_hash); + let id = rig.expect_block_lookup_request(block_hash); // Peer sends something else. It should be penalized. let bad_block = rig.rand_block(); @@ -1007,7 +1001,7 @@ fn test_single_block_lookup_failure() { // Trigger the request rig.trigger_unknown_block_from_attestation(block_hash, peer_id); - let id = rig.expect_lookup_request_block_and_blobs(block_hash); + let id = rig.expect_block_lookup_request(block_hash); // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. rig.single_lookup_failed(id, peer_id, RPCError::UnsupportedProtocol); @@ -1026,7 +1020,7 @@ fn test_single_block_lookup_becomes_parent_request() { // Trigger the request rig.trigger_unknown_block_from_attestation(block.canonical_root(), peer_id); - let id = rig.expect_lookup_request_block_and_blobs(block_root); + let id = rig.expect_block_parent_request(block_root); // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. @@ -1044,7 +1038,7 @@ fn test_single_block_lookup_becomes_parent_request() { BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), ); assert_eq!(rig.active_single_lookups_count(), 2); // 2 = current + parent - rig.expect_parent_request_block_and_blobs(parent_root); + rig.expect_block_parent_request(parent_root); rig.expect_empty_network(); assert_eq!(rig.active_parent_lookups_count(), 1); } @@ -1058,10 +1052,12 @@ fn test_parent_lookup_happy_path() { // Trigger the request rig.trigger_unknown_parent_block(peer_id, block.into()); - let id = rig.expect_parent_request_block_and_blobs(parent_root); + let id = rig.expect_block_parent_request(parent_root); // Peer sends the right block, it should be sent for processing. Peer should not be penalized. rig.parent_lookup_block_response(id, peer_id, Some(parent.into())); + // No request of blobs because the block has not data + rig.expect_empty_network(); rig.expect_block_process(ResponseType::Block); rig.expect_empty_network(); @@ -1074,7 +1070,7 @@ fn test_parent_lookup_happy_path() { ); rig.expect_parent_chain_process(); rig.parent_chain_processed_success(block_root, &[]); - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1086,7 +1082,7 @@ fn test_parent_lookup_wrong_response() { // Trigger the request rig.trigger_unknown_parent_block(peer_id, block.into()); - let id1 = rig.expect_parent_request_block_and_blobs(parent_root); + let id1 = rig.expect_block_parent_request(parent_root); // Peer sends the wrong block, peer should be penalized and the block re-requested. let bad_block = rig.rand_block(); @@ -1108,7 +1104,7 @@ fn test_parent_lookup_wrong_response() { rig.parent_block_processed_imported(block_root); rig.expect_parent_chain_process(); rig.parent_chain_processed_success(block_root, &[]); - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1120,14 +1116,14 @@ fn test_parent_lookup_rpc_failure() { // Trigger the request rig.trigger_unknown_parent_block(peer_id, block.into()); - let id1 = rig.expect_parent_request_block_and_blobs(parent_root); + let id = rig.expect_block_parent_request(parent_root); // The request fails. It should be tried again. - rig.parent_lookup_failed_unavailable(id1, peer_id); - let id2 = rig.expect_block_parent_request(parent_root); + rig.parent_lookup_failed_unavailable(id, peer_id); + let id = rig.expect_block_parent_request(parent_root); // Send the right block this time. - rig.parent_lookup_block_response(id2, peer_id, Some(parent.into())); + rig.parent_lookup_block_response(id, peer_id, Some(parent.into())); rig.expect_block_process(ResponseType::Block); // Add peer to child lookup to prevent it being dropped @@ -1136,7 +1132,7 @@ fn test_parent_lookup_rpc_failure() { rig.parent_block_processed_imported(block_root); rig.expect_parent_chain_process(); rig.parent_chain_processed_success(block_root, &[]); - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1152,9 +1148,6 @@ fn test_parent_lookup_too_many_attempts() { for i in 1..=PARENT_FAIL_TOLERANCE { let id = rig.expect_block_parent_request(parent_root); // Blobs are only requested in the first iteration as this test only retries blocks - if rig.after_deneb() && i == 1 { - let _ = rig.expect_blob_parent_request(parent_root); - } if i % 2 == 0 { // make sure every error is accounted for @@ -1178,7 +1171,7 @@ fn test_parent_lookup_too_many_attempts() { } } - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1193,10 +1186,6 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { for i in 1..=PARENT_FAIL_TOLERANCE { assert!(!rig.failed_chains_contains(&block_root)); let id = rig.expect_block_parent_request(parent_root); - // Blobs are only requested in the first iteration as this test only retries blocks - if rig.after_deneb() && i == 1 { - let _ = rig.expect_blob_parent_request(parent_root); - } if i % 2 != 0 { // The request fails. It should be tried again. rig.parent_lookup_failed_unavailable(id, peer_id); @@ -1210,7 +1199,7 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { assert!(!rig.failed_chains_contains(&block_root)); assert!(!rig.failed_chains_contains(&parent.canonical_root())); - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1224,12 +1213,8 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { rig.trigger_unknown_parent_block(peer_id, block.into()); rig.log("Fail downloading the block"); - for i in 0..(PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { + for _ in 0..(PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { let id = rig.expect_block_parent_request(parent_root); - // Blobs are only requested in the first iteration as this test only retries blocks - if rig.after_deneb() && i == 0 { - let _ = rig.expect_blob_parent_request(parent_root); - } // The request fails. It should be tried again. rig.parent_lookup_failed_unavailable(id, peer_id); } @@ -1247,7 +1232,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { } rig.assert_not_failed_chain(block_root); - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1261,7 +1246,7 @@ fn test_parent_lookup_too_deep() { rig.trigger_unknown_parent_block(peer_id, trigger_block); for block in blocks.into_iter().rev() { - let id = rig.expect_parent_request_block_and_blobs(block.canonical_root()); + let id = rig.expect_block_parent_request(block.canonical_root()); // the block rig.parent_lookup_block_response(id, peer_id, Some(block.clone())); // the stream termination @@ -1326,7 +1311,7 @@ fn test_single_block_lookup_ignored_response() { // Trigger the request rig.trigger_unknown_block_from_attestation(block.canonical_root(), peer_id); - let id = rig.expect_lookup_request_block_and_blobs(block.canonical_root()); + let id = rig.expect_block_lookup_request(block.canonical_root()); // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. @@ -1342,8 +1327,7 @@ fn test_single_block_lookup_ignored_response() { rig.single_lookup_block_response(id, peer_id, None); // Send an Ignored response, the request should be dropped rig.single_block_component_processed(id.lookup_id, BlockProcessingResult::Ignored); - rig.expect_empty_network(); - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1355,7 +1339,7 @@ fn test_parent_lookup_ignored_response() { // Trigger the request rig.trigger_unknown_parent_block(peer_id, block.clone().into()); - let id = rig.expect_parent_request_block_and_blobs(parent_root); + let id = rig.expect_block_parent_request(parent_root); // Note: single block lookup for current `block` does not trigger any request because it does // not have blobs, and the block is already cached @@ -1385,7 +1369,7 @@ fn test_same_chain_race_condition() { rig.trigger_unknown_parent_block(peer_id, trigger_block.clone()); for (i, block) in blocks.clone().into_iter().rev().enumerate() { - let id = rig.expect_parent_request_block_and_blobs(block.canonical_root()); + let id = rig.expect_block_parent_request(block.canonical_root()); // the block rig.parent_lookup_block_response(id, peer_id, Some(block.clone())); // the stream termination @@ -1421,7 +1405,7 @@ fn test_same_chain_race_condition() { rig.expect_parent_chain_process(); rig.single_block_component_processed_imported(block.canonical_root()); } - rig.expect_no_active_lookups(); + rig.expect_no_active_lookups_empty_network(); } #[test] @@ -1453,12 +1437,13 @@ fn block_in_processing_cache_becomes_invalid() { r.insert_block_to_processing_cache(block.clone().into()); r.trigger_unknown_block_from_attestation(block_root, peer_id); // Should not trigger block request - let id = r.expect_blob_lookup_request(block_root); r.expect_empty_network(); // Simulate invalid block, removing it from processing cache r.simulate_block_gossip_processing_becomes_invalid(block_root); - // Should download and process the block - r.complete_single_lookup_block_valid(block, false); + // Should download block, then issue blobs request + r.complete_lookup_block_download(block); + let id = r.expect_blob_lookup_request(block_root); + r.complete_lookup_block_import_valid(block_root, false); // Resolve blob and expect lookup completed r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true); r.expect_no_active_lookups(); @@ -1475,10 +1460,10 @@ fn block_in_processing_cache_becomes_valid_imported() { r.insert_block_to_processing_cache(block.clone().into()); r.trigger_unknown_block_from_attestation(block_root, peer_id); // Should not trigger block request - let id = r.expect_blob_lookup_request(block_root); r.expect_empty_network(); // Resolve the block from processing step r.simulate_block_gossip_processing_becomes_valid_missing_components(block.into()); + let id = r.expect_blob_lookup_request(block_root); // Resolve blob and expect lookup completed r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true); r.expect_no_active_lookups(); @@ -1592,8 +1577,7 @@ mod deneb_only { peer_id, block_root, )); let block_req_id = rig.expect_block_lookup_request(block_root); - let blob_req_id = rig.expect_blob_lookup_request(block_root); - (Some(block_req_id), Some(blob_req_id), None, None) + (Some(block_req_id), None, None, None) } RequestTrigger::GossipUnknownParentBlock { .. } => { rig.send_sync_message(SyncMessage::UnknownParentBlock( @@ -1604,14 +1588,8 @@ mod deneb_only { let parent_root = block.parent_root(); let parent_block_req_id = rig.expect_block_parent_request(parent_root); - let parent_blob_req_id = rig.expect_blob_parent_request(parent_root); rig.expect_empty_network(); // expect no more requests - ( - None, - None, - Some(parent_block_req_id), - Some(parent_blob_req_id), - ) + (None, None, Some(parent_block_req_id), None) } RequestTrigger::GossipUnknownParentBlob { .. } => { let single_blob = blobs.first().cloned().unwrap(); @@ -1619,14 +1597,8 @@ mod deneb_only { rig.send_sync_message(SyncMessage::UnknownParentBlob(peer_id, single_blob)); let parent_block_req_id = rig.expect_block_parent_request(parent_root); - let parent_blob_req_id = rig.expect_blob_parent_request(parent_root); rig.expect_empty_network(); // expect no more requests - ( - None, - None, - Some(parent_block_req_id), - Some(parent_blob_req_id), - ) + (None, None, Some(parent_block_req_id), None) } }; @@ -1675,6 +1647,23 @@ mod deneb_only { self } + fn parent_block_response_expect_blobs(mut self) -> Self { + self.rig.expect_empty_network(); + let block = self.parent_block.pop_front().unwrap().clone(); + let _ = self.unknown_parent_block.insert(block.clone()); + self.rig.parent_lookup_block_response( + self.parent_block_req_id.expect("parent request id"), + self.peer_id, + Some(block), + ); + + // Expect blobs request after sending block + let s = self.expect_parent_blobs_request(); + + s.rig.assert_parent_lookups_count(1); + s + } + fn parent_blob_response(mut self) -> Self { let blobs = self.parent_blobs.pop_front().unwrap(); let _ = self.unknown_parent_blobs.insert(blobs.clone()); @@ -1687,7 +1676,7 @@ mod deneb_only { assert_eq!(self.rig.active_parent_lookups_count(), 1); } self.rig.parent_lookup_blob_response( - self.parent_blob_req_id.expect("blob request id"), + self.parent_blob_req_id.expect("parent blob request id"), self.peer_id, None, ); @@ -1696,7 +1685,7 @@ mod deneb_only { } fn block_response_triggering_process(self) -> Self { - let mut me = self.block_response(); + let mut me = self.block_response_and_expect_blob_request(); me.rig.expect_block_process(ResponseType::Block); // The request should still be active. @@ -1704,7 +1693,7 @@ mod deneb_only { me } - fn block_response(mut self) -> Self { + fn block_response_and_expect_blob_request(mut self) -> Self { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. self.rig.single_lookup_block_response( @@ -1712,12 +1701,14 @@ mod deneb_only { self.peer_id, Some(self.block.clone()), ); - self.rig.expect_empty_network(); + // After responding with block the node will issue a blob request + let mut s = self.expect_blobs_request(); + + s.rig.expect_empty_network(); // The request should still be active. - self.rig - .assert_lookup_is_active(self.block.canonical_root()); - self + s.rig.assert_lookup_is_active(s.block.canonical_root()); + s } fn blobs_response(mut self) -> Self { @@ -1831,6 +1822,21 @@ mod deneb_only { self } + fn parent_block_missing_components(mut self) -> Self { + let parent_root = *self.parent_block_roots.first().unwrap(); + self.rig + .log(&format!("parent_block_missing_components {parent_root:?}")); + self.rig.parent_block_processed( + self.block_root, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + Slot::new(0), + parent_root, + )), + ); + self.rig.expect_no_requests_for(parent_root); + self + } + fn parent_blob_imported(mut self) -> Self { let parent_root = *self.parent_block_roots.first().unwrap(); self.rig @@ -1864,26 +1870,6 @@ mod deneb_only { self } - fn parent_block_missing_components(mut self) -> Self { - let block = self.unknown_parent_block.clone().unwrap(); - self.rig.parent_block_processed( - self.block_root, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - block.slot(), - block.canonical_root(), - )), - ); - self.rig.parent_blob_processed( - self.block_root, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - block.slot(), - block.canonical_root(), - )), - ); - assert_eq!(self.rig.active_parent_lookups_count(), 1); - self - } - fn invalid_parent_processed(mut self) -> Self { self.rig.parent_block_processed( self.block_root, @@ -1922,45 +1908,35 @@ mod deneb_only { self.block_root, )), ); - self.rig.assert_single_lookups_count(1); - self - } + // Add block to da_checker so blobs request can continue + self.rig.insert_block_to_da_checker(self.block.clone()); - fn missing_components_from_blob_request(mut self) -> Self { - self.rig.single_blob_component_processed( - self.blob_req_id.expect("blob request id").lookup_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - self.slot, - self.block_root, - )), - ); self.rig.assert_single_lookups_count(1); self } fn complete_current_block_and_blobs_lookup(self) -> Self { self.expect_block_request() - .expect_blobs_request() - .block_response() + .block_response_and_expect_blob_request() .blobs_response() // TODO: Should send blobs for processing .expect_block_process() .block_imported() } - fn empty_parent_blobs_then_parent_block(self) -> Self { + fn parent_block_then_empty_parent_blobs(self) -> Self { self.log( " Return empty blobs for parent, block errors with missing components, downscore", ) - .empty_parent_blobs_response() - .expect_no_penalty_and_no_requests() .parent_block_response() - .parent_block_missing_components() - .expect_penalty("sent_incomplete_blobs") + .expect_parent_blobs_request() + .empty_parent_blobs_response() + .expect_penalty("NotEnoughResponsesReturned") .log("Re-request parent blobs, succeed and import parent") .expect_parent_blobs_request() .parent_blob_response() .expect_block_process() + .parent_block_missing_components() // Insert new peer into child request before completing parent .trigger_unknown_block_from_attestation() .parent_blob_imported() @@ -2044,77 +2020,27 @@ mod deneb_only { return; }; tester - .block_response_triggering_process() + .block_response_and_expect_blob_request() .blobs_response() .block_missing_components() // blobs not yet imported .blobs_response_was_valid() .blob_imported(); // now blobs resolve as imported } - #[test] - fn single_block_and_blob_lookup_blobs_returned_first_attestation() { - let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { - return; - }; - tester - .blobs_response() // hold blobs for processing - .block_response_triggering_process() - .block_missing_components() // blobs not yet imported - .blobs_response_was_valid() - .blob_imported(); // now blobs resolve as imported - } - - #[test] - fn single_block_and_blob_lookup_empty_response_attestation() { - let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { - return; - }; - tester - .empty_block_response() - .expect_penalty("NoResponseReturned") - .expect_block_request() - .expect_no_blobs_request() - .empty_blobs_response() - .expect_empty_beacon_processor() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_blobs_request() - .block_response_triggering_process() - .missing_components_from_block_request(); - } - #[test] fn single_block_response_then_empty_blob_response_attestation() { let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { return; }; - tester - .block_response_triggering_process() + .block_response_and_expect_blob_request() .missing_components_from_block_request() .empty_blobs_response() - .missing_components_from_blob_request() - .expect_penalty("sent_incomplete_blobs") + .expect_penalty("NotEnoughResponsesReturned") .expect_blobs_request() .expect_no_block_request(); } - #[test] - fn single_blob_response_then_empty_block_response_attestation() { - let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { - return; - }; - - tester - .blobs_response() - .expect_no_penalty_and_no_requests() - // blobs not sent for processing until the block is processed - .empty_block_response() - .expect_penalty("NoResponseReturned") - .expect_block_request() - .expect_no_blobs_request(); - } - #[test] fn single_invalid_block_response_then_blob_response_attestation() { let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { @@ -2156,8 +2082,7 @@ mod deneb_only { .missing_components_from_block_request() .invalidate_blobs_too_few() .blobs_response() - .missing_components_from_blob_request() - .expect_penalty("sent_incomplete_blobs") + .expect_penalty("NotEnoughResponsesReturned") .expect_blobs_request() .expect_no_block_request(); } @@ -2171,37 +2096,12 @@ mod deneb_only { .block_response_triggering_process() .invalidate_blobs_too_many() .blobs_response() - .expect_penalty("DuplicateData") - .expect_blobs_request() + .expect_penalty("TooManyResponses") + // Network context returns "download success" because the request has enough blobs + it + // downscores the peer for returning too many. .expect_no_block_request(); } - #[test] - fn too_few_blobs_response_then_block_response_attestation() { - let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { - return; - }; - tester - .invalidate_blobs_too_few() - .blobs_response() // blobs are not sent until the block is processed - .expect_no_penalty_and_no_requests() - .block_response_triggering_process(); - } - - #[test] - fn too_many_blobs_response_then_block_response_attestation() { - let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { - return; - }; - tester - .invalidate_blobs_too_many() - .blobs_response() - .expect_penalty("DuplicateData") - .expect_blobs_request() - .expect_no_block_request() - .block_response_triggering_process(); - } - // Test peer returning block that has unknown parent, and a new lookup is created #[test] fn parent_block_unknown_parent() { @@ -2210,12 +2110,11 @@ mod deneb_only { }; tester .expect_empty_beacon_processor() - .parent_block_response() + .parent_block_response_expect_blobs() .parent_blob_response() .expect_block_process() .parent_block_unknown_parent() .expect_parent_block_request() - .expect_parent_blobs_request() .expect_empty_beacon_processor(); } @@ -2226,7 +2125,7 @@ mod deneb_only { return; }; tester - .parent_block_response() + .parent_block_response_expect_blobs() .parent_blob_response() .expect_block_process() .invalid_parent_processed() @@ -2246,19 +2145,19 @@ mod deneb_only { .expect_penalty("NoResponseReturned") .expect_block_request() .expect_no_blobs_request() - .block_response() + .block_response_and_expect_blob_request() .blobs_response() .block_imported() .expect_no_active_lookups(); } #[test] - fn empty_parent_blobs_then_parent_block() { + fn parent_block_then_empty_parent_blobs() { let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock(1)) else { return; }; tester - .empty_parent_blobs_then_parent_block() + .parent_block_then_empty_parent_blobs() .log("resolve original block trigger blobs request and import") // Should not have block request, it is cached .expect_blobs_request() @@ -2274,12 +2173,11 @@ mod deneb_only { }; tester .expect_empty_beacon_processor() - .parent_block_response() + .parent_block_response_expect_blobs() .parent_blob_response() .expect_block_process() .parent_block_unknown_parent() .expect_parent_block_request() - .expect_parent_blobs_request() .expect_empty_beacon_processor(); } @@ -2290,7 +2188,7 @@ mod deneb_only { }; tester .expect_empty_beacon_processor() - .parent_block_response() + .parent_block_response_expect_blobs() .parent_blob_response() .expect_block_process() .invalid_parent_processed() @@ -2307,6 +2205,7 @@ mod deneb_only { }; tester .parent_block_response() + .expect_parent_blobs_request() .parent_blob_response() .expect_block_process() .trigger_unknown_block_from_attestation() @@ -2316,12 +2215,12 @@ mod deneb_only { } #[test] - fn empty_parent_blobs_then_parent_block_blob_trigger() { + fn parent_block_then_empty_parent_blobs_blob_trigger() { let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob(1)) else { return; }; tester - .empty_parent_blobs_then_parent_block() + .parent_block_then_empty_parent_blobs() .log("resolve original block trigger blobs request and import") .complete_current_block_and_blobs_lookup() .expect_no_active_lookups(); @@ -2334,15 +2233,15 @@ mod deneb_only { }; tester .expect_empty_beacon_processor() - .parent_block_response() + .parent_block_response_expect_blobs() .parent_blob_response() .expect_no_penalty() .expect_block_process() .parent_block_unknown_parent() .expect_parent_block_request() - .expect_parent_blobs_request() .expect_empty_beacon_processor() .parent_block_response() + .expect_parent_blobs_request() .parent_blob_response() .expect_no_penalty() .expect_block_process(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 4a4d70090e6..66d23dd1918 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -816,7 +816,7 @@ impl SyncManager { peer_id: PeerId, block: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_block_response(id, block) { + if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { self.block_lookups .on_download_response::>( id, @@ -858,7 +858,7 @@ impl SyncManager { peer_id: PeerId, blob: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_blob_response(id, blob) { + if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { self.block_lookups .on_download_response::>( id, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 44fb69d9b2f..8693bc0c6c4 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -12,7 +12,6 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; use crate::sync::manager::{BlockProcessType, SingleLookupReqId}; use beacon_chain::block_verification_types::RpcBlock; -use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; use lighthouse_network::rpc::methods::BlobsByRangeRequest; @@ -383,24 +382,18 @@ impl SyncNetworkContext { block_root: Hash256, downloaded_block_expected_blobs: Option, ) -> Result { - let expected_blobs = downloaded_block_expected_blobs - .or_else(|| { - self.chain - .data_availability_checker - .num_expected_blobs(&block_root) - }) - .unwrap_or_else(|| { - // If we don't about the block being requested, attempt to fetch all blobs - if self - .chain - .data_availability_checker - .da_check_required_for_current_epoch() - { - T::EthSpec::max_blobs_per_block() - } else { - 0 - } - }); + let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| { + self.chain + .data_availability_checker + .num_expected_blobs(&block_root) + }) else { + // Wait to download the block before downloading blobs. Then we can be sure that the + // block has data, so there's no need to do "blind" requests for all possible blobs and + // latter handle the case where if the peer sent no blobs, penalize. + // - if `downloaded_block_expected_blobs` is Some = block is downloading or processing. + // - if `num_expected_blobs` returns Some = block is processed. + return Ok(LookupRequestResult::Pending); + }; let imported_blob_indexes = self .chain @@ -554,13 +547,14 @@ impl SyncNetworkContext { pub fn on_single_block_response( &mut self, request_id: SingleLookupReqId, + peer_id: PeerId, block: RpcEvent>>, ) -> Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { return None; }; - Some(match block { + let resp = match block { RpcEvent::Response(block, seen_timestamp) => { match request.get_mut().add_response(block) { Ok(block) => Ok((block, seen_timestamp)), @@ -579,43 +573,61 @@ impl SyncNetworkContext { request.remove(); Err(e.into()) } - }) + }; + + if let Err(LookupFailure::LookupVerifyError(e)) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); + } + Some(resp) } pub fn on_single_blob_response( &mut self, request_id: SingleLookupReqId, + peer_id: PeerId, blob: RpcEvent>>, ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; - Some(match blob { - RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) { - Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, timestamp_now())) - .map_err(Into::into), - Ok(None) => return None, - Err(e) => { - request.remove(); - Err(e.into()) + let resp = match blob { + RpcEvent::Response(blob, seen_timestamp) => { + let request = request.get_mut(); + match request.add_response(blob) { + Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) + .map(|blobs| (blobs, seen_timestamp)) + .map_err(|e| (e.into(), request.resolve())), + Ok(None) => return None, + Err(e) => Err((e.into(), request.resolve())), } + } + RpcEvent::StreamTermination => match request.remove().terminate() { + Ok(_) => return None, + // (err, false = not resolved) because terminate returns Ok() if resolved + Err(e) => Err((e.into(), false)), }, - RpcEvent::StreamTermination => { - // Stream terminator - match request.remove().terminate() { - Some(blobs) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, timestamp_now())) - .map_err(Into::into), - None => return None, + RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())), + }; + + match resp { + Ok(resp) => Some(Ok(resp)), + // Track if this request has already returned some value downstream. Ensure that + // downstream code only receives a single Result per request. If the serving peer does + // multiple penalizable actions per request, downscore and return None. This allows to + // catch if a peer is returning more blobs than requested or if the excess blobs are + // invalid. + Err((e, resolved)) => { + if let LookupFailure::LookupVerifyError(e) = &e { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); + } + if resolved { + None + } else { + Some(Err(e)) } } - RpcEvent::RPCError(e) => { - request.remove(); - Err(e.into()) - } - }) + } } pub fn send_block_for_processing( diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 0522b7fa384..cd73b4bebab 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -9,6 +9,7 @@ use types::{ #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { NoResponseReturned, + NotEnoughResponsesReturned { expected: usize, actual: usize }, TooManyResponses, UnrequestedBlockRoot(Hash256), UnrequestedBlobIndex(u64), @@ -139,11 +140,20 @@ impl ActiveBlobsByRootRequest { } } - pub fn terminate(self) -> Option>>> { + pub fn terminate(self) -> Result<(), LookupVerifyError> { if self.resolved { - None + Ok(()) } else { - Some(self.blobs) + Err(LookupVerifyError::NotEnoughResponsesReturned { + expected: self.request.indices.len(), + actual: self.blobs.len(), + }) } } + + /// Mark request as resolved (= has returned something downstream) while marking this status as + /// true for future calls. + pub fn resolve(&mut self) -> bool { + std::mem::replace(&mut self.resolved, true) + } }