Skip to content

Commit

Permalink
Nest lookup type into request id SingleBlock and SingleBlob (#5562)
Browse files Browse the repository at this point in the history
* Nest lookup type into block lookup RequestId
  • Loading branch information
dapplion authored Apr 11, 2024
1 parent 3d4e6e2 commit 34dbb32
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 119 deletions.
13 changes: 5 additions & 8 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,7 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. }
| SyncId::SingleBlob { .. }
| SyncId::ParentLookup { .. }
| SyncId::ParentLookupBlob { .. } => {
SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
return;
}
Expand Down Expand Up @@ -561,15 +558,15 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
id @ SyncId::SingleBlock { .. } => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. } => {
SyncId::SingleBlob { .. } => {
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
return;
}
Expand Down Expand Up @@ -602,8 +599,8 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. }) => id,
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
id @ SyncId::SingleBlob { .. } => id,
SyncId::SingleBlock { .. } => {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum ResponseType {
Blob,
}

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum LookupType {
Current,
Parent,
Expand Down Expand Up @@ -119,6 +119,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
let id = SingleLookupReqId {
id,
req_counter: self.get_state().req_counter,
lookup_type: L::lookup_type(),
};
Self::make_request(id, peer_id, request, cx)
}
Expand Down Expand Up @@ -265,7 +266,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.block_lookup_request(id, peer_id, request, L::lookup_type())
cx.block_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
}

Expand Down Expand Up @@ -365,7 +366,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.blob_lookup_request(id, peer_id, request, L::lookup_type())
cx.blob_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
}

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn parent_lookup_failed<R: RequestState<Parent, T>>(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
peer_id: &PeerId,
cx: &SyncNetworkContext<T>,
error: RPCError,
) {
Expand Down
48 changes: 29 additions & 19 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl TestRig {
beacon_block: Option<Arc<SignedBeaconBlock<E>>>,
) {
self.send_sync_message(SyncMessage::RpcBlock {
request_id: SyncRequestId::ParentLookup { id },
request_id: SyncRequestId::SingleBlock { id },
peer_id,
beacon_block,
seen_timestamp: D,
Expand Down Expand Up @@ -324,7 +324,7 @@ impl TestRig {
blob_sidecar: Option<Arc<BlobSidecar<E>>>,
) {
self.send_sync_message(SyncMessage::RpcBlob {
request_id: SyncRequestId::ParentLookupBlob { id },
request_id: SyncRequestId::SingleBlob { id },
peer_id,
blob_sidecar,
seen_timestamp: D,
Expand All @@ -348,7 +348,7 @@ impl TestRig {
fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) {
self.send_sync_message(SyncMessage::RpcError {
peer_id,
request_id: SyncRequestId::ParentLookup { id },
request_id: SyncRequestId::SingleBlock { id },
error,
})
}
Expand Down Expand Up @@ -409,7 +409,11 @@ impl TestRig {
peer_id: _,
request: Request::BlocksByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
} if id.lookup_type == LookupType::Current
&& request.block_roots().to_vec().contains(&for_block) =>
{
Some(*id)
}
_ => None,
})
.unwrap_or_else(|e| panic!("Expected block request for {for_block:?}: {e}"))
Expand All @@ -422,11 +426,12 @@ impl TestRig {
peer_id: _,
request: Request::BlobsByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
} if request
.blob_ids
.to_vec()
.iter()
.any(|r| r.block_root == for_block) =>
} if id.lookup_type == LookupType::Current
&& request
.blob_ids
.to_vec()
.iter()
.any(|r| r.block_root == for_block) =>
{
Some(*id)
}
Expand All @@ -441,8 +446,12 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlocksByRoot(request),
request_id: RequestId::Sync(SyncRequestId::ParentLookup { id }),
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
} if id.lookup_type == LookupType::Parent
&& request.block_roots().to_vec().contains(&for_block) =>
{
Some(*id)
}
_ => None,
})
.unwrap_or_else(|e| panic!("Expected block parent request for {for_block:?}: {e}"))
Expand All @@ -454,12 +463,13 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlobsByRoot(request),
request_id: RequestId::Sync(SyncRequestId::ParentLookupBlob { id }),
} if request
.blob_ids
.to_vec()
.iter()
.all(|r| r.block_root == for_block) =>
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
} if id.lookup_type == LookupType::Parent
&& request
.blob_ids
.to_vec()
.iter()
.all(|r| r.block_root == for_block) =>
{
Some(*id)
}
Expand Down Expand Up @@ -1974,15 +1984,15 @@ mod deneb_only {
return;
};
let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(2));
let parent_root = block.parent_root();
let block_root = block.canonical_root();
let blob_0 = blobs[0].clone();
let blob_1 = blobs[1].clone();
let peer_a = r.new_connected_peer();
let peer_b = r.new_connected_peer();
// Send unknown parent block lookup
r.trigger_unknown_parent_block(peer_a, block.into());
// Expect network request for blobs
let id = r.expect_blob_parent_request(parent_root);
let id = r.expect_blob_lookup_request(block_root);
// Peer responses with blob 0
r.single_lookup_blob_response(id, peer_a, Some(blob_0.into()));
// Blob 1 is received via gossip unknown parent blob from a different peer
Expand Down
131 changes: 62 additions & 69 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
//! search for the block and subsequently search for parents if needed.

use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::common::LookupType;
use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
Expand Down Expand Up @@ -80,6 +81,7 @@ pub type Id = u32;
pub struct SingleLookupReqId {
pub id: Id,
pub req_counter: Id,
pub lookup_type: LookupType,
}

/// Id of rpc requests sent by sync to the network.
Expand All @@ -89,12 +91,6 @@ pub enum RequestId {
SingleBlock { id: SingleLookupReqId },
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// Request searching for a block's parent. The id is the chain, share with the corresponding
/// blob id.
ParentLookup { id: SingleLookupReqId },
/// Request searching for a block's parent blobs. The id is the chain, shared with the corresponding
/// block id.
ParentLookupBlob { id: SingleLookupReqId },
/// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id },
/// Backfill request that is composed by both a block range request and a blob range request.
Expand Down Expand Up @@ -331,42 +327,42 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
trace!(self.log, "Sync manager received a failed RPC");
match request_id {
RequestId::SingleBlock { id } => {
self.block_lookups
RequestId::SingleBlock { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_block_lookup_failed::<BlockRequestState<Current>>(
id,
&peer_id,
&self.network,
error,
);
}
RequestId::SingleBlob { id } => {
self.block_lookups
.single_block_lookup_failed::<BlobRequestState<Current, T::EthSpec>>(
),
LookupType::Parent => self
.block_lookups
.parent_lookup_failed::<BlockRequestState<Parent>>(
id,
&peer_id,
&self.network,
error,
);
}
RequestId::ParentLookup { id } => {
self.block_lookups
.parent_lookup_failed::<BlockRequestState<Parent>>(
),
},
RequestId::SingleBlob { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_block_lookup_failed::<BlobRequestState<Current, T::EthSpec>>(
id,
peer_id,
&peer_id,
&self.network,
error,
);
}
RequestId::ParentLookupBlob { id } => {
self.block_lookups
),
LookupType::Parent => self
.block_lookups
.parent_lookup_failed::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
&peer_id,
&self.network,
error,
);
}
),
},
RequestId::BackFillBlocks { id } => {
if let Some(batch_id) = self
.network
Expand Down Expand Up @@ -882,30 +878,29 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self
.block_lookups
.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
RequestId::SingleBlock { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
LookupType::Parent => self
.block_lookups
.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
},
RequestId::SingleBlob { .. } => {
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
}
RequestId::ParentLookup { id } => self
.block_lookups
.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
RequestId::ParentLookupBlob { id: _ } => {
crit!(self.log, "Block received during parent blob request"; "peer_id" => %peer_id );
}
RequestId::BackFillBlocks { id } => {
let is_stream_terminator = block.is_none();
if let Some(batch_id) = self
Expand Down Expand Up @@ -966,28 +961,26 @@ impl<T: BeaconChainTypes> SyncManager<T> {
RequestId::SingleBlock { .. } => {
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
}
RequestId::SingleBlob { id } => self
.block_lookups
.single_lookup_response::<BlobRequestState<Current, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),

RequestId::ParentLookup { id: _ } => {
crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id );
}
RequestId::ParentLookupBlob { id } => self
.block_lookups
.parent_lookup_response::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),
RequestId::SingleBlob { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_lookup_response::<BlobRequestState<Current, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),
LookupType::Parent => self
.block_lookups
.parent_lookup_response::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),
},
RequestId::BackFillBlocks { id: _ } => {
crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id );
}
Expand Down
Loading

0 comments on commit 34dbb32

Please sign in to comment.