diff --git a/Cargo.lock b/Cargo.lock index 2cdf75c0..07cb06da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2534,7 +2534,9 @@ dependencies = [ "backoff", "ceramic-core", "ceramic-event-svc", + "ceramic-interest-svc", "ceramic-metrics", + "ceramic-peer-svc", "chrono", "cid 0.11.1", "criterion2", diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index b9f5a80a..5cfc0fa0 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -38,7 +38,6 @@ ssh-key = { workspace = true, features = ["ed25519", "std", "rand_core"] } tempfile.workspace = true tokio = { workspace = true, features = ["fs", "time", "sync", "macros"] } tokio-stream.workspace = true -tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing.workspace = true void.workspace = true zeroize.workspace = true @@ -71,9 +70,13 @@ features = [ [dev-dependencies] ceramic-event-svc.workspace = true +ceramic-interest-svc.workspace = true +ceramic-peer-svc.workspace = true +recon.workspace = true criterion2.workspace = true rand_chacha.workspace = true test-log.workspace = true +tracing-subscriber.workspace = true [[bench]] name = "lru_cache" diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index baa85d9e..cee741d0 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -24,8 +24,8 @@ use tracing::{info, warn}; use self::ceramic_peer_manager::CeramicPeerManager; pub use self::event::Event; -use crate::config::Libp2pConfig; use crate::Metrics; +use crate::{config::Libp2pConfig, peers}; mod ceramic_peer_manager; mod event; @@ -72,6 +72,7 @@ where relay_client: Option, recons: Option<(P, I, M)>, block_store: Arc, + peers_tx: tokio::sync::mpsc::Sender, metrics: Metrics, ) -> Result { let pub_key = local_key.public(); @@ -198,7 +199,7 @@ where relay, dcutr: dcutr.into(), relay_client: relay_client.into(), - peer_manager: CeramicPeerManager::new(&config.ceramic_peers, metrics)?, + peer_manager: CeramicPeerManager::new(peers_tx, &config.ceramic_peers, metrics)?, limits, recon: recon.into(), }) diff --git a/p2p/src/behaviour/ceramic_peer_manager.rs b/p2p/src/behaviour/ceramic_peer_manager.rs index f9e9aacb..715105c2 100644 --- a/p2p/src/behaviour/ceramic_peer_manager.rs +++ b/p2p/src/behaviour/ceramic_peer_manager.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, fmt::{self, Debug, Formatter}, future, task::{Context, Poll}, @@ -8,23 +9,32 @@ use std::{ use ahash::AHashMap; use anyhow::{anyhow, Result}; use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder}; +use ceramic_core::PeerEntry; #[allow(deprecated)] use ceramic_metrics::Recorder; use futures_util::{future::BoxFuture, FutureExt}; -use libp2p::swarm::{ - dial_opts::{DialOpts, PeerCondition}, - ToSwarm, -}; use libp2p::{ identify::Info as IdentifyInfo, - multiaddr::Protocol, swarm::{dummy, ConnectionId, DialError, NetworkBehaviour}, Multiaddr, PeerId, }; -use tokio::time; +use libp2p::{ + multiaddr::Protocol, + swarm::{ + dial_opts::{DialOpts, PeerCondition}, + ToSwarm, + }, +}; +use tokio::{ + sync::{mpsc::Sender, oneshot}, + time, +}; use tracing::{info, warn}; -use crate::metrics::{self, Metrics}; +use crate::{ + metrics::{self, Metrics}, + peers, +}; /// Manages state for Ceramic peers. /// Ceramic peers are peers that participate in the Ceramic network. @@ -35,6 +45,10 @@ pub struct CeramicPeerManager { metrics: Metrics, info: AHashMap, ceramic_peers: AHashMap, + // Use a message passing technique to get peers so that we do not use the current task to do + // DB/IO work. 
+ peers_tx: Sender<peers::Message>, + peers_fut: Option<BoxFuture<'static, Result<Vec<PeerEntry>>>>, } #[derive(Default, Debug, Clone)] @@ -60,14 +74,18 @@ const PEERING_DIAL_JITTER: f64 = 0.1; pub enum PeerManagerEvent {} impl CeramicPeerManager { - pub fn new(ceramic_peers: &[Multiaddr], metrics: Metrics) -> Result<Self> { + pub fn new( + peers_tx: Sender<peers::Message>, + ceramic_peers: &[Multiaddr], + metrics: Metrics, + ) -> Result<Self> { let ceramic_peers = ceramic_peers .iter() // Extract peer id from multiaddr .map(|multiaddr| { if let Some(peer) = multiaddr.iter().find_map(|proto| match proto { Protocol::P2p(peer_id) => { - Some((peer_id, CeramicPeer::new(multiaddr.to_owned()))) + Some((peer_id, CeramicPeer::new(vec![multiaddr.to_owned()]))) } _ => None, }) { @@ -81,6 +99,8 @@ impl CeramicPeerManager { metrics, info: Default::default(), ceramic_peers, + peers_tx, + peers_fut: None, }) } @@ -99,11 +119,30 @@ impl CeramicPeerManager { pub fn is_ceramic_peer(&self, peer_id: &PeerId) -> bool { self.ceramic_peers.contains_key(peer_id) } + pub fn new_peers(&mut self) { + if self.peers_fut.is_none() { + let (tx, rx) = oneshot::channel(); + // Construct future that will resolve to the set of all known remote peers + let peers_tx = self.peers_tx.clone(); + self.peers_fut = Some( + async move { + futures::future::join(peers_tx.send(peers::Message::AllRemotePeers(tx)), rx) + .map(|(send, peers)| { + send.map_err(anyhow::Error::from) + .and(peers.map_err(anyhow::Error::from).and_then(|inner| inner)) + }) + .await + } + .boxed(), + ) + } // else do nothing because we will get all peers anyway + } fn handle_connection_established(&mut self, peer_id: &PeerId) { if let Some(peer) = self.ceramic_peers.get_mut(peer_id) { info!( - multiaddr = %peer.multiaddr, + %peer_id, + multiaddr = ?peer.addrs, "connection established, stop dialing ceramic peer", ); peer.stop_redial(); @@ -114,7 +153,8 @@ fn handle_connection_closed(&mut self, peer_id: &PeerId) { if let Some(peer) = self.ceramic_peers.get_mut(peer_id) { warn!( - multiaddr = %peer.multiaddr, + %peer_id, + multiaddr = ?peer.addrs, "connection closed, redial ceramic peer", ); peer.start_redial(); @@ -125,7 +165,8 @@ fn handle_dial_failure(&mut self, peer_id: &PeerId) { if let Some(peer) = self.ceramic_peers.get_mut(peer_id) { warn!( - multiaddr = %peer.multiaddr, + %peer_id, + multiaddr = ?peer.addrs, "dial failed, redial ceramic peer" ); peer.backoff_redial(); @@ -195,13 +236,37 @@ impl NetworkBehaviour for CeramicPeerManager { &mut self, cx: &mut Context<'_>, ) -> Poll>> { + if let Some(mut peers) = self.peers_fut.take() { + match peers.poll_unpin(cx) { + Poll::Ready(peers) => match peers { + Ok(peers) => { + for peer_entry in peers { + self.ceramic_peers + .entry(peer_entry.id().peer_id()) + .and_modify(|peer| { + let count = peer.addrs.len(); + peer.addrs.extend(peer_entry.addresses().iter().cloned()); + if count != peer.addrs.len() { + peer.start_redial() + } + }) + .or_insert(CeramicPeer::new(peer_entry.addresses().to_vec())); + } + } + Err(err) => warn!(%err, "failed to get set of remote peers"), + }, + Poll::Pending => { + self.peers_fut.replace(peers); + } + } + } for (peer_id, peer) in self.ceramic_peers.iter_mut() { if let Some(mut dial_future) = peer.dial_future.take() { match dial_future.as_mut().poll_unpin(cx) { Poll::Ready(()) => { return Poll::Ready(ToSwarm::Dial { opts: DialOpts::peer_id(*peer_id) - .addresses(vec![peer.multiaddr.clone()]) + .addresses(peer.addrs.iter().cloned().collect()) .condition(PeerCondition::Disconnected) .build(), })
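A note on the pattern above: `new_peers` queues at most one outstanding all-peers query, stored as a boxed future that `poll` later drives to completion. The underlying request/response exchange is plain tokio message passing; below is a minimal, self-contained sketch of it using a hypothetical `String`-address message type rather than the crate's real `peers::Message` and `PeerEntry`:

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical stand-in for peers::Message::AllRemotePeers.
enum Message {
    AllRemotePeers(oneshot::Sender<Vec<String>>),
}

#[tokio::main]
async fn main() {
    let (peers_tx, mut peers_rx) = mpsc::channel::<Message>(8);

    // The peers task owns all DB/IO work; behaviours never block on it.
    tokio::spawn(async move {
        while let Some(Message::AllRemotePeers(reply)) = peers_rx.recv().await {
            // Imagine a database query here.
            let _ = reply.send(vec!["/ip4/127.0.0.1/tcp/4001".to_string()]);
        }
    });

    // Requester side: send the query, then await the oneshot reply.
    let (tx, rx) = oneshot::channel();
    peers_tx.send(Message::AllRemotePeers(tx)).await.unwrap();
    let peers = rx.await.unwrap();
    assert_eq!(peers.len(), 1);
}
```

Storing the reply future instead of awaiting it is what keeps the DB/IO off the swarm task, which only ever polls.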
@@ -239,7 +304,7 @@ impl NetworkBehaviour for CeramicPeerManager { // State of Ceramic peer. struct CeramicPeer { - multiaddr: Multiaddr, + addrs: HashSet<Multiaddr>, dial_backoff: ExponentialBackoff, dial_future: Option<BoxFuture<'static, ()>>, } @@ -247,7 +312,7 @@ struct CeramicPeer { impl Debug for CeramicPeer { fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { f.debug_struct("BootstrapPeer") - .field("multiaddr", &self.multiaddr) + .field("multiaddr", &self.addrs) .field("dial_backoff", &self.dial_backoff) .field("dial_future", &self.dial_future.is_some()) .finish() @@ -255,7 +320,7 @@ impl Debug for CeramicPeer { } impl CeramicPeer { - fn new(multiaddr: Multiaddr) -> Self { + fn new(addrs: Vec<Multiaddr>) -> Self { let dial_backoff = ExponentialBackoffBuilder::new() .with_initial_interval(PEERING_MIN_DIAL_SECS) .with_multiplier(PEERING_DIAL_BACKOFF) @@ -266,7 +331,7 @@ impl CeramicPeer { // Expire initial future so that we dial peers immediately let dial_future = Some(future::ready(()).boxed()); Self { - multiaddr, + addrs: addrs.into_iter().collect(), dial_backoff, dial_future, } diff --git a/p2p/src/node.rs b/p2p/src/node.rs index d15751e7..fdccee70 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -46,7 +46,10 @@ use crate::{ swarm::build_swarm, Config, }; -use recon::{libp2p::Recon, Sha256a}; +use recon::{ + libp2p::{PeerEvent, PeerStatus, Recon, StreamSet}, + Sha256a, +}; #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] @@ -166,15 +169,26 @@ where .. } = config; + // Set up the peers message channel + let (peers_tx, peers_rx) = channel(1_000); + let mut swarm = build_swarm( &libp2p_config, node_key.p2p_keypair(), recons, block_store, + peers_tx.clone(), metrics.clone(), ) .await?; + if !libp2p_config.external_multiaddrs.is_empty() { + peers_tx + .send(peers::Message::NewLocalAddresses( + libp2p_config.external_multiaddrs.clone(), + )) + .await?; + } for addr in &libp2p_config.external_multiaddrs { swarm.add_external_address(addr.clone()); } @@ -201,9 +215,11 @@ where .unwrap() }); - let (peers_tx, peers_rx) = channel(1_000); + // Spawn the peers task, which periodically publishes the local node into the peers ring + // and answers queries about known peers in the DB. let peers_task = tokio::task::spawn(async move { peers::run( + // Expire the peer entry in 24 hours. Duration::from_secs(24 * 60 * 60), node_key, peer_svc, @@ -530,12 +546,12 @@ where SwarmEvent::ExternalAddrConfirmed { address } => { if let Err(err) = self .peers_tx - .send(peers::Message::NewLocalAddress(address)) + .send(peers::Message::NewLocalAddresses(vec![address])) .await { warn!( address = ?err.0, - "failed to notifiy peers task about a new local address" + "failed to notify peers task about a new external address" ); } Ok(None) @@ -548,7 +564,20 @@ where { warn!( address = ?err.0, - "failed to notifiy peers task about an expired local address" + "failed to notify peers task about an expired external address" ); } Ok(None) } + SwarmEvent::NewListenAddr { address, .. } => { + if let Err(err) = self + .peers_tx + .send(peers::Message::NewLocalAddresses(vec![address])) + .await + { + warn!( + address = ?err.0, + "failed to notify peers task about a new listen address" ); } Ok(None) @@ -910,6 +939,19 @@ where } Ok(None) } + Event::Recon(recon::libp2p::Event::PeerEvent(PeerEvent { + status: + PeerStatus::Synchronized { + stream_set: StreamSet::Peer, + new_count, + }, + ..
+ })) => { + if new_count > 0 { + self.swarm.behaviour_mut().peer_manager.new_peers(); + } + Ok(None) + } _ => { // TODO: check all important events are handled Ok(None) @@ -1197,598 +1239,3 @@ enum SwarmEventResult { KademliaAddressAdded, KademliaBoostrapSuccess, } - -#[cfg(test)] -mod tests { - use std::marker::PhantomData; - - use async_trait::async_trait; - use ceramic_core::{NodeId, RangeOpen}; - use ceramic_event_svc::{store::SqlitePool, EventService}; - use futures::TryStreamExt; - use rand::prelude::*; - use rand_chacha::ChaCha8Rng; - use recon::{InsertResult, RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState}; - - use libp2p::kad::RecordKey; - - use super::*; - use anyhow::Result; - use iroh_rpc_client::P2pClient; - use iroh_rpc_types::{p2p::P2pAddr, Addr}; - use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - - #[tokio::test] - #[ignore] - async fn test_fetch_providers_grpc_dht() -> Result<()> { - let server_addr = "irpc://0.0.0.0:4401".parse().unwrap(); - let client_addr = "irpc://0.0.0.0:4401".parse().unwrap(); - fetch_providers( - "/ip4/0.0.0.0/tcp/5001".parse().unwrap(), - server_addr, - client_addr, - ) - .await - .unwrap(); - Ok(()) - } - - #[tokio::test] - #[ignore] - async fn test_fetch_providers_mem_dht() -> Result<()> { - tracing_subscriber::registry() - .with(fmt::layer().pretty()) - .with(EnvFilter::from_default_env()) - .init(); - - let client_addr = Addr::new_mem(); - let server_addr = client_addr.clone(); - fetch_providers( - "/ip4/0.0.0.0/tcp/5003".parse().unwrap(), - server_addr, - client_addr, - ) - .await?; - Ok(()) - } - - #[derive(Debug)] - struct TestRunnerBuilder { - /// An Optional listening address for this node - /// When `None`, the swarm will connect to a random tcp port. - addrs: Option>, - /// The listening addresses for the p2p client. - /// When `None`, the client will communicate over a memory rpc channel - rpc_addrs: Option<(P2pAddr, P2pAddr)>, - /// When `true`, allow bootstrapping to the network. - /// Otherwise, don't provide any addresses from which to bootstrap. - bootstrap: bool, - /// An optional seed to use when building a peer_id. - seed: Option, - /// Optional `Keys` the node should provide to the DHT on start up. 
- keys: Option>, - /// Pass through to node.trust_observed_addrs - trust_observed_addrs: bool, - } - - #[derive(Clone)] - struct DummyRecon(PhantomData); - - #[async_trait] - impl Recon for DummyRecon - where - K: recon::Key - + std::fmt::Debug - + serde::Serialize - + for<'de> serde::Deserialize<'de> - + Send - + Sync - + 'static, - { - type Key = K; - type Hash = Sha256a; - - async fn insert( - &self, - _items: Vec>, - _informant: NodeId, - ) -> ReconResult> { - unreachable!() - } - - async fn range(&self, _range: std::ops::Range<&Self::Key>) -> ReconResult> { - unreachable!() - } - - async fn len(&self) -> ReconResult { - unreachable!() - } - - async fn value_for_key(&self, _key: Self::Key) -> ReconResult>> { - Ok(None) - } - async fn interests(&self) -> ReconResult>> { - unreachable!() - } - - async fn process_interests( - &self, - _interests: Vec>, - ) -> ReconResult>> { - unreachable!() - } - - async fn initial_range( - &self, - _interest: RangeOpen, - ) -> ReconResult> { - unreachable!() - } - - async fn process_range( - &self, - _range: RangeHash, - ) -> ReconResult> { - unreachable!() - } - - fn metrics(&self) -> recon::Metrics { - unreachable!() - } - } - #[derive(Clone)] - struct DummyPeers; - - #[async_trait] - impl PeerService for DummyPeers { - async fn insert(&self, _peer: &PeerKey) -> anyhow::Result<()> { - unreachable!() - } - async fn delete_range(&self, _range: std::ops::Range<&PeerKey>) -> anyhow::Result<()> { - unreachable!() - } - async fn all_peers(&self) -> anyhow::Result> { - unreachable!() - } - } - - impl TestRunnerBuilder { - fn new() -> Self { - Self { - addrs: None, - rpc_addrs: None, - bootstrap: true, - seed: None, - keys: None, - trust_observed_addrs: false, - } - } - - fn with_addrs(mut self, addrs: Vec) -> Self { - self.addrs = Some(addrs); - self - } - - fn with_rpc_addrs(mut self, rpc_server_addr: P2pAddr, rpc_client_addr: P2pAddr) -> Self { - self.rpc_addrs = Some((rpc_server_addr, rpc_client_addr)); - self - } - - fn no_bootstrap(mut self) -> Self { - self.bootstrap = false; - self - } - - fn with_seed(mut self, seed: ChaCha8Rng) -> Self { - self.seed = Some(seed); - self - } - fn with_trust_observed_addrs(mut self, trust_observed_addrs: bool) -> Self { - self.trust_observed_addrs = trust_observed_addrs; - self - } - - async fn build(self) -> Result { - let (rpc_server_addr, rpc_client_addr) = match self.rpc_addrs { - Some((rpc_server_addr, rpc_client_addr)) => (rpc_server_addr, rpc_client_addr), - None => { - let x = Addr::new_mem(); - (x.clone(), x) - } - }; - let mut network_config = Config::default_with_rpc(rpc_client_addr.clone()); - network_config.libp2p.trust_observed_addrs = self.trust_observed_addrs; - - if let Some(addrs) = self.addrs { - network_config.libp2p.listening_multiaddrs = addrs; - } else { - network_config.libp2p.listening_multiaddrs = - vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()]; - } - - if !self.bootstrap { - network_config.libp2p.ceramic_peers = vec![]; - } - let node_key = NodeKey::random(); - let peer_id = node_key.id().peer_id(); - - // Using an in memory DB for the tests for realistic benchmark disk DB is needed. 
- let sql_pool = SqlitePool::connect_in_memory().await.unwrap(); - - let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default()); - let store = Arc::new(EventService::try_new(sql_pool, true, true, vec![]).await?); - let mut p2p = Node::new( - network_config, - rpc_server_addr, - node_key, - DummyPeers, - None::<( - DummyRecon, - DummyRecon, - DummyRecon, - )>, - store, - metrics, - ) - .await?; - let cfg = iroh_rpc_client::Config { - p2p_addr: Some(rpc_client_addr), - channels: Some(1), - ..Default::default() - }; - - if let Some(keys) = self.keys { - if let Some(kad) = p2p.swarm.behaviour_mut().kad.as_mut() { - for k in keys { - kad.start_providing(k)?; - } - } else { - anyhow::bail!("expected kad behaviour to exist"); - } - } - - let client = RpcClient::new(cfg).await?; - - let network_events = p2p.network_events(); - let task = tokio::task::spawn(async move { p2p.run().await.unwrap() }); - - let client = client.try_p2p()?; - - let addr = - tokio::time::timeout(Duration::from_millis(500), get_addr_loop(client.clone())) - .await - .context("timed out before getting a listening address for the node")??; - let mut dial_addr = addr.clone(); - dial_addr.push(Protocol::P2p(peer_id)); - Ok(TestRunner { - task, - client, - peer_id, - network_events, - addr, - dial_addr, - }) - } - } - - async fn get_addr_loop(client: P2pClient) -> Result { - loop { - let l = client.listeners().await?; - if let Some(a) = l.first() { - return Ok(a.clone()); - } - } - } - - struct TestRunner { - /// The task that runs the p2p node. - task: JoinHandle<()>, - /// The RPC client - /// Used to communicate with the p2p node. - client: P2pClient, - /// The node's peer_id - peer_id: PeerId, - /// A channel to read the network events received by the node. - network_events: Receiver, - /// The listening address for this node. - addr: Multiaddr, - /// A multiaddr that is a combination of the listening addr and peer_id. - /// This address can be used by another node to dial this peer directly. - dial_addr: Multiaddr, - } - - impl Drop for TestRunner { - fn drop(&mut self) { - self.task.abort(); - } - } - - async fn fetch_providers( - addr: Multiaddr, - rpc_server_addr: P2pAddr, - rpc_client_addr: P2pAddr, - ) -> Result<()> { - let test_runner = TestRunnerBuilder::new() - .with_addrs(vec![addr]) - .with_rpc_addrs(rpc_server_addr, rpc_client_addr) - .build() - .await?; - - { - // Make sure we are bootstrapped. 
- tokio::time::sleep(Duration::from_millis(2500)).await; - let c = "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" // cspell:disable-line - .parse() - .unwrap(); - - let mut providers = Vec::new(); - let mut chan = test_runner.client.fetch_providers_dht(&c).await?; - while let Some(new_providers) = chan.next().await { - let new_providers = new_providers.unwrap(); - println!("providers found: {}", new_providers.len()); - assert!(!new_providers.is_empty()); - - for p in &new_providers { - println!("{p}"); - providers.push(*p); - } - } - - println!("{providers:?}"); - assert!(!providers.is_empty()); - assert!( - providers.len() >= DEFAULT_PROVIDER_LIMIT, - "{} < {}", - providers.len(), - DEFAULT_PROVIDER_LIMIT - ); - }; - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_local_peer_id() -> Result<()> { - let test_runner = TestRunnerBuilder::new().no_bootstrap().build().await?; - let got_peer_id = test_runner.client.local_peer_id().await?; - assert_eq!(test_runner.peer_id, got_peer_id); - Ok(()) - } - - #[tokio::test] - #[ignore] - async fn test_two_nodes() -> Result<()> { - let test_runner_a = TestRunnerBuilder::new().no_bootstrap().build().await?; - // peer_id 12D3KooWLo6JTNKXfjkZtKf8ooLQoXVXUEeuu4YDY3CYqK6rxHXt - let test_runner_b = TestRunnerBuilder::new() - .no_bootstrap() - .with_seed(ChaCha8Rng::from_seed([0; 32])) - .build() - .await?; - let addrs_b = vec![test_runner_b.addr.clone()]; - - let peer_id_a = test_runner_a.client.local_peer_id().await?; - assert_eq!(test_runner_a.peer_id, peer_id_a); - let peer_id_b = test_runner_b.client.local_peer_id().await?; - assert_eq!(test_runner_b.peer_id, peer_id_b); - - let lookup_a = test_runner_a.client.lookup_local().await?; - // since we aren't connected to any other nodes, we should not - // have any information about our observed addresses - assert!(lookup_a.observed_addrs.is_empty()); - assert_lookup(lookup_a, test_runner_a.peer_id, &test_runner_a.addr, &[])?; - - // connect - test_runner_a.client.connect(peer_id_b, addrs_b).await?; - - // Make sure the peers have had time to negotiate protocols - tokio::time::sleep(Duration::from_millis(2500)).await; - - // Make sure we have exchanged identity information - // peer b should be in the list of peers that peer a is connected to - let peers = test_runner_a.client.get_peers().await?; - assert!(peers.len() == 1); - let got_peer_addrs = peers.get(&peer_id_b).unwrap(); - assert!(got_peer_addrs.contains(&test_runner_b.dial_addr)); - - // lookup - let lookup_b = test_runner_a.client.lookup(peer_id_b, None).await?; - // Expected protocols are only the ones negotiated with a connected peer. - // NOTE: dcutr is not in the list because it is not negotiated with the peer. 
- let expected_protocols = [ - "/ipfs/ping/1.0.0", - "/ipfs/id/1.0.0", - "/ipfs/id/push/1.0.0", - "/ipfs/bitswap/1.2.0", - "/ipfs/bitswap/1.1.0", - "/ipfs/bitswap/1.0.0", - "/ipfs/bitswap", - "/ipfs/kad/1.0.0", - "/libp2p/autonat/1.0.0", - "/libp2p/circuit/relay/0.2.0/hop", - "/libp2p/circuit/relay/0.2.0/stop", - "/meshsub/1.1.0", - "/meshsub/1.0.0", - ]; - assert_lookup( - lookup_b, - test_runner_b.peer_id, - &test_runner_b.addr, - &expected_protocols[..], - )?; - // now that we are connected & have exchanged identity information, - // we should now be able to view the node's external addrs - // these are the addresses that other nodes tell you "this is the address I see for you" - let external_addrs_a = test_runner_a.client.external_addresses().await?; - assert_eq!(vec![test_runner_a.addr.clone()], external_addrs_a); - - // peer_disconnect NOT YET IMPLEMENTED - // test_runner_a.client.disconnect(peer_id_b).await?; - // let peers = test_runner_a.client.get_peers().await?; - // assert!(peers.len() == 0); - - Ok(()) - } - - // assert_lookup ensures each part of the lookup is equal - fn assert_lookup( - got: Lookup, - expected_peer_id: PeerId, - expected_addr: &Multiaddr, - expected_protocols: &[&str], - ) -> Result<()> { - let expected_protocol_version = "ipfs/0.1.0"; - let expected_agent_version = - format!("iroh/{}", std::env::var("CARGO_PKG_VERSION").unwrap()); - - assert_eq!(expected_peer_id, got.peer_id); - assert!(got.listen_addrs.contains(expected_addr)); - assert_eq!(expected_protocols, got.protocols); - assert_eq!(expected_protocol_version, got.protocol_version); - assert_eq!(expected_agent_version, got.agent_version); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_cancel_listen_for_identify() -> Result<()> { - let mut test_runner_a = TestRunnerBuilder::new().no_bootstrap().build().await?; - let peer_id: PeerId = "12D3KooWFma2D63TG9ToSiRsjFkoNm2tTihScTBAEdXxinYk5rwE" - .parse() - .unwrap(); - test_runner_a - .client - .lookup(peer_id, None) - .await - .unwrap_err(); - // when lookup ends in error, we must ensure we - // have canceled the lookup - let event = test_runner_a.network_events.recv().await.unwrap(); - if let NetworkEvent::CancelLookupQuery(got_peer_id) = event { - assert_eq!(peer_id, got_peer_id); - } else { - anyhow::bail!("unexpected NetworkEvent {:#?}", event); - } - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - #[cfg_attr(target_os = "macos", ignore = "on MacOS")] - async fn test_dht() -> Result<()> { - // set up three nodes - // two connect to one - // try to connect via id - let cid: Cid = "bafkreieq5jui4j25lacwomsqgjeswwl3y5zcdrresptwgmfylxo2depppq" // cspell:disable-line - .parse() - .unwrap(); - - let test_runner_a = TestRunnerBuilder::new() - .no_bootstrap() - // We can trust all peers as they are the other test runners. - // - // We need to trust the observed_addrs because otherwise kademlia will not switch into server mode for the - // established connections because there is no external address to be used. 
- .with_trust_observed_addrs(true) - .build() - .await?; - println!("peer_a: {:?}", test_runner_a.peer_id); - - let mut test_runner_b = TestRunnerBuilder::new() - .no_bootstrap() - .with_trust_observed_addrs(true) - .with_seed(ChaCha8Rng::from_seed([0; 32])) - .build() - .await?; - let addrs = vec![test_runner_b.addr.clone()]; - - println!("peer_b: {:?}", test_runner_b.peer_id); - - let test_runner_c = TestRunnerBuilder::new() - .no_bootstrap() - .with_trust_observed_addrs(true) - .with_seed(ChaCha8Rng::from_seed([1; 32])) - .build() - .await?; - - println!("peer_c: {:?}", test_runner_c.peer_id); - - // connect a and c to b - test_runner_a - .client - .connect(test_runner_b.peer_id, addrs.clone()) - .await?; - - // expect a network event showing a & b have connected - match test_runner_b.network_events.recv().await { - Some(NetworkEvent::PeerConnected(peer_id)) => { - assert_eq!(test_runner_a.peer_id, peer_id); - } - Some(n) => { - anyhow::bail!("unexpected network event: {:?}", n); - } - None => { - anyhow::bail!("expected NetworkEvent::PeerConnected, received no event"); - } - }; - - test_runner_c - .client - .connect(test_runner_b.peer_id, addrs.clone()) - .await?; - - // expect a network event showing b & c have connected - match test_runner_b.network_events.recv().await { - Some(NetworkEvent::PeerConnected(peer_id)) => { - assert_eq!(test_runner_c.peer_id, peer_id); - } - Some(n) => { - anyhow::bail!("unexpected network event: {:?}", n); - } - None => { - anyhow::bail!("expected NetworkEvent::PeerConnected, received no event"); - } - }; - - // c start providing - test_runner_c.client.start_providing(&cid).await?; - - // when `start_providing` waits for the record to make it to the dht - // we can remove this polling - let providers = tokio::time::timeout( - Duration::from_secs(6), - poll_for_providers(test_runner_a.client.clone(), &cid), - ) - .await - .context("timed out before finding providers for the given cid")??; - - assert!(providers.len() == 1); - assert!(providers.first().unwrap().contains(&test_runner_c.peer_id)); - - // c stop providing - test_runner_c.client.stop_providing(&cid).await?; - - // a fetch providers dht should not get any providers - let stream = test_runner_a.client.fetch_providers_dht(&cid).await?; - let providers: Vec<_> = stream.try_collect().await.unwrap(); - - assert!(providers.is_empty()); - - // try to connect a to c using only peer_id - test_runner_a - .client - .connect(test_runner_c.peer_id, vec![]) - .await?; - Ok(()) - } - - async fn poll_for_providers(client: P2pClient, cid: &Cid) -> Result>> { - loop { - let stream = client.fetch_providers_dht(cid).await?; - let providers: Vec<_> = stream.try_collect().await.unwrap(); - if providers.is_empty() { - continue; - } - return Ok(providers); - } - } -} diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 43ac38fe..f0b90a98 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -8,7 +8,7 @@ use tokio::{ select, sync::{mpsc, oneshot}, }; -use tracing::warn; +use tracing::{debug, warn}; /// [`InterestProvider`] that is interested in [`PeerKey`]s that have not expired. #[derive(Debug, Clone)] @@ -50,8 +50,8 @@ impl PeerService for Arc { #[derive(Debug)] pub enum Message { - /// Inform the peers loop about a new local address. - NewLocalAddress(Multiaddr), + /// Inform the peers loop about new local addresses. + NewLocalAddresses(Vec), /// Inform the peers loop about a local address that is no longer valid. RemoveLocalAddress(Multiaddr), /// Report a list of all remote peers. 
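The hunks below change `peers::run` so that any message that mutates the local address set triggers an immediate republish, in addition to the periodic tick. A minimal sketch of that loop shape, assuming simplified types (`String` addresses and a stubbed `publish` in place of `do_tick` and the `PeerService`):

```rust
use std::{collections::BTreeSet, time::Duration};
use tokio::{select, sync::mpsc, time};

// Hypothetical simplification of peers::Message.
enum Message {
    NewLocalAddresses(Vec<String>),
    RemoveLocalAddress(String),
}

async fn run(expiration: Duration, mut messages: mpsc::Receiver<Message>) {
    // Publish at twice the frequency the entry expires so it never lapses.
    let mut interval = time::interval(expiration / 2);
    let mut addresses: BTreeSet<String> = BTreeSet::new();
    loop {
        select! {
            _ = interval.tick() => publish(&addresses).await,
            Some(m) = messages.recv() => {
                let republish = match m {
                    Message::NewLocalAddresses(a) => { addresses.extend(a); true }
                    Message::RemoveLocalAddress(a) => { addresses.remove(&a); true }
                };
                // Address changes republish immediately rather than waiting for
                // the next tick; read-only queries would return false here.
                if republish { publish(&addresses).await }
            }
        }
    }
}

async fn publish(addresses: &BTreeSet<String>) {
    println!("publishing {addresses:?}");
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(run(Duration::from_secs(2), rx));
    tx.send(Message::NewLocalAddresses(vec!["/ip4/127.0.0.1/tcp/0".into()]))
        .await
        .unwrap();
    tx.send(Message::RemoveLocalAddress("/ip4/127.0.0.1/tcp/0".into()))
        .await
        .unwrap();
    time::sleep(Duration::from_millis(50)).await;
}
```

This is why `handle_message` now returns a `bool`: mutating messages report `true` and force a tick, while `AllRemotePeers` reports `false`.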
@@ -60,6 +60,8 @@ pub enum Message { } /// Run a loop handling messages and publishing the local node into the Peer recon ring. +/// The local node's entry is set to expire `expiration` duration in the future +/// and is republished at twice the frequency at which it expires. pub async fn run( expiration: Duration, node_key: NodeKey, @@ -74,7 +76,9 @@ do_tick(expiration, &node_key, addresses.iter().cloned().collect(), &svc).await } Some(m) = messages.recv() => { - handle_message(node_key.id(), m, &mut addresses,&svc).await + if handle_message(node_key.id(), m, &mut addresses, &svc).await { + do_tick(expiration, &node_key, addresses.iter().cloned().collect(), &svc).await + } } } } @@ -113,13 +117,16 @@ async fn handle_message( message: Message, addressess: &mut BTreeSet<Multiaddr>, svc: &impl PeerService, -) { +) -> bool { + debug!(%node_id, ?message, "handle_message"); match message { - Message::NewLocalAddress(address) => { - addressess.insert(address); + Message::NewLocalAddresses(addresses) => { + addressess.extend(addresses.into_iter()); + true } Message::RemoveLocalAddress(address) => { addressess.remove(&address); + true } Message::AllRemotePeers(tx) => { let r = match svc.all_peers().await { @@ -142,6 +149,7 @@ if tx.send(r).is_err() { warn!("failed to send all peers response"); } + false } } } diff --git a/p2p/src/swarm.rs b/p2p/src/swarm.rs index 8beb131e..2095292c 100644 --- a/p2p/src/swarm.rs +++ b/p2p/src/swarm.rs @@ -5,7 +5,7 @@ use libp2p_identity::Keypair; use recon::{libp2p::Recon, Sha256a}; use std::sync::Arc; -use crate::{behaviour::NodeBehaviour, Libp2pConfig, Metrics}; +use crate::{behaviour::NodeBehaviour, peers, Libp2pConfig, Metrics}; fn get_dns_config() -> (dns::ResolverConfig, dns::ResolverOpts) { match hickory_resolver::system_conf::read_system_conf() { @@ -33,6 +33,7 @@ pub(crate) async fn build_swarm( keypair: Keypair, recons: Option<(P, I, M)>, block_store: Arc, + peers_tx: tokio::sync::mpsc::Sender<peers::Message>, metrics: Metrics, ) -> Result>> where @@ -78,6 +79,7 @@ where Some(relay_client), recons, block_store, + peers_tx, metrics.clone(), ) .map_err(|err| err.into()) @@ -87,8 +89,16 @@ } else { Ok(builder .with_behaviour(|keypair| { - new_behavior(config, keypair, None, recons, block_store, metrics.clone()) - .map_err(|err| err.into()) + new_behavior( + config, + keypair, + None, + recons, + block_store, + peers_tx, + metrics.clone(), + ) + .map_err(|err| err.into()) })?
.with_swarm_config(with_config) .build()) } } fn new_behavior( @@ -101,6 +111,7 @@ fn new_behavior( relay_client: Option, recons: Option<(P, I, M)>, block_store: Arc, + peers_tx: tokio::sync::mpsc::Sender<peers::Message>, metrics: Metrics, ) -> Result> where @@ -119,6 +130,7 @@ where relay_client, recons, block_store, + peers_tx, metrics, )) }) diff --git a/p2p/tests/node.rs b/p2p/tests/node.rs new file mode 100644 index 00000000..e40ef6ca --- /dev/null +++ b/p2p/tests/node.rs @@ -0,0 +1,262 @@ +use std::{sync::Arc, time::Duration}; + +use anyhow::{Context as _, Result}; +use ceramic_core::NodeKey; +use ceramic_event_svc::store::SqlitePool; +use iroh_rpc_client::P2pClient; +use iroh_rpc_types::Addr; +use libp2p::{Multiaddr, PeerId}; +use recon::{FullInterests, Recon, ReconInterestProvider}; +use test_log::test; + +use ceramic_p2p::{Config, Metrics, NetworkEvent, Node, PeerKeyInterests}; +use tokio::{sync::mpsc::Receiver, task::JoinHandle, time::timeout}; +use tracing::debug; + +#[derive(Debug)] +struct TestRunnerBuilder {} + +impl TestRunnerBuilder { + fn new() -> Self { + Self {} + } + + async fn build(self) -> Result<TestRunner> { + let rpc_server_addr = Addr::new_mem(); + let rpc_client_addr = rpc_server_addr.clone(); + let mut network_config = Config::default_with_rpc(rpc_client_addr.clone()); + + // Bind to an open port + network_config.libp2p.listening_multiaddrs = vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()]; + // Do not bootstrap + network_config.libp2p.ceramic_peers = vec![]; + + // Use an in-memory DB for the tests; a realistic benchmark would need a disk-backed DB. + let sql_pool = SqlitePool::connect_in_memory().await.unwrap(); + let peer_svc = Arc::new(ceramic_peer_svc::PeerService::new(sql_pool.clone())); + let interest_svc = Arc::new(ceramic_interest_svc::InterestService::new(sql_pool.clone())); + let event_svc = + Arc::new(ceramic_event_svc::EventService::try_new(sql_pool, true, true, vec![]).await?); + + let mut registry = prometheus_client::registry::Registry::default(); + let metrics = Metrics::register(&mut registry); + let recon_metrics = recon::Metrics::register(&mut registry); + let node_key = NodeKey::random(); + let peer_id = node_key.peer_id(); + let mut p2p = Node::new( + network_config, + rpc_server_addr, + node_key.clone(), + Arc::clone(&peer_svc), + Some(( + Recon::new(peer_svc, PeerKeyInterests, recon_metrics.clone()), + Recon::new( + Arc::clone(&interest_svc), + FullInterests::default(), + recon_metrics.clone(), + ), + Recon::new( + Arc::clone(&event_svc), + ReconInterestProvider::new(node_key.id(), interest_svc), + recon_metrics.clone(), + ), + )), + event_svc, + metrics, + ) + .await?; + let cfg = iroh_rpc_client::Config { + p2p_addr: Some(rpc_client_addr), + channels: Some(1), + ..Default::default() + }; + + let client = iroh_rpc_client::Client::new(cfg).await?; + + let network_events = p2p.network_events(); + let task = tokio::task::spawn(async move { p2p.run().await.unwrap() }); + + let client = client.try_p2p()?; + + let addr = timeout(Duration::from_millis(500), get_addr_loop(client.clone())) + .await + .context("timed out before getting a listening address for the node")??; + Ok(TestRunner { + task, + client, + peer_id, + network_events, + addr, + }) + } +} + +async fn get_addr_loop(client: P2pClient) -> Result<Multiaddr> { + loop { + let l = client.listeners().await?; + if let Some(a) = l.first() { + return Ok(a.clone()); + } + } +} + +struct TestRunner { + /// The task that runs the p2p node. + task: JoinHandle<()>, + /// The RPC client + /// Used to communicate with the p2p node.
+ client: P2pClient, + /// The node's peer_id + peer_id: PeerId, + /// A channel to read the network events received by the node. + network_events: Receiver<NetworkEvent>, + /// The listening address for this node. + addr: Multiaddr, +} + +impl Drop for TestRunner { + fn drop(&mut self) { + self.task.abort(); + } +} + +#[tokio::test] +async fn test_local_peer_id() -> Result<()> { + let test_runner = TestRunnerBuilder::new().build().await?; + let got_peer_id = test_runner.client.local_peer_id().await?; + assert_eq!(test_runner.peer_id, got_peer_id); + Ok(()) +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn test_two_nodes() -> Result<()> { + let mut test_runner_a = TestRunnerBuilder::new().build().await?; + let mut test_runner_b = TestRunnerBuilder::new().build().await?; + let addrs_b = vec![test_runner_b.addr.clone()]; + debug!(?test_runner_a.peer_id, ?test_runner_b.peer_id, "peer ids"); + + let peer_id_a = test_runner_a.client.local_peer_id().await?; + assert_eq!(test_runner_a.peer_id, peer_id_a); + let peer_id_b = test_runner_b.client.local_peer_id().await?; + assert_eq!(test_runner_b.peer_id, peer_id_b); + + // connect + test_runner_a.client.connect(peer_id_b, addrs_b).await?; + + timeout( + Duration::from_millis(500), + wait_bi_connected(&mut test_runner_a, &mut test_runner_b), + ) + .await + .context("waiting for a and b to connect")?; + + assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?); + + Ok(()) +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn test_three_nodes() -> Result<()> { + let mut test_runner_a = TestRunnerBuilder::new().build().await?; + let mut test_runner_b = TestRunnerBuilder::new().build().await?; + let mut test_runner_c = TestRunnerBuilder::new().build().await?; + + let peer_id_a = test_runner_a.client.local_peer_id().await?; + assert_eq!(test_runner_a.peer_id, peer_id_a); + let peer_id_b = test_runner_b.client.local_peer_id().await?; + assert_eq!(test_runner_b.peer_id, peer_id_b); + let peer_id_c = test_runner_c.client.local_peer_id().await?; + assert_eq!(test_runner_c.peer_id, peer_id_c); + + // connect a to b + test_runner_a + .client + .connect(peer_id_b, vec![test_runner_b.addr.clone()]) + .await?; + + timeout( + Duration::from_millis(500), + wait_bi_connected(&mut test_runner_a, &mut test_runner_b), + ) + .await + .context("waiting for a and b to connect")?; + assert!(is_bi_connected(&test_runner_a, &test_runner_b).await?); + + // connect b to c + test_runner_b + .client + .connect(peer_id_c, vec![test_runner_c.addr.clone()]) + .await?; + + timeout( + Duration::from_millis(500), + wait_bi_connected(&mut test_runner_b, &mut test_runner_c), + ) + .await + .context("waiting for b and c to connect")?; + assert!(is_bi_connected(&test_runner_b, &test_runner_c).await?); + + // We expect that a and c find each other through b and become connected + timeout( + // This one takes longer as we need recon to run and sync peers + Duration::from_millis(3_000), + wait_bi_connected(&mut test_runner_a, &mut test_runner_c), + ) + .await + .context("waiting for a and c to connect")?; + assert!(is_bi_connected(&test_runner_a, &test_runner_c).await?); + + Ok(()) +} + +// Reports whether each peer is connected to the other, in both directions +async fn is_bi_connected(a: &TestRunner, b: &TestRunner) -> Result<bool> { + Ok(is_connected(a, b).await? && is_connected(b, a).await?)
+} +// Reports whether a is connected to b +async fn is_connected(a: &TestRunner, b: &TestRunner) -> Result<bool> { + let peers = a.client.get_peers().await?; + Ok(peers.contains_key(&b.peer_id)) +} + +// Waits until a emits an event that it is connected to b +// and b emits an event that it is connected to a. +async fn wait_bi_connected(a: &mut TestRunner, b: &mut TestRunner) { + wait_connected(a, b).await; + wait_connected(b, a).await; +} +// Waits until a emits an event that it is connected to b. +async fn wait_connected(a: &mut TestRunner, b: &mut TestRunner) { + while let Some(event) = a.network_events.recv().await { + match event { + NetworkEvent::PeerConnected(id) if id == b.peer_id => { + break; + } + _ => {} + } + } +} + +#[test(tokio::test)] +async fn test_cancel_listen_for_identify() -> Result<()> { + let mut test_runner_a = TestRunnerBuilder::new().build().await?; + let peer_id: PeerId = "12D3KooWFma2D63TG9ToSiRsjFkoNm2tTihScTBAEdXxinYk5rwE" + .parse() + .unwrap(); + test_runner_a + .client + .lookup(peer_id, None) + .await + .unwrap_err(); + // when lookup ends in error, we must ensure we + // have canceled the lookup + let event = test_runner_a.network_events.recv().await.unwrap(); + if let NetworkEvent::CancelLookupQuery(got_peer_id) = event { + assert_eq!(peer_id, got_peer_id); + } else { + anyhow::bail!("unexpected NetworkEvent {:#?}", event); + } + + Ok(()) +} diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index b70cdf4f..bf25cd92 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -17,6 +17,9 @@ mod stream_set; mod tests; mod upgrade; +pub use crate::protocol::Recon; +pub use stream_set::StreamSet; + use ceramic_core::{EventId, Interest, PeerKey}; use futures::{future::BoxFuture, FutureExt}; use libp2p::{ @@ -33,12 +36,8 @@ use std::{ use tokio::time::Instant; use tracing::{debug, trace, warn}; -pub use crate::protocol::Recon; use crate::{ - libp2p::{ - handler::{FromBehaviour, FromHandler, Handler}, - stream_set::StreamSet, - }, + libp2p::handler::{FromBehaviour, FromHandler, Handler}, Sha256a, }; @@ -132,6 +131,8 @@ pub enum PeerStatus { Synchronized { /// The stream_set that was synchronized stream_set: StreamSet, + /// The number of new keys inserted during the synchronization. + new_count: usize, }, /// Local peer has started to synchronize with the remote peer. Started { @@ -263,9 +264,12 @@ where
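`PeerStatus::Synchronized` now carries `new_count`, so consumers can tell a sync that changed the local set apart from a no-op; node.rs above uses exactly this to call `peer_manager.new_peers()` only for Peer-ring syncs with `new_count > 0`. A self-contained sketch of that filtering, using hypothetical mirror enums rather than the crate's types:

```rust
// Hypothetical mirrors of recon::libp2p::{StreamSet, PeerStatus}.
#[allow(dead_code)]
#[derive(Debug)]
enum StreamSet {
    Peer,
    Interest,
    Model,
}

#[derive(Debug)]
enum PeerStatus {
    Synchronized { stream_set: StreamSet, new_count: usize },
}

fn on_status(status: PeerStatus) {
    match status {
        // Only a Peer-ring sync that inserted new keys triggers a reload of
        // known peers; the common zero-key syncs stay cheap no-ops.
        PeerStatus::Synchronized { stream_set: StreamSet::Peer, new_count } if new_count > 0 => {
            println!("reloading peers after {new_count} new entries");
        }
        other => println!("ignored: {other:?}"),
    }
}

fn main() {
    on_status(PeerStatus::Synchronized { stream_set: StreamSet::Peer, new_count: 0 });
    on_status(PeerStatus::Synchronized { stream_set: StreamSet::Peer, new_count: 3 });
}
```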
// The peer has synchronized with us, mark the time and record that the peer connection // is now idle. - FromHandler::Succeeded { stream_set } => { + FromHandler::Succeeded { + stream_set, + new_count, + } => { if let Entry::Occupied(mut entry) = self.peers.entry(peer_id) { - debug!(%peer_id, ?stream_set, "synchronization succeeded with peer"); + debug!(%peer_id, ?stream_set, new_count, "synchronization succeeded with peer"); let info = entry.get_mut(); let sync_delay = *info .sync_delay @@ -276,7 +280,10 @@ where // On success reset delay info.sync_delay .insert(stream_set, self.config.per_peer_sync_delay); - info.status = PeerStatus::Synchronized { stream_set }; + info.status = PeerStatus::Synchronized { + stream_set, + new_count, + }; Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { remote_peer_id: peer_id, status: info.status, diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index 82c4fa1d..f4e45927 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -73,7 +73,13 @@ where } } -type SyncFuture = libp2p::futures::future::BoxFuture<'static, Result<StreamSet>>; +#[derive(Debug)] +pub struct SyncResult { + pub stream_set: StreamSet, + pub new_count: usize, +} + +type SyncFuture = libp2p::futures::future::BoxFuture<'static, Result<SyncResult>>; /// Current state of the handler. /// @@ -141,6 +147,7 @@ pub enum FromHandler { }, Succeeded { stream_set: StreamSet, + new_count: usize, }, Stopped, Failed { @@ -200,9 +207,15 @@ where if let Poll::Ready(result) = stream.poll_unpin(cx) { self.transition_state(State::Idle); match result { - Ok(stream_set) => { + Ok(SyncResult { + stream_set, + new_count, + }) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - FromHandler::Succeeded { stream_set }, + FromHandler::Succeeded { + stream_set, + new_count, + }, )); } Err(e) => { diff --git a/recon/src/libp2p/protocol.rs b/recon/src/libp2p/protocol.rs index 3ed49a74..cd571040 100644 --- a/recon/src/libp2p/protocol.rs +++ b/recon/src/libp2p/protocol.rs @@ -7,6 +7,7 @@ use tracing::Level; use ceramic_core::NodeId; +use crate::libp2p::handler::SyncResult; use crate::{ libp2p::stream_set::StreamSet, protocol::{self, ProtocolConfig, Recon}, @@ -20,7 +21,7 @@ pub async fn initiate_synchronize( stream_set: StreamSet, recon: R, stream: S, -) -> Result<StreamSet> +) -> Result<SyncResult> where R: Recon, S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { let codec = CborCodec::new(); let stream = Framed::new(stream, codec); let remote_node_id = NodeId::try_from_peer_id(&remote_peer_id)?; - protocol::initiate_synchronize(recon, stream, ProtocolConfig::new_node_id(remote_node_id)) - .await?; - Ok(stream_set) + let new_count = + protocol::initiate_synchronize(recon, stream, ProtocolConfig::new_node_id(remote_node_id)) + .await?; + Ok(SyncResult { + stream_set, + new_count, + }) }
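Besides gating the peer reload, `new_count` feeds the new `protocol_run_new_keys` histogram registered in the metrics hunk further below. A sketch of the equivalent `prometheus_client` calls outside the crate's `register!` macro, using the same bucket layout; the encode step is only there to show the exported text format:

```rust
use prometheus_client::{
    encoding::text::encode,
    metrics::histogram::{exponential_buckets, Histogram},
    registry::Registry,
};

fn main() {
    let mut registry = Registry::default();
    // Buckets 1, 2, 4, ... cover runs discovering anywhere from a single new
    // key up to hundreds of thousands.
    let new_keys = Histogram::new(exponential_buckets(1.0, 2.0, 20));
    registry.register(
        "protocol_run_new_keys",
        "Number of new keys discovered for each protocol run",
        new_keys.clone(),
    );

    // Roughly what metrics.record(&ProtocolRun(elapsed, new_count as f64))
    // ends up doing for the new_keys part.
    new_keys.observe(42.0);

    let mut out = String::new();
    encode(&mut out, &registry).unwrap();
    println!("{out}");
}
```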
// Respond to Recon synchronization with a peer over a stream. #[tracing::instrument(skip(recon, stream), ret(level = Level::DEBUG))] @@ -40,7 +45,7 @@ pub async fn respond_synchronize( stream_set: StreamSet, recon: R, stream: S, -) -> Result<StreamSet> +) -> Result<SyncResult> where R: Recon, S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { let codec = CborCodec::new(); let stream = Framed::new(stream, codec); let remote_node_id = NodeId::try_from_peer_id(&remote_peer_id)?; - protocol::respond_synchronize(recon, stream, ProtocolConfig::new_node_id(remote_node_id)) - .await?; - Ok(stream_set) + let new_count = + protocol::respond_synchronize(recon, stream, ProtocolConfig::new_node_id(remote_node_id)) + .await?; + Ok(SyncResult { + stream_set, + new_count, + }) } diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 949f8440..ed11d688 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -305,7 +305,7 @@ async fn model_error_backoff() { let crate::libp2p::Event::PeerEvent(PeerEvent { status, .. }) = ev; match status { PeerStatus::Waiting | PeerStatus::Stopped => None, - PeerStatus::Synchronized { stream_set } + PeerStatus::Synchronized { stream_set, .. } | PeerStatus::Started { stream_set } | PeerStatus::Failed { stream_set } => Some(*stream_set), } @@ -391,7 +391,8 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { PeerEvent { remote_peer_id: id, status: PeerStatus::Synchronized { - stream_set: StreamSet::Peer + stream_set: StreamSet::Peer, + new_count: 0, } } ); @@ -409,7 +410,8 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { PeerEvent { remote_peer_id: id, status: PeerStatus::Synchronized { - stream_set: StreamSet::Interest + stream_set: StreamSet::Interest, + new_count: 0, } } ); @@ -427,7 +429,8 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 6]) { PeerEvent { remote_peer_id: id, status: PeerStatus::Synchronized { - stream_set: StreamSet::Model + stream_set: StreamSet::Model, + new_count: 0, } } ); diff --git a/recon/src/metrics.rs b/recon/src/metrics.rs index c628dd5d..81c1b753 100644 --- a/recon/src/metrics.rs +++ b/recon/src/metrics.rs @@ -24,6 +24,7 @@ pub struct Metrics { protocol_write_loop_count: Counter, protocol_run_duration: Histogram, + protocol_run_new_keys: Histogram, protocol_pending_items: Counter, protocol_invalid_items: Family, @@ -99,6 +100,12 @@ impl Metrics { Histogram::new(exponential_buckets(0.005, 2.0, 20)), sub_registry ); + register!( + protocol_run_new_keys, + "Number of new keys discovered for each protocol run", + Histogram::new(exponential_buckets(1.0, 2.0, 20)), + sub_registry + ); register!( protocol_pending_items, @@ -119,6 +126,7 @@ impl Metrics { protocol_message_sent_count, protocol_write_loop_count, protocol_run_duration, + protocol_run_new_keys, protocol_pending_items, protocol_invalid_items, } @@ -158,10 +166,11 @@ impl Recorder for Metrics { } } -pub(crate) struct ProtocolRun(pub Duration); +pub(crate) struct ProtocolRun(pub Duration, pub f64); impl Recorder for Metrics { fn record(&self, event: &ProtocolRun) { self.protocol_run_duration.observe(event.0.as_secs_f64()); + self.protocol_run_new_keys.observe(event.1); } } diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index 0ac52b20..f38a8432 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -92,7 +92,7 @@ pub async fn initiate_synchronize( recon: R, stream: S, config: ProtocolConfig, -) -> Result<()> +) -> Result<usize> where R: Recon, S: Stream, E>> @@ -103,12 +103,15 @@ where { let metrics = recon.metrics(); let sync_id =
Some(Uuid::new_v4().to_string()); - protocol(sync_id, Initiator::new(recon, config), stream, metrics).await?; - Ok(()) + protocol(sync_id, Initiator::new(recon, config), stream, metrics).await } /// Respond to an initiated Recon synchronization with a peer over a stream. #[tracing::instrument(skip(recon, stream), ret(level = Level::DEBUG))] -pub async fn respond_synchronize(recon: R, stream: S, config: ProtocolConfig) -> Result<()> +pub async fn respond_synchronize( + recon: R, + stream: S, + config: ProtocolConfig, +) -> Result where R: Recon, S: Stream, E>> @@ -118,8 +121,7 @@ where E: std::error::Error + Send + Sync + 'static, { let metrics = recon.metrics(); - protocol(None, Responder::new(recon, config), stream, metrics).await?; - Ok(()) + protocol(None, Responder::new(recon, config), stream, metrics).await } /// Recon message envelope @@ -206,7 +208,7 @@ async fn protocol( mut role: R, stream: S, metrics: Metrics, -) -> Result<()> +) -> Result where R: Role, R::Out: std::fmt::Debug + Send + 'static, @@ -234,7 +236,7 @@ where metrics.clone(), ); - let read = read(sync_id, stream, role, to_writer_tx, metrics.clone()); + let read = read(sync_id, stream, &mut role, to_writer_tx, metrics.clone()); // In a recon conversation there are 4 futures being polled: // @@ -262,15 +264,15 @@ where let _res = tokio::try_join!(write, read) .map_err(|e: anyhow::Error| anyhow!("protocol error: {}", e))?; - metrics.record(&ProtocolRun(start.elapsed())); - Ok(()) + metrics.record(&ProtocolRun(start.elapsed(), role.new_count() as f64)); + Ok(role.new_count()) } #[instrument(skip_all, fields(sync_id))] async fn read( mut sync_id: Option, stream: S, - mut role: R, + role: &mut R, mut to_writer_tx: mpsc::Sender>, metrics: Metrics, ) -> Result<()> @@ -444,6 +446,9 @@ trait Role { to_writer: &mut ToWriterSender, message: Self::In, ) -> Result; + + // Report the number of new keys inserted + fn new_count(&self) -> usize; } // Initiator implements the Role that starts the synchronize conversation. @@ -590,6 +595,10 @@ where }; Ok(RemoteStatus::Active) } + + fn new_count(&self) -> usize { + self.common.new_count + } } // Responder implements the [`Role`] where it responds to incoming requests. @@ -707,6 +716,9 @@ where } } } + fn new_count(&self) -> usize { + self.common.new_count + } } // Common implements common behaviors to both [`Initiator`] and [`Responder`]. @@ -714,6 +726,7 @@ struct Common { recon: R, event_q: Vec>, config: ProtocolConfig, + new_count: usize, } impl Common @@ -725,6 +738,7 @@ where recon, event_q: Vec::with_capacity(config.insert_batch_size.saturating_add(1)), config, + new_count: 0, } } @@ -788,6 +802,7 @@ where self.config.node_id.peer_id() ); } + self.new_count += batch.count_inserted(); // for now, we record the metrics from recon but the service is the one that will track and try to store them // this may get more sophisticated as we want to tie reputation into this, or make recon more aware of the meaning of