From 21cba11e5e2dc155f085444559b0e1989bb207ec Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 26 Jun 2024 14:21:40 +0800 Subject: [PATCH 1/6] fix(p2p): validate chain exchange responses for block headers --- src/chain_sync/network_context.rs | 42 +++++++++++++++++++++++++------ src/chain_sync/tipset_syncer.rs | 5 ++++ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/chain_sync/network_context.rs b/src/chain_sync/network_context.rs index e3864995402e..5b2bce43d6fe 100644 --- a/src/chain_sync/network_context.rs +++ b/src/chain_sync/network_context.rs @@ -141,8 +141,30 @@ where tsk: &TipsetKey, count: u64, ) -> Result>, String> { - self.handle_chain_exchange_request(peer_id, tsk, count, HEADERS, |_| true) - .await + self.handle_chain_exchange_request( + peer_id, + tsk, + count, + HEADERS, + |tipsets: &Vec>| { + if let Some(start) = tipsets.first() { + if start.key() != tsk { + tracing::warn!(epoch=%start.epoch(), expected=%tsk, actual=%start.key(), "start tipset key mismatch"); + return false; + } + for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) { + if ts.parents() != pts.key() { + tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain"); + return false; + } + } + true + } else { + false + } + }, + ) + .await } /// Send a `chain_exchange` request for only messages (ignore block /// headers). If `peer_id` is `None`, requests will be sent to a set of @@ -289,6 +311,10 @@ where // No specific peer set, send requests to a shuffled set of top peers until // a request succeeds. let peers = self.peer_manager.top_peers_shuffled(); + if peers.is_empty() { + return Err("chain exchange failed: no peers are available".into()); + } + let n_peers = peers.len(); let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS); for peer_id in peers.into_iter() { let peer_manager = self.peer_manager.clone(); @@ -308,17 +334,17 @@ where Ok(chain_exchange_result) => { match chain_exchange_result.into_result::() { Ok(r) => Ok(r), - Err(e) => { + Err(error) => { lookup_failures.fetch_add(1, Ordering::Relaxed); - debug!("Failed chain_exchange response: {e}"); - Err(e) + debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response"); + Err(error) } } } - Err(e) => { + Err(error) => { network_failures.fetch_add(1, Ordering::Relaxed); - debug!("Failed chain_exchange request to peer {peer_id:?}: {e}"); - Err(e) + debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer"); + Err(error) } } }); diff --git a/src/chain_sync/tipset_syncer.rs b/src/chain_sync/tipset_syncer.rs index 3e144542c1c4..4a2ea23a854a 100644 --- a/src/chain_sync/tipset_syncer.rs +++ b/src/chain_sync/tipset_syncer.rs @@ -848,6 +848,11 @@ async fn sync_headers_in_reverse( .chain_exchange_headers(None, oldest_pending_tipset.parents(), window as u64) .await .map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?; + if network_tipsets.is_empty() { + return Err(TipsetRangeSyncerError::NetworkTipsetQueryFailed( + "0 network tipsets have been fetched".into(), + )); + } for tipset in network_tipsets .into_iter() From baa83c7b5fe51f6fe3c3213ed7364d804a58e24c Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 26 Jun 2024 14:41:11 +0800 Subject: [PATCH 2/6] NonZeroU64 --- src/chain_sync/network_context.rs | 22 +++++++++++++--------- src/chain_sync/tipset_syncer.rs | 13 +++++++++++-- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/chain_sync/network_context.rs b/src/chain_sync/network_context.rs index 5b2bce43d6fe..1c35a2cb6468 100644 --- a/src/chain_sync/network_context.rs +++ b/src/chain_sync/network_context.rs @@ -3,6 +3,7 @@ use std::{ convert::TryFrom, + num::NonZeroU64, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -139,7 +140,7 @@ where &self, peer_id: Option, tsk: &TipsetKey, - count: u64, + count: NonZeroU64, ) -> Result>, String> { self.handle_chain_exchange_request( peer_id, @@ -160,6 +161,7 @@ where } true } else { + tracing::warn!(%count, "invalid empty chain_exchange_headers response"); false } }, @@ -186,7 +188,7 @@ where self.handle_chain_exchange_request( peer_id, tsk, - tipsets.len() as _, + NonZeroU64::new(tipsets.len() as _).expect("Infallible"), MESSAGES, |compacted_messages_vec: &Vec| { for (msg, ts ) in compacted_messages_vec.iter().zip(tipsets.iter().rev()) { @@ -217,7 +219,13 @@ where tsk: &TipsetKey, ) -> Result { let mut fts = self - .handle_chain_exchange_request(peer_id, tsk, 1, HEADERS | MESSAGES, |_| true) + .handle_chain_exchange_request( + peer_id, + tsk, + NonZeroU64::new(1).expect("Infallible"), + HEADERS | MESSAGES, + |_| true, + ) .await?; if fts.len() != 1 { @@ -276,7 +284,7 @@ where &self, peer_id: Option, tsk: &TipsetKey, - request_len: u64, + request_len: NonZeroU64, options: u64, validate: F, ) -> Result, String> @@ -284,13 +292,9 @@ where T: TryFrom + Send + Sync + 'static, F: Fn(&Vec) -> bool, { - if request_len == 0 { - return Ok(vec![]); - } - let request = ChainExchangeRequest { start: tsk.to_cids(), - request_len, + request_len: request_len.get(), options, }; diff --git a/src/chain_sync/tipset_syncer.rs b/src/chain_sync/tipset_syncer.rs index 4a2ea23a854a..56be83b48810 100644 --- a/src/chain_sync/tipset_syncer.rs +++ b/src/chain_sync/tipset_syncer.rs @@ -5,6 +5,7 @@ use std::{ cmp::{min, Ordering}, convert::TryFrom, future::Future, + num::NonZeroU64, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -845,7 +846,11 @@ async fn sync_headers_in_reverse( let epoch_diff = oldest_pending_tipset.epoch() - current_head.epoch(); let window = min(epoch_diff, MAX_TIPSETS_TO_REQUEST as i64); let network_tipsets = network - .chain_exchange_headers(None, oldest_pending_tipset.parents(), window as u64) + .chain_exchange_headers( + None, + oldest_pending_tipset.parents(), + NonZeroU64::new(window as _).expect("Infallible"), + ) .await .map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?; if network_tipsets.is_empty() { @@ -877,7 +882,11 @@ async fn sync_headers_in_reverse( info!("Fork detected, searching for a common ancestor between the local chain and the network chain"); const FORK_LENGTH_THRESHOLD: u64 = 500; let fork_tipsets = network - .chain_exchange_headers(None, oldest_pending_tipset.parents(), FORK_LENGTH_THRESHOLD) + .chain_exchange_headers( + None, + oldest_pending_tipset.parents(), + NonZeroU64::new(FORK_LENGTH_THRESHOLD).expect("Infallible"), + ) .await .map_err(TipsetRangeSyncerError::NetworkTipsetQueryFailed)?; let mut potential_common_ancestor = chain_store From 4c05938d91948ab211d48b417389930febcd7686 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 26 Jun 2024 22:11:15 +0800 Subject: [PATCH 3/6] named fn and doc --- src/chain_sync/network_context.rs | 41 +++++++++++++++++-------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/chain_sync/network_context.rs b/src/chain_sync/network_context.rs index 1c35a2cb6468..ff9d7993ddd8 100644 --- a/src/chain_sync/network_context.rs +++ b/src/chain_sync/network_context.rs @@ -147,24 +147,7 @@ where tsk, count, HEADERS, - |tipsets: &Vec>| { - if let Some(start) = tipsets.first() { - if start.key() != tsk { - tracing::warn!(epoch=%start.epoch(), expected=%tsk, actual=%start.key(), "start tipset key mismatch"); - return false; - } - for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) { - if ts.parents() != pts.key() { - tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain"); - return false; - } - } - true - } else { - tracing::warn!(%count, "invalid empty chain_exchange_headers response"); - false - } - }, + |tipsets: &Vec>| validate_network_tipsets(tipsets, tsk), ) .await } @@ -492,6 +475,28 @@ where } } +/// Validates network tipsets that are sorted by epoch in descending order with the below checks +/// 1. The latest(first) tipset has the desired tipset key +/// 2. The sorted tipsets are chained by their tipset keys +fn validate_network_tipsets(tipsets: &[Arc], start_tipset_key: &TipsetKey) -> bool { + if let Some(start) = tipsets.first() { + if start.key() != start_tipset_key { + tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch"); + return false; + } + for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) { + if ts.parents() != pts.key() { + tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain"); + return false; + } + } + true + } else { + tracing::warn!("invalid empty chain_exchange_headers response"); + false + } +} + #[cfg(test)] mod tests { use super::*; From 77380cffd5655e42a7a6e25c221171641e7df5e0 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 27 Jun 2024 14:02:03 +0800 Subject: [PATCH 4/6] unit test --- src/chain_sync/network_context.rs | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/chain_sync/network_context.rs b/src/chain_sync/network_context.rs index ff9d7993ddd8..e9d69ba28e7f 100644 --- a/src/chain_sync/network_context.rs +++ b/src/chain_sync/network_context.rs @@ -597,4 +597,37 @@ mod tests { assert_eq!(batch.get_ok().await, None); assert!(exceeded.load(Ordering::Relaxed)); } + + #[test] + #[allow(unused_variables)] + fn validate_network_tipsets_tests() { + use crate::blocks::{chain4u, Chain4U}; + + let c4u = Chain4U::new(); + chain4u! { + in c4u; + t0 @ [genesis_header] + -> t1 @ [first_header] + -> t2 @ [second_left, second_right] + -> t3 @ [third] + -> t4 @ [fourth] + }; + let t0 = Arc::new(t0.clone()); + let t1 = Arc::new(t1.clone()); + let t2 = Arc::new(t2.clone()); + let t3 = Arc::new(t3.clone()); + let t4 = Arc::new(t4.clone()); + assert!(validate_network_tipsets( + &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()], + t4.key() + )); + assert!(!validate_network_tipsets( + &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()], + t3.key() + )); + assert!(!validate_network_tipsets( + &[t4.clone(), t2.clone(), t1.clone(), t0.clone()], + t4.key() + )); + } } From 0acd8e25d1aacfcf63bbe61c7a5cba410178816a Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 27 Jun 2024 14:54:01 +0800 Subject: [PATCH 5/6] refine stateless peer filtering logic --- src/libp2p/peer_manager.rs | 51 ++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/src/libp2p/peer_manager.rs b/src/libp2p/peer_manager.rs index c8dcb90ae805..1cadd2852e28 100644 --- a/src/libp2p/peer_manager.rs +++ b/src/libp2p/peer_manager.rs @@ -124,30 +124,49 @@ impl PeerManager { pub(in crate::libp2p) fn sorted_peers(&self) -> Vec { let peer_lk = self.peers.read(); let average_time = self.avg_global_time.read(); + let mut n_stateful = 0; let mut peers: Vec<_> = peer_lk .full_peers .iter() - .filter_map(|(p, info)| { - // Filter out nodes that are stateless (or far behind) - if info.head.epoch() > 0 { - let cost = if info.successes > 0 { - // Calculate cost based on fail rate and latency - let fail_rate = f64::from(info.failures) / f64::from(info.successes); - info.average_time.as_secs_f64() + fail_rate * average_time.as_secs_f64() - } else { - // There have been no failures or successes - average_time.as_secs_f64() * NEW_PEER_MUL - }; - Some((p, cost)) - } else { - None + .map(|(&p, info)| { + let is_stateful = info.head.epoch() > 0; + if is_stateful { + n_stateful += 1; } + let cost = if info.successes + info.failures > 0 { + // Calculate cost based on fail rate and latency + // Note that when `success` is zero, the result is `inf` + let fail_rate = f64::from(info.failures) / f64::from(info.successes); + info.average_time.as_secs_f64() + fail_rate * average_time.as_secs_f64() + } else { + // There have been no failures or successes + average_time.as_secs_f64() * NEW_PEER_MUL + }; + (p, is_stateful, cost) }) .collect(); // Unstable sort because hashmap iter order doesn't need to be preserved. - peers.sort_unstable_by(|(_, v1), (_, v2)| v1.partial_cmp(v2).unwrap_or(Ordering::Equal)); - peers.into_iter().map(|(&peer, _)| peer).collect() + peers.sort_unstable_by(|(_, _, v1), (_, _, v2)| { + v1.partial_cmp(v2).unwrap_or(Ordering::Equal) + }); + // Filter out nodes that are stateless when `n_stateful > 0` + if n_stateful > 0 { + peers + .into_iter() + .filter_map( + |(peer, is_stateful, _)| { + if is_stateful { + Some(peer) + } else { + None + } + }, + ) + .collect() + } else { + peers.into_iter().map(|(peer, _, _)| peer).collect() + } } /// Return shuffled slice of ordered peers from the peer manager. Ordering From 71c190e2e714fb7dc1fc621bc30cb512797998f2 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 27 Jun 2024 16:27:55 +0800 Subject: [PATCH 6/6] fix stateless node CI test --- src/chain_sync/chain_muxer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/chain_sync/chain_muxer.rs b/src/chain_sync/chain_muxer.rs index 19544ea67c3d..16f73ecf4517 100644 --- a/src/chain_sync/chain_muxer.rs +++ b/src/chain_sync/chain_muxer.rs @@ -481,6 +481,11 @@ where } }; + // Update the peer head + network + .peer_manager() + .update_peer_head(source, Arc::new(tipset.clone().into_tipset())); + if tipset.epoch() + (SECONDS_IN_DAY / block_delay as i64) < chain_store.heaviest_tipset().epoch() { @@ -511,11 +516,6 @@ where block.persist(&chain_store.db)?; } - // Update the peer head - network - .peer_manager() - .update_peer_head(source, Arc::new(tipset.clone().into_tipset())); - Ok(Some((tipset, source))) }