Skip to content

Commit

Permalink
Sync lookup dedup range and blobs (#5561)
Browse files Browse the repository at this point in the history
* Handle sync range blocks as blocks and blobs

* Merge range sync and backfill sync handling

* Update tests

* Add no_blobs_into_responses test

* Address @realbigsean comments

* Merge remote-tracking branch 'origin/unstable' into sync-lookup-dedup-range-and-blobs
  • Loading branch information
dapplion authored Apr 12, 2024
1 parent 5fdd3b3 commit 6fb0b2e
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 531 deletions.
15 changes: 3 additions & 12 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,7 @@ impl<T: BeaconChainTypes> Router<T> {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
return;
}
id @ (SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::BackFillBlockAndBlobs { .. }
| SyncId::RangeBlockAndBlobs { .. }) => id,
id @ SyncId::RangeBlockAndBlobs { .. } => id,
},
RequestId::Router => {
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
Expand Down Expand Up @@ -559,10 +556,7 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlock { .. } => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
Expand Down Expand Up @@ -604,10 +598,7 @@ impl<T: BeaconChainTypes> Router<T> {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
Expand Down
8 changes: 7 additions & 1 deletion beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::manager::{BatchProcessResult, Id};
use crate::sync::network_context::RangeRequestId;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::range_sync::{
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
Expand Down Expand Up @@ -961,7 +962,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
match network.backfill_blocks_by_range_request(peer, is_blob_batch, request, batch_id) {
match network.blocks_and_blobs_by_range_request(
peer,
is_blob_batch,
request,
RangeRequestId::BackfillSync { batch_id },
) {
Ok(request_id) => {
// inform the batch about the new request
if let Err(e) = batch.start_downloading_from_peer(peer, request_id) {
Expand Down
54 changes: 52 additions & 2 deletions beacon_node/network/src/sync/block_sidecar_coupling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use ssz_types::VariableList;
use std::{collections::VecDeque, sync::Arc};
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};

#[derive(Debug, Default)]
use super::range_sync::ByRangeRequestType;

#[derive(Debug)]
pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar.
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<E>>>,
Expand All @@ -13,9 +15,25 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
is_blocks_stream_terminated: bool,
/// Whether the individual RPC request for sidecars is finished or not.
is_sidecars_stream_terminated: bool,
/// Used to determine if this accumulator should wait for a sidecars stream termination
request_type: ByRangeRequestType,
}

impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
pub fn new(request_type: ByRangeRequestType) -> Self {
Self {
accumulated_blocks: <_>::default(),
accumulated_sidecars: <_>::default(),
is_blocks_stream_terminated: <_>::default(),
is_sidecars_stream_terminated: <_>::default(),
request_type,
}
}

pub fn get_request_type(&self) -> ByRangeRequestType {
self.request_type
}

pub fn add_block_response(&mut self, block_opt: Option<Arc<SignedBeaconBlock<E>>>) {
match block_opt {
Some(block) => self.accumulated_blocks.push_back(block),
Expand Down Expand Up @@ -78,6 +96,38 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
}

pub fn is_finished(&self) -> bool {
self.is_blocks_stream_terminated && self.is_sidecars_stream_terminated
let blobs_requested = match self.request_type {
ByRangeRequestType::Blocks => false,
ByRangeRequestType::BlocksAndBlobs => true,
};
self.is_blocks_stream_terminated && (!blobs_requested || self.is_sidecars_stream_terminated)
}
}

#[cfg(test)]
mod tests {
use super::BlocksAndBlobsRequestInfo;
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
use rand::SeedableRng;
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};

#[test]
fn no_blobs_into_responses() {
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
.collect::<Vec<_>>();

// Send blocks and complete terminate response
for block in blocks {
info.add_block_response(Some(block.into()));
}
info.add_block_response(None);

// Assert response is finished and RpcBlocks can be constructed
assert!(info.is_finished());
info.into_responses().unwrap();
}
}
Loading

0 comments on commit 6fb0b2e

Please sign in to comment.