Skip to content

Commit

Permalink
Downscore timed-out peers when syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
Tumas committed Jul 15, 2024
1 parent be97b0e commit 1148eee
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ slog = { workspace = true }
slog-stdlog = { workspace = true }
ssz = { workspace = true }
std_ext = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
13 changes: 11 additions & 2 deletions p2p/src/block_sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use anyhow::Result;
use database::Database;
use eth1_api::RealController;
use eth2_libp2p::{rpc::StatusMessage, PeerId};
use eth2_libp2p::{rpc::StatusMessage, PeerAction, PeerId, ReportSource};
use fork_choice_control::SyncMessage;
use futures::{
channel::mpsc::{UnboundedReceiver, UnboundedSender},
Expand All @@ -29,7 +29,7 @@ use types::{
use crate::{
back_sync::{BackSync, Data as BackSyncData, Error as BackSyncError, SyncCheckpoint},
messages::{ArchiverToSync, P2pToSync, SyncToApi, SyncToMetrics, SyncToP2p},
misc::RequestId,
misc::{PeerReportReason, RequestId},
sync_manager::{SyncBatch, SyncManager, SyncTarget},
};

Expand Down Expand Up @@ -354,11 +354,20 @@ impl<P: Preset> BlockSyncService<P> {
let request_id = self.request_id()?;
let SyncBatch {
target,
peer_id,
start_slot,
count,
..
} = batch;

SyncToP2p::ReportPeer(
peer_id,
PeerAction::MidToleranceError,
ReportSource::SyncService,
PeerReportReason::ExpiredSyncBatch,
)
.send(&self.sync_to_p2p_tx);

let peer = self.sync_manager.retry_batch(request_id, &batch);

if let Some(peer_id) = peer {
Expand Down
3 changes: 2 additions & 1 deletion p2p/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use types::{

use crate::{
misc::{
AttestationSubnetActions, BeaconCommitteeSubscription, RequestId,
AttestationSubnetActions, BeaconCommitteeSubscription, PeerReportReason, RequestId,
SyncCommitteeSubnetAction, SyncCommitteeSubscription,
},
network_api::{NodeIdentity, NodePeer, NodePeerCount, NodePeersQuery},
Expand Down Expand Up @@ -108,6 +108,7 @@ impl SyncToMetrics {

pub enum SyncToP2p {
PruneReceivedBlocks,
ReportPeer(PeerId, PeerAction, ReportSource, PeerReportReason),
RequestBlobsByRange(RequestId, PeerId, Slot, u64),
RequestBlobsByRoot(RequestId, PeerId, Vec<BlobIdentifier>),
RequestBlocksByRange(RequestId, PeerId, Slot, u64),
Expand Down
6 changes: 6 additions & 0 deletions p2p/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};
use serde_with::{As, DisplayFromStr};
use strum::IntoStaticStr;
use types::phase0::primitives::{CommitteeIndex, Epoch, Slot, SubnetId, ValidatorIndex};

pub type RequestId = usize;
Expand Down Expand Up @@ -52,6 +53,11 @@ pub struct BeaconCommitteeSubscription {
pub is_aggregator: bool,
}

#[derive(IntoStaticStr)]
pub enum PeerReportReason {
ExpiredSyncBatch,
}

#[derive(PartialEq, Eq, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct SyncCommitteeSubscription {
Expand Down
14 changes: 11 additions & 3 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,17 @@ impl<P: Preset> Network<P> {

message = self.channels.sync_to_p2p_rx.select_next_some() => {
match message {
SyncToP2p::PruneReceivedBlocks => {
self.received_block_roots = HashMap::new();
}
SyncToP2p::ReportPeer(peer_id, peer_action, report_source, reason) => {
self.report_peer(
peer_id,
peer_action,
report_source,
reason
);
}
SyncToP2p::RequestBlobsByRange(request_id, peer_id, start_slot, count) => {
self.request_blobs_by_range(request_id, peer_id, start_slot, count);
}
Expand All @@ -408,9 +419,6 @@ impl<P: Preset> Network<P> {
SyncToP2p::SubscribeToCoreTopics => {
self.subscribe_to_core_topics();
}
SyncToP2p::PruneReceivedBlocks => {
self.received_block_roots = HashMap::new();
}
}
},

Expand Down

0 comments on commit 1148eee

Please sign in to comment.