Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(p2p): validate chain exchange responses for block headers #4454

Merged
merged 13 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 85 additions & 17 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::{
convert::TryFrom,
num::NonZeroU64,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand Down Expand Up @@ -139,10 +140,16 @@ where
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKey,
count: u64,
count: NonZeroU64,
) -> Result<Vec<Arc<Tipset>>, String> {
self.handle_chain_exchange_request(peer_id, tsk, count, HEADERS, |_| true)
.await
self.handle_chain_exchange_request(
peer_id,
tsk,
count,
HEADERS,
|tipsets: &Vec<Arc<Tipset>>| validate_network_tipsets(tipsets, tsk),
)
.await
}
/// Send a `chain_exchange` request for only messages (ignore block
/// headers). If `peer_id` is `None`, requests will be sent to a set of
Expand All @@ -164,7 +171,7 @@ where
self.handle_chain_exchange_request(
peer_id,
tsk,
tipsets.len() as _,
NonZeroU64::new(tipsets.len() as _).expect("Infallible"),
MESSAGES,
|compacted_messages_vec: &Vec<CompactedMessages>| {
for (msg, ts ) in compacted_messages_vec.iter().zip(tipsets.iter().rev()) {
Expand Down Expand Up @@ -195,7 +202,13 @@ where
tsk: &TipsetKey,
) -> Result<FullTipset, String> {
let mut fts = self
.handle_chain_exchange_request(peer_id, tsk, 1, HEADERS | MESSAGES, |_| true)
.handle_chain_exchange_request(
peer_id,
tsk,
NonZeroU64::new(1).expect("Infallible"),
HEADERS | MESSAGES,
|_| true,
)
.await?;

if fts.len() != 1 {
Expand Down Expand Up @@ -254,21 +267,17 @@ where
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKey,
request_len: u64,
request_len: NonZeroU64,
options: u64,
validate: F,
) -> Result<Vec<T>, String>
where
T: TryFrom<TipsetBundle, Error = String> + Send + Sync + 'static,
F: Fn(&Vec<T>) -> bool,
{
if request_len == 0 {
return Ok(vec![]);
}

let request = ChainExchangeRequest {
start: tsk.to_cids(),
request_len,
request_len: request_len.get(),
options,
};

Expand All @@ -289,6 +298,10 @@ where
// No specific peer set, send requests to a shuffled set of top peers until
// a request succeeds.
let peers = self.peer_manager.top_peers_shuffled();
if peers.is_empty() {
return Err("chain exchange failed: no peers are available".into());
}
let n_peers = peers.len();
let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
for peer_id in peers.into_iter() {
let peer_manager = self.peer_manager.clone();
Expand All @@ -308,17 +321,17 @@ where
Ok(chain_exchange_result) => {
match chain_exchange_result.into_result::<T>() {
Ok(r) => Ok(r),
Err(e) => {
Err(error) => {
lookup_failures.fetch_add(1, Ordering::Relaxed);
debug!("Failed chain_exchange response: {e}");
Err(e)
debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
Err(error)
}
}
}
Err(e) => {
Err(error) => {
network_failures.fetch_add(1, Ordering::Relaxed);
debug!("Failed chain_exchange request to peer {peer_id:?}: {e}");
Err(e)
debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
Err(error)
}
}
});
Expand Down Expand Up @@ -462,6 +475,28 @@ where
}
}

/// Checks that a `chain_exchange` headers response forms the chain that was requested.
///
/// `tipsets` is expected to be ordered newest first (descending epoch). The response is
/// accepted only when all of the following hold:
/// 1. it is not empty,
/// 2. its first tipset has the key `start_tipset_key`,
/// 3. the parents key of every tipset equals the key of the tipset that follows it.
///
/// Every rejection is logged at `warn` level. Returns `true` when all checks pass.
fn validate_network_tipsets(tipsets: &[Arc<Tipset>], start_tipset_key: &TipsetKey) -> bool {
    // An empty response can never satisfy a request for a specific start tipset.
    let head = match tipsets.first() {
        Some(head) => head,
        None => {
            tracing::warn!("invalid empty chain_exchange_headers response");
            return false;
        }
    };

    if head.key() != start_tipset_key {
        tracing::warn!(epoch=%head.epoch(), expected=%start_tipset_key, actual=%head.key(), "start tipset key mismatch");
        return false;
    }

    // Walk adjacent (child, parent) pairs; `windows(2)` yields nothing for a single tipset.
    for pair in tipsets.windows(2) {
        let (child, parent) = (&pair[0], &pair[1]);
        if child.parents() != parent.key() {
            tracing::warn!(epoch=%child.epoch(), expected_parent=%parent.key(), actual_parent=%child.parents(), "invalid chain");
            return false;
        }
    }

    true
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -562,4 +597,37 @@ mod tests {
assert_eq!(batch.get_ok().await, None);
assert!(exceeded.load(Ordering::Relaxed));
}

#[test]
#[allow(unused_variables)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What variables are unused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

genesis_header, first_header etc.

fn validate_network_tipsets_tests() {
use crate::blocks::{chain4u, Chain4U};

let c4u = Chain4U::new();
chain4u! {
in c4u;
t0 @ [genesis_header]
-> t1 @ [first_header]
-> t2 @ [second_left, second_right]
-> t3 @ [third]
-> t4 @ [fourth]
};
let t0 = Arc::new(t0.clone());
let t1 = Arc::new(t1.clone());
let t2 = Arc::new(t2.clone());
let t3 = Arc::new(t3.clone());
let t4 = Arc::new(t4.clone());
assert!(validate_network_tipsets(
&[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
t4.key()
));
assert!(!validate_network_tipsets(
&[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
t3.key()
));
assert!(!validate_network_tipsets(
&[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
t4.key()
));
}
}
18 changes: 16 additions & 2 deletions src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
cmp::{min, Ordering},
convert::TryFrom,
future::Future,
num::NonZeroU64,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -845,9 +846,18 @@ async fn sync_headers_in_reverse<DB: Blockstore + Sync + Send + 'static>(
let epoch_diff = oldest_pending_tipset.epoch() - current_head.epoch();
let window = min(epoch_diff, MAX_TIPSETS_TO_REQUEST as i64);
let network_tipsets = network
.chain_exchange_headers(None, oldest_pending_tipset.parents(), window as u64)
.chain_exchange_headers(
None,
oldest_pending_tipset.parents(),
NonZeroU64::new(window as _).expect("Infallible"),
)
.await
.map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?;
if network_tipsets.is_empty() {
return Err(TipsetRangeSyncerError::NetworkTipsetQueryFailed(
"0 network tipsets have been fetched".into(),
));
}

for tipset in network_tipsets
.into_iter()
Expand All @@ -872,7 +882,11 @@ async fn sync_headers_in_reverse<DB: Blockstore + Sync + Send + 'static>(
info!("Fork detected, searching for a common ancestor between the local chain and the network chain");
const FORK_LENGTH_THRESHOLD: u64 = 500;
let fork_tipsets = network
.chain_exchange_headers(None, oldest_pending_tipset.parents(), FORK_LENGTH_THRESHOLD)
.chain_exchange_headers(
None,
oldest_pending_tipset.parents(),
NonZeroU64::new(FORK_LENGTH_THRESHOLD).expect("Infallible"),
)
.await
.map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?;
let mut potential_common_ancestor = chain_store
Expand Down
51 changes: 35 additions & 16 deletions src/libp2p/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,30 +124,49 @@ impl PeerManager {
pub(in crate::libp2p) fn sorted_peers(&self) -> Vec<PeerId> {
let peer_lk = self.peers.read();
let average_time = self.avg_global_time.read();
let mut n_stateful = 0;
let mut peers: Vec<_> = peer_lk
.full_peers
.iter()
.filter_map(|(p, info)| {
// Filter out nodes that are stateless (or far behind)
if info.head.epoch() > 0 {
let cost = if info.successes > 0 {
// Calculate cost based on fail rate and latency
let fail_rate = f64::from(info.failures) / f64::from(info.successes);
info.average_time.as_secs_f64() + fail_rate * average_time.as_secs_f64()
} else {
// There have been no failures or successes
average_time.as_secs_f64() * NEW_PEER_MUL
};
Some((p, cost))
} else {
None
.map(|(&p, info)| {
let is_stateful = info.head.epoch() > 0;
if is_stateful {
n_stateful += 1;
}
let cost = if info.successes + info.failures > 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wibble: this was changed to `if info.successes > 0` to address #4376 (comment).
However, that change unexpectedly alters the logic to favor peers that have never succeeded.
The original code does not panic on the division by zero; the result is `inf`.

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=657686952703dcee856841d192b99774

fn main() {
    let x = 100.0 + f64::from(1) / f64::from(0);
    println!("{x}");
}

// Output: inf

// Calculate cost based on fail rate and latency
// Note that when `successes` is zero (so `failures` is non-zero), `fail_rate` and the resulting cost are `inf`
let fail_rate = f64::from(info.failures) / f64::from(info.successes);
info.average_time.as_secs_f64() + fail_rate * average_time.as_secs_f64()
} else {
// There have been no failures or successes
average_time.as_secs_f64() * NEW_PEER_MUL
};
(p, is_stateful, cost)
})
.collect();

// Unstable sort because hashmap iter order doesn't need to be preserved.
peers.sort_unstable_by(|(_, v1), (_, v2)| v1.partial_cmp(v2).unwrap_or(Ordering::Equal));
peers.into_iter().map(|(&peer, _)| peer).collect()
peers.sort_unstable_by(|(_, _, v1), (_, _, v2)| {
v1.partial_cmp(v2).unwrap_or(Ordering::Equal)
});
// Filter out nodes that are stateless when `n_stateful > 0`
if n_stateful > 0 {
peers
.into_iter()
.filter_map(
|(peer, is_stateful, _)| {
if is_stateful {
Some(peer)
} else {
None
}
},
)
.collect()
} else {
peers.into_iter().map(|(peer, _, _)| peer).collect()
}
}

/// Return shuffled slice of ordered peers from the peer manager. Ordering
Expand Down
Loading