Skip to content

Commit

Permalink
Pr 5583 review (#24)
Browse files Browse the repository at this point in the history
* add bad state warn log

* add rust docs to new fields in `SyncNetworkContext`

* remove timestamp todo

* add back lookup verify error

* remove TODOs
  • Loading branch information
realbigsean authored Apr 22, 2024
1 parent e30c5b0 commit 2d4712b
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 67 deletions.
50 changes: 20 additions & 30 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use self::single_block_lookup::SingleBlockLookup;
use super::manager::BlockProcessingResult;
use super::network_context::{LookupFailure, LookupVerifyError};
use super::BatchProcessResult;
use super::{manager::BlockProcessType, network_context::SyncNetworkContext};
use crate::metrics;
Expand All @@ -20,7 +21,6 @@ pub use common::Lookup;
pub use common::Parent;
pub use common::RequestState;
use fnv::FnvHashMap;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
Expand Down Expand Up @@ -604,7 +604,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// processed. Drop the request without extra penalty
}
RequestError::BadState(..) => {
// Internal error should never happen
warn!(self.log, "Failed to request parent"; "error" => e.as_static());
}
}
}
Expand Down Expand Up @@ -638,10 +638,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
id: SingleLookupReqId,
peer_id: &PeerId,
cx: &mut SyncNetworkContext<T>,
error: RPCError,
error: LookupFailure,
) {
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, &error, cx);
// Only downscore lookup verify errors. RPC errors are downscored in the network handler.
if let LookupFailure::LookupVerifyError(e) = &error {
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, e, cx);
}

let Some(mut parent_lookup) = self.get_parent_lookup::<R>(id) else {
debug!(self.log,
Expand Down Expand Up @@ -671,14 +674,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
id: SingleLookupReqId,
peer_id: &PeerId,
cx: &mut SyncNetworkContext<T>,
error: RPCError,
error: LookupFailure,
) {
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, &error, cx);
// Only downscore lookup verify errors. RPC errors are downscored in the network handler.
if let LookupFailure::LookupVerifyError(e) = &error {
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, e, cx);
}

let log = self.log.clone();
let Some(mut lookup) = self.get_single_lookup::<R>(id) else {
debug!(log, "Error response to dropped lookup"; "error" => ?error);
debug!(log, "Error response to dropped lookup"; "error" => %error);
return;
};
let block_root = lookup.block_root();
Expand Down Expand Up @@ -1322,31 +1328,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn downscore_on_rpc_error(
&self,
peer_id: &PeerId,
error: &RPCError,
error: &LookupVerifyError,
cx: &SyncNetworkContext<T>,
) {
// Note: logging the report event here with the full error display. The log inside
// `report_peer` only includes a smaller string, like "invalid_data"
debug!(self.log, "reporting peer for sync lookup error"; "error" => %error);
if let Some(action) = match error {
// Protocol errors are heavily penalized
RPCError::SSZDecodeError(..)
| RPCError::IoError(..)
| RPCError::ErrorResponse(..)
| RPCError::InvalidData(..)
| RPCError::HandlerRejected => Some(PeerAction::LowToleranceError),
// Timing / network errors are less penalized
// TODO: Is IoError a protocol error or network error?
RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => {
Some(PeerAction::MidToleranceError)
}
// Not supporting a specific protocol is tolerated. TODO: Are you sure?
RPCError::UnsupportedProtocol => None,
// Our fault, don't penalize peer
RPCError::InternalError(..) | RPCError::Disconnected => None,
} {
cx.report_peer(*peer_id, action, error.into());
}
let error_str: &'static str = error.into();

debug!(self.log, "reporting peer for sync lookup error"; "error" => error_str);
cx.report_peer(*peer_id, PeerAction::LowToleranceError, error_str);
}

pub fn update_metrics(&self) {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use beacon_chain::test_utils::{
build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs,
};
use beacon_processor::WorkEvent;
use lighthouse_network::rpc::RPCResponseErrorCode;
use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
use lighthouse_network::types::SyncState;
use lighthouse_network::{NetworkGlobals, Request};
use slog::info;
Expand Down
57 changes: 40 additions & 17 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
pub use requests::LookupVerifyError;
use slog::{debug, trace, warn};
use std::collections::hash_map::Entry;
use std::sync::Arc;
Expand Down Expand Up @@ -51,7 +52,33 @@ pub enum RpcEvent<T> {
RPCError(RPCError),
}

pub type RpcProcessingResult<T> = Option<Result<(T, Duration), RPCError>>;
pub type RpcProcessingResult<T> = Option<Result<(T, Duration), LookupFailure>>;

/// Reason a by-root lookup request produced an error instead of a usable response.
///
/// The two variants separate failures of the RPC request itself from responses that
/// arrived but were rejected by lookup verification, so callers can handle (and
/// penalize) them differently.
///
/// `Debug` is derived so the error can be logged with `?error` and so that
/// `Result<_, LookupFailure>` supports `unwrap`/`expect`/`unwrap_err`.
#[derive(Debug)]
pub enum LookupFailure {
    /// The RPC request failed; carries the original [`RPCError`].
    RpcError(RPCError),
    /// A response was received but failed verification against the request; carries the
    /// specific [`LookupVerifyError`].
    LookupVerifyError(LookupVerifyError),
}

impl std::fmt::Display for LookupFailure {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e),
LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e),
}
}
}

impl From<RPCError> for LookupFailure {
fn from(e: RPCError) -> Self {
LookupFailure::RpcError(e)
}
}

impl From<LookupVerifyError> for LookupFailure {
fn from(e: LookupVerifyError) -> Self {
LookupFailure::LookupVerifyError(e)
}
}

/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id.
pub struct SyncNetworkContext<T: BeaconChainTypes> {
Expand All @@ -61,7 +88,10 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A sequential ID for all RPC requests.
request_id: Id,

/// A mapping of active BlocksByRoot requests, including both current slot and parent lookups.
blocks_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlocksByRootRequest>,

/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,

/// BlocksByRange requests paired with BlobsByRange
Expand Down Expand Up @@ -439,20 +469,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(block) => Ok((block, seen_timestamp)),
Err(e) => {
// The request must be dropped after receiving an error.
// TODO: We could NOT drop the request here, and penalize the peer again if
// sends multiple penalizable chunks after the first invalid.
request.remove();
Err(e)
Err(e.into())
}
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
Err(e) => Err(e),
Err(e) => Err(e.into()),
},
RpcEvent::RPCError(e) => {
request.remove();
Err(e)
Err(e.into())
}
})
}
Expand All @@ -468,46 +496,41 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

Some(match blob {
RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) {
// TODO: Should deal only with Vec<Arc<BlobSidecar>>
Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, timestamp_now()))
.map_err(RPCError::InvalidData),
.map_err(Into::into),
Ok(None) => return None,
Err(e) => {
request.remove();
Err(e)
Err(e.into())
}
},
RpcEvent::StreamTermination => {
// Stream terminator
match request.remove().terminate() {
// TODO: Should deal only with Vec<Arc<BlobSidecar>>
Some(blobs) => to_fixed_blob_sidecar_list(blobs)
// TODO: a seen_timestamp for an array of blobs doesn't make much sense
// since each is received at different times. Should we track first, last or
// average?
.map(|blobs| (blobs, timestamp_now()))
.map_err(RPCError::InvalidData),
.map_err(Into::into),
None => return None,
}
}
RpcEvent::RPCError(e) => {
request.remove();
Err(e)
Err(e.into())
}
})
}
}

fn to_fixed_blob_sidecar_list<E: EthSpec>(
blobs: Vec<Arc<BlobSidecar<E>>>,
) -> Result<FixedBlobSidecarList<E>, String> {
) -> Result<FixedBlobSidecarList<E>, LookupVerifyError> {
let mut fixed_list = FixedBlobSidecarList::default();
for blob in blobs.into_iter() {
let index = blob.index as usize;
*fixed_list
.get_mut(index)
.ok_or("invalid index".to_string())? = Some(blob)
.ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob)
}
Ok(fixed_list)
}
42 changes: 23 additions & 19 deletions beacon_node/network/src/sync/network_context/requests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
use beacon_chain::get_block_root;
use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest, RPCError};
use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest};
use std::sync::Arc;
use strum::IntoStaticStr;
use types::{
blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock,
};

/// Reasons a by-root lookup response is rejected when checked against the request
/// that triggered it.
///
/// `IntoStaticStr` lets a variant be converted into a `&'static str`, which is used as
/// the short reason string when reporting a peer.
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError {
/// The blocks-by-root stream terminated before any valid block was received.
NoResponseReturned,
/// A further response chunk arrived after the request was already resolved.
TooManyResponses,
/// The returned block or blob belongs to a block root that was not requested;
/// carries the root that was actually received.
UnrequestedBlockRoot(Hash256),
/// The returned blob has an index that was not requested, or one that does not fit
/// in the fixed-size blob list; carries the offending index.
UnrequestedBlobIndex(u64),
/// The blob sidecar's inclusion proof did not verify (a verification error is
/// treated the same as a failed proof).
InvalidInclusionProof,
/// A blob with the same index was already received for this request.
DuplicateData,
}

pub struct ActiveBlocksByRootRequest {
request: BlocksByRootSingleRequest,
resolved: bool,
Expand All @@ -24,28 +35,26 @@ impl ActiveBlocksByRootRequest {
pub fn add_response<E: EthSpec>(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
) -> Result<Arc<SignedBeaconBlock<E>>, RPCError> {
) -> Result<Arc<SignedBeaconBlock<E>>, LookupVerifyError> {
if self.resolved {
return Err(RPCError::InvalidData("too many responses".to_string()));
return Err(LookupVerifyError::TooManyResponses);
}

let block_root = get_block_root(&block);
if self.request.0 != block_root {
return Err(RPCError::InvalidData(format!(
"un-requested block root {block_root:?}"
)));
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}

// Valid data, blocks by root expects a single response
self.resolved = true;
Ok(block)
}

pub fn terminate(self) -> Result<(), RPCError> {
pub fn terminate(self) -> Result<(), LookupVerifyError> {
if self.resolved {
Ok(())
} else {
Err(RPCError::InvalidData("no response returned".to_string()))
Err(LookupVerifyError::NoResponseReturned)
}
}
}
Expand Down Expand Up @@ -101,28 +110,23 @@ impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
pub fn add_response(
&mut self,
blob: Arc<BlobSidecar<E>>,
) -> Result<Option<Vec<Arc<BlobSidecar<E>>>>, RPCError> {
) -> Result<Option<Vec<Arc<BlobSidecar<E>>>>, LookupVerifyError> {
if self.resolved {
return Err(RPCError::InvalidData("too many responses".to_string()));
return Err(LookupVerifyError::TooManyResponses);
}

let block_root = blob.block_root();
if self.request.block_root != block_root {
return Err(RPCError::InvalidData(format!(
"un-requested block root {block_root:?}"
)));
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}
if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) {
return Err(RPCError::InvalidData("invalid inclusion proof".to_string()));
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&blob.index) {
return Err(RPCError::InvalidData(format!(
"un-requested blob index {}",
blob.index
)));
return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index));
}
if self.blobs.iter().any(|b| b.index == blob.index) {
return Err(RPCError::InvalidData("duplicated data".to_string()));
return Err(LookupVerifyError::DuplicateData);
}

self.blobs.push(blob);
Expand Down

0 comments on commit 2d4712b

Please sign in to comment.