Skip to content

Commit

Permalink
Make SingleLookupRequestState fields private (#5563)
Browse files Browse the repository at this point in the history
* Make SingleLookupRequestState fields private

* Fix tests
  • Loading branch information
dapplion authored Apr 15, 2024
1 parent b6a1c86 commit fee2ee9
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 158 deletions.
98 changes: 41 additions & 57 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use beacon_chain::data_availability_checker::ChildComponents;
use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::rpc::BlocksByRootRequest;
use rand::prelude::IteratorRandom;
use std::ops::IndexMut;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -104,21 +103,20 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Check if request is necessary.
if !matches!(self.get_state().state, State::AwaitingDownload) {
if !self.get_state().is_awaiting_download() {
return Ok(());
}

// Construct request.
let (peer_id, request) = self.build_request(&cx.chain.spec)?;

// Update request state.
self.get_state_mut().state = State::Downloading { peer_id };
self.get_state_mut().req_counter += 1;
let req_counter = self.get_state_mut().on_download_start(peer_id);

// Make request
let id = SingleLookupReqId {
id,
req_counter: self.get_state().req_counter,
req_counter,
lookup_type: L::lookup_type(),
};
Self::make_request(id, peer_id, request, cx)
Expand All @@ -130,8 +128,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
let request_state = self.get_state();

if request_state.failed_attempts() >= max_attempts {
let cannot_process =
request_state.failed_processing >= request_state.failed_downloading;
let cannot_process = request_state.more_failed_processing_attempts();
Err(LookupRequestError::TooManyAttempts { cannot_process })
} else {
Ok(())
Expand All @@ -141,15 +138,9 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/// Get the next peer to request. Draws from the set of peers we think should have both the
/// block and blob first. If that fails, we draw from the set of peers that may have either.
fn get_peer(&mut self) -> Result<PeerId, LookupRequestError> {
let request_state = self.get_state_mut();
let peer_id = request_state
.available_peers
.iter()
.choose(&mut rand::thread_rng())
.copied()
.ok_or(LookupRequestError::NoPeers)?;
request_state.used_peers.insert(peer_id);
Ok(peer_id)
self.get_state_mut()
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)
}

/// Initialize `Self::RequestType`.
Expand All @@ -169,29 +160,35 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
fn verify_response(
&mut self,
expected_block_root: Hash256,
peer_id: PeerId,
response: Option<Self::ResponseType>,
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError> {
let request_state = self.get_state_mut();
match request_state.state {
State::AwaitingDownload => {
request_state.register_failure_downloading();
Err(LookupVerifyError::ExtraBlocksReturned)
}
State::Downloading { peer_id } => {
self.verify_response_inner(expected_block_root, response, peer_id)
let result = match *self.get_state().get_state() {
State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned),
State::Downloading { peer_id: _ } => {
// TODO: We requested a download from Downloading { peer_id }, but the network
// injects a response from a different peer_id. What should we do? The peer_id to
// track for scoring is the one that actually sent the response, not the state's
self.verify_response_inner(expected_block_root, response)
}
State::Processing { peer_id: _ } => match response {
Some(_) => {
// We sent the block for processing and received an extra block.
request_state.register_failure_downloading();
Err(LookupVerifyError::ExtraBlocksReturned)
}
None => {
// This is simply the stream termination and we are already processing the
// block
Ok(None)
}
State::Processing { .. } | State::Processed { .. } => match response {
// We sent the block for processing and received an extra block.
Some(_) => Err(LookupVerifyError::ExtraBlocksReturned),
// This is simply the stream termination and we are already processing the block
None => Ok(None),
},
};

match result {
Ok(Some(response)) => {
self.get_state_mut().on_download_success(peer_id);
Ok(Some(response))
}
Ok(None) => Ok(None),
Err(e) => {
self.get_state_mut().on_download_failure();
Err(e)
}
}
}

Expand All @@ -200,7 +197,6 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
&mut self,
expected_block_root: Hash256,
response: Option<Self::ResponseType>,
peer_id: PeerId,
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError>;

/// A getter for the parent root of the response. Returns an `Option` because we won't know
Expand Down Expand Up @@ -232,7 +228,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {

/// Register a failure to process the block or blob.
fn register_failure_downloading(&mut self) {
self.get_state_mut().register_failure_downloading()
self.get_state_mut().on_download_failure()
}

/* Utility methods */
Expand Down Expand Up @@ -274,7 +270,6 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
&mut self,
expected_block_root: Hash256,
response: Option<Self::ResponseType>,
peer_id: PeerId,
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, LookupVerifyError> {
match response {
Some(block) => {
Expand All @@ -285,18 +280,13 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure.
self.state.register_failure_downloading();
Err(LookupVerifyError::RootMismatch)
} else {
// Return the block for processing.
self.state.state = State::Processing { peer_id };
Ok(Some(block))
}
}
None => {
self.state.register_failure_downloading();
Err(LookupVerifyError::NoBlockReturned)
}
None => Err(LookupVerifyError::NoBlockReturned),
}
}

Expand Down Expand Up @@ -374,25 +364,20 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
&mut self,
expected_block_root: Hash256,
blob: Option<Self::ResponseType>,
peer_id: PeerId,
) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> {
match blob {
Some(blob) => {
let received_id = blob.id();

if !self.requested_ids.contains(&received_id) {
Err(LookupVerifyError::UnrequestedBlobId(received_id))
} else if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) {
Err(LookupVerifyError::InvalidInclusionProof)
} else if blob.block_root() != expected_block_root {
Err(LookupVerifyError::UnrequestedHeader)
} else {
Ok(())
return Err(LookupVerifyError::UnrequestedBlobId(received_id));
}
if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if blob.block_root() != expected_block_root {
return Err(LookupVerifyError::UnrequestedHeader);
}
.map_err(|e| {
self.state.register_failure_downloading();
e
})?;

// State should remain downloading until we receive the stream terminator.
self.requested_ids.remove(&received_id);
Expand All @@ -403,7 +388,6 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
Ok(None)
}
None => {
self.state.state = State::Processing { peer_id };
let blobs = std::mem::take(&mut self.blob_download_queue);
Ok(Some(blobs))
}
Expand Down
74 changes: 34 additions & 40 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut lookup = self.single_block_lookups.remove(&id.id)?;

let request_state = R::request_state_mut(&mut lookup);
if id.req_counter != request_state.get_state().req_counter {
if request_state
.get_state()
.is_current_req_counter(id.req_counter)
{
Some(lookup)
} else {
// We don't want to drop the lookup, just ignore the old response.
self.single_block_lookups.insert(id.id, lookup);
return None;
None
}
Some(lookup)
}

/// Checks whether a single block lookup is waiting for a parent lookup to complete. This is
Expand Down Expand Up @@ -374,7 +378,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let expected_block_root = lookup.block_root();
let request_state = R::request_state_mut(&mut lookup);

match request_state.verify_response(expected_block_root, response) {
match request_state.verify_response(expected_block_root, peer_id, response) {
Ok(Some(verified_response)) => {
self.handle_verified_response::<Current, R>(
seen_timestamp,
Expand Down Expand Up @@ -415,10 +419,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let id = lookup.id;
let block_root = lookup.block_root();

R::request_state_mut(lookup)
.get_state_mut()
.component_downloaded = true;

let cached_child = lookup.add_response::<R>(verified_response.clone());
match cached_child {
CachedChild::Ok(block) => {
Expand Down Expand Up @@ -447,10 +447,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// initial request.
if lookup.both_components_downloaded() {
lookup.penalize_blob_peer(cx);
lookup
.blob_request_state
.state
.register_failure_downloading();
lookup.blob_request_state.state.on_download_failure();
}
lookup.request_block_and_blobs(cx)?;
}
Expand Down Expand Up @@ -493,13 +490,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

if R::request_state_mut(&mut parent_lookup.current_parent_request)
.get_state()
.req_counter
!= id.req_counter
.is_current_req_counter(id.req_counter)
{
Some(parent_lookup)
} else {
self.parent_lookups.push(parent_lookup);
return None;
None
}
Some(parent_lookup)
}

/// Process a response received from a parent lookup request.
Expand Down Expand Up @@ -559,7 +556,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &SyncNetworkContext<T>,
parent_lookup: &mut ParentLookup<T>,
) -> Result<(), RequestError> {
match parent_lookup.verify_response::<R>(response, &mut self.failed_chains) {
match parent_lookup.verify_response::<R>(peer_id, response, &mut self.failed_chains) {
Ok(Some(verified_response)) => {
self.handle_verified_response::<Parent, R>(
seen_timestamp,
Expand Down Expand Up @@ -640,7 +637,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
RequestError::ChainTooLong => {
self.failed_chains.insert(parent_lookup.chain_hash());
// This indicates faulty peers.
for &peer_id in parent_lookup.used_peers() {
for &peer_id in parent_lookup.all_used_peers() {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static())
}
}
Expand All @@ -653,24 +650,30 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.failed_chains.insert(parent_lookup.chain_hash());
}
// This indicates faulty peers.
for &peer_id in parent_lookup.used_peers() {
for &peer_id in parent_lookup.all_used_peers() {
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static())
}
}
RequestError::NoPeers => {
// This happens if the peer disconnects while the block is being
// processed. Drop the request without extra penalty
}
RequestError::BadState(_) => {
// Should never happen
}
}
}

/* Error responses */

pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext<T>) {
/* Check disconnection for single lookups */
self.single_block_lookups.retain(|_, req| {
self.single_block_lookups.retain(|id, req| {
let should_drop_lookup =
req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log);
if should_drop_lookup {
debug!(self.log, "Dropping lookup after peer disconnected"; "id" => id, "block_root" => %req.block_root());
}

!should_drop_lookup
});
Expand Down Expand Up @@ -844,15 +847,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) -> Result<(), LookupRequestError> {
let request_state = R::request_state_mut(lookup);

request_state.get_state_mut().component_processed = true;
request_state
.get_state_mut()
.on_processing_success()
.map_err(LookupRequestError::BadState)?;
if lookup.both_components_processed() {
lookup.penalize_blob_peer(cx);

// Try it again if possible.
lookup
.blob_request_state
.state
.register_failure_processing();
lookup.blob_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?;
}
Ok(())
Expand Down Expand Up @@ -900,14 +903,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockError::AvailabilityCheck(e) => match e.category() {
AvailabilityCheckErrorCategory::Internal => {
warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup
.block_request_state
.state
.register_failure_downloading();
lookup
.blob_request_state
.state
.register_failure_downloading();
lookup.block_request_state.state.on_download_failure();
lookup.blob_request_state.state.on_download_failure();
lookup.request_block_and_blobs(cx)?
}
AvailabilityCheckErrorCategory::Malicious => {
Expand All @@ -926,10 +923,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);

// Try it again if possible.
lookup
.block_request_state
.state
.register_failure_processing();
lookup.block_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?
}
}
Expand Down Expand Up @@ -999,7 +993,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.current_parent_request
.blob_request_state
.state
.register_failure_processing();
.on_processing_failure();
match parent_lookup
.current_parent_request
.request_block_and_blobs(cx)
Expand Down Expand Up @@ -1255,8 +1249,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
penalty,
} => {
self.failed_chains.insert(chain_hash);
for peer_source in request.all_peers() {
cx.report_peer(peer_source, penalty, "parent_chain_failure")
for peer_source in request.all_used_peers() {
cx.report_peer(*peer_source, penalty, "parent_chain_failure")
}
}
BatchProcessResult::NonFaultyFailure => {
Expand Down
Loading

0 comments on commit fee2ee9

Please sign in to comment.