diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 6bf7614b..24e2a8df 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -61,6 +61,8 @@ pub(crate) struct EventHandler { request_response_senders: FnvHashMap, rendezvous_cookies: FnvHashMap, ws_msg_sender: ws::Notifier, + node_addresses: Vec, + announce_addresses: Vec, external_address_limit: u32, } @@ -77,8 +79,10 @@ pub(crate) struct EventHandler { receiver: channel::AsyncBoundedChannelReceiver, query_senders: FnvHashMap)>, connected_peers: FnvHashMap, - rendezvous_cookies: FnvHashMap, request_response_senders: FnvHashMap, + rendezvous_cookies: FnvHashMap, + node_addresses: Vec, + announce_addresses: Vec, external_address_limit: u32, } @@ -117,6 +121,8 @@ where connected_peers: FnvHashMap::default(), rendezvous_cookies: FnvHashMap::default(), ws_msg_sender, + node_addresses: settings.network.node_addresses.clone(), + announce_addresses: settings.network.announce_addresses.clone(), external_address_limit: settings.network.max_announce_addresses, } } @@ -135,8 +141,10 @@ where receiver, query_senders: FnvHashMap::default(), connected_peers: FnvHashMap::default(), - rendezvous_cookies: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), + rendezvous_cookies: FnvHashMap::default(), + node_addresses: settings.network.node_addresses.clone(), + announce_addresses: settings.network.announce_addresses.clone(), external_address_limit: settings.network.max_announce_addresses, } } diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 0cf2e8b8..c64cf194 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -6,6 +6,7 @@ use crate::network::IpfsCli; use crate::{ db::{Connection, Database}, event_handler::{event::QueryRecord, Event, Handler, RequestResponseError}, + 
libp2p::multiaddr::MultiaddrExt, network::swarm::{CapsuleTag, ComposedEvent, RequestResponseKey, HOMESTAR_PROTOCOL_VER}, receipt::{RECEIPT_TAG, VERSION_KEY}, workflow, @@ -93,7 +94,6 @@ async fn handle_swarm_event( event_handler: &mut EventHandler, ) { match event { - // N.B. Labels should be ordered with peer_id first for log testing SwarmEvent::Behaviour(ComposedEvent::Identify(identify_event)) => { match identify_event { identify::Event::Error { peer_id, error } => { @@ -105,6 +105,12 @@ async fn handle_swarm_event( identify::Event::Received { peer_id, info } => { debug!(peer_id=peer_id.to_string(), info=?info, "identify info received from peer"); + // Ignore peers that do not use the Homestar protocol + if info.protocol_version != HOMESTAR_PROTOCOL_VER { + info!(protocol_version=info.protocol_version, "peer was not using our homestar protocol version: {HOMESTAR_PROTOCOL_VER}"); + return; + } + let num_addresses = event_handler.swarm.external_addresses().count(); // underlying structure is a hashset, so no worry on dupes @@ -125,17 +131,15 @@ async fn handle_swarm_event( let behavior = event_handler.swarm.behaviour_mut(); - // don't bother talking with nodes that aren't running our protocol - if info.protocol_version == HOMESTAR_PROTOCOL_VER { - debug!(protocol_version=info.protocol_version, "peer was not using our homestar protocol version: {HOMESTAR_PROTOCOL_VER}"); - return; - } - // kademlia if info.protocols.contains(&kad::PROTOCOL_NAME) { // add listen addresses to kademlia routing table for addr in info.listen_addrs { behavior.kademlia.add_address(&peer_id, addr); + debug!( + peer_id = peer_id.to_string(), + "added identified node to kademlia routing table" + ); } } @@ -172,7 +176,7 @@ async fn handle_swarm_event( } identify::Event::Pushed { peer_id } => debug!( peer_id = peer_id.to_string(), - "pushed identify info too peer" + "pushed identify info to peer" ), } } @@ -295,6 +299,7 @@ async fn handle_swarm_event( } _ => {} }, + 
SwarmEvent::Behaviour(ComposedEvent::Kademlia( KademliaEvent::OutboundQueryProgressed { id, result, .. }, )) => { @@ -432,6 +437,15 @@ async fn handle_swarm_event( _ => {} } } + SwarmEvent::Behaviour(ComposedEvent::Kademlia(KademliaEvent::RoutingUpdated { + peer, + .. + })) => { + debug!( + peer = peer.to_string(), + "kademlia routing table updated with peer" + ) + } SwarmEvent::Behaviour(ComposedEvent::RequestResponse( request_response::Event::Message { @@ -555,6 +569,10 @@ async fn handle_swarm_event( ); if mdns.has_node(&peer_id) { behaviour.kademlia.remove_address(&peer_id, &multiaddr); + debug!( + peer_id = peer_id.to_string(), + "removed peer address from kademlia table" + ); } } } @@ -580,6 +598,29 @@ async fn handle_swarm_event( "peer connection closed, cause: {cause:?}" ); event_handler.connected_peers.remove_entry(&peer_id); + + // Remove peer from DHT if not in configured peers + if event_handler.node_addresses.iter().all(|multiaddr| { + if let Some(id) = multiaddr.peer_id() { + id != peer_id + } else { + // TODO: We may want to check the multiaddress without relying on + // the peer ID. This would give more flexibility when configuring nodes. 
+ warn!("Configured peer must include a peer ID: {multiaddr}"); + true + } + }) { + event_handler + .swarm + .behaviour_mut() + .kademlia + .remove_peer(&peer_id); + + debug!( + peer_id = peer_id.to_string(), + "removed peer from kademlia table" + ); + } } SwarmEvent::OutgoingConnectionError { connection_id, diff --git a/homestar-runtime/src/lib.rs b/homestar-runtime/src/lib.rs index 182a0cee..e6f84323 100644 --- a/homestar-runtime/src/lib.rs +++ b/homestar-runtime/src/lib.rs @@ -38,6 +38,7 @@ pub mod test_utils; pub use db::Db; pub use event_handler::channel; +pub(crate) mod libp2p; pub use logger::*; pub(crate) mod metrics; pub use receipt::{Receipt, RECEIPT_TAG, VERSION_KEY}; diff --git a/homestar-runtime/src/libp2p/mod.rs b/homestar-runtime/src/libp2p/mod.rs new file mode 100644 index 00000000..8254556f --- /dev/null +++ b/homestar-runtime/src/libp2p/mod.rs @@ -0,0 +1 @@ +pub(crate) mod multiaddr; diff --git a/homestar-runtime/src/libp2p/multiaddr.rs b/homestar-runtime/src/libp2p/multiaddr.rs new file mode 100644 index 00000000..894d7008 --- /dev/null +++ b/homestar-runtime/src/libp2p/multiaddr.rs @@ -0,0 +1,36 @@ +use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; + +pub(crate) trait MultiaddrExt { + fn peer_id(&self) -> Option; +} + +impl MultiaddrExt for Multiaddr { + fn peer_id(&self) -> Option { + match self.iter().last() { + Some(Protocol::P2p(peer_id)) => Some(peer_id), + _ => None, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn contains_peer_id() { + let peer_id = PeerId::random(); + let multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/7001/p2p/{}", peer_id.to_base58()) + .parse() + .unwrap(); + + assert_eq!(Multiaddr::peer_id(&multiaddr).unwrap(), peer_id) + } + + #[test] + fn missing_peer_id() { + let multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/7001").parse().unwrap(); + + assert_eq!(Multiaddr::peer_id(&multiaddr), None) + } +} diff --git a/homestar-runtime/src/network/swarm.rs 
b/homestar-runtime/src/network/swarm.rs index 11d04950..fe4fd0a1 100644 --- a/homestar-runtime/src/network/swarm.rs +++ b/homestar-runtime/src/network/swarm.rs @@ -15,6 +15,7 @@ use libp2p::{ gossipsub::{self, MessageId, SubscriptionError, TopicHash}, identify, kad::{ + self, record::store::{MemoryStore, MemoryStoreConfig}, Kademlia, KademliaConfig, KademliaEvent, }, @@ -131,6 +132,12 @@ pub(crate) fn init( // Listen-on given address swarm.listen_on(settings.listen_address.to_string().parse()?)?; + // Set Kademlia server mode + swarm + .behaviour_mut() + .kademlia + .set_mode(Some(kad::Mode::Server)); + // add external addresses from settings if !settings.announce_addresses.is_empty() { for addr in settings.announce_addresses.iter() { diff --git a/homestar-runtime/tests/fixtures/test_mdns_connect1.toml b/homestar-runtime/tests/fixtures/test_mdns1.toml similarity index 100% rename from homestar-runtime/tests/fixtures/test_mdns_connect1.toml rename to homestar-runtime/tests/fixtures/test_mdns1.toml diff --git a/homestar-runtime/tests/fixtures/test_mdns_connect2.toml b/homestar-runtime/tests/fixtures/test_mdns2.toml similarity index 100% rename from homestar-runtime/tests/fixtures/test_mdns_connect2.toml rename to homestar-runtime/tests/fixtures/test_mdns2.toml diff --git a/homestar-runtime/tests/network.rs b/homestar-runtime/tests/network.rs index b1f1992b..27fce6df 100644 --- a/homestar-runtime/tests/network.rs +++ b/homestar-runtime/tests/network.rs @@ -1,7 +1,6 @@ -use crate::utils::{kill_homestar, retrieve_output, stop_homestar, BIN_NAME}; +use crate::utils::{check_lines_for, kill_homestar, retrieve_output, stop_homestar, BIN_NAME}; use anyhow::Result; use once_cell::sync::Lazy; -use predicates::prelude::*; use serial_test::file_serial; use std::{ path::PathBuf, @@ -29,9 +28,15 @@ fn test_libp2p_generates_peer_id_serial() -> Result<()> { let dead_proc = kill_homestar(homestar_proc, None); let stdout = retrieve_output(dead_proc); + let logs_expected = 
check_lines_for( + stdout, + vec![ + "local peer ID generated", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); - assert!(predicate::str::contains("message=\"local peer ID generated\" peer_id=12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") - .eval(stdout.as_str())); + assert!(logs_expected); Ok(()) } @@ -53,12 +58,16 @@ fn test_libp2p_listens_on_address_serial() -> Result<()> { let dead_proc = kill_homestar(homestar_proc, None); let stdout = retrieve_output(dead_proc); - - assert!( - predicate::str::contains("local node is listening on /ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") - .eval(stdout.as_str()) + let logs_expected = check_lines_for( + stdout, + vec![ + "local node is listening", + "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], ); + assert!(logs_expected); + Ok(()) } @@ -79,8 +88,9 @@ fn test_rpc_listens_on_address_serial() -> Result<()> { let dead_proc = kill_homestar(homestar_proc, None); let stdout = retrieve_output(dead_proc); + let logs_expected = check_lines_for(stdout, vec!["RPC server listening", "[::1]:9820"]); - assert!(predicate::str::contains("RPC server listening on [::1]:9820").eval(stdout.as_str())); + assert!(logs_expected); Ok(()) } @@ -103,11 +113,10 @@ fn test_websocket_listens_on_address_serial() -> Result<()> { let dead_proc = kill_homestar(homestar_proc, None); let stdout = retrieve_output(dead_proc); + let logs_expected = + check_lines_for(stdout, vec!["websocket server listening", "127.0.0.1:8020"]); - assert!( - predicate::str::contains("websocket server listening on 127.0.0.1:8020") - .eval(stdout.as_str()) - ); + assert!(logs_expected); Ok(()) } @@ -155,13 +164,67 @@ fn test_libp2p_connect_known_peers_serial() -> Result<()> { let stdout1 = retrieve_output(dead_proc1); let stdout2 = retrieve_output(dead_proc2); + // Check node two was added to the Kademlia table + let two_addded_to_dht = check_lines_for( + stdout1.clone(), + 
vec![ + "added configured node to kademlia routing table", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + // Check that DHT routing table was updated with node two + let two_in_dht_routing_table = check_lines_for( + stdout1.clone(), + vec![ + "kademlia routing table updated with peer", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + // Check that node one connected to node two. - assert!(predicate::str::contains("message=\"peer connection established\" peer_id=16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc") - .eval(stdout1.as_str())); + let one_connected_to_two = check_lines_for( + stdout1, + vec![ + "peer connection established", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + assert!(one_connected_to_two); + assert!(two_in_dht_routing_table); + assert!(two_addded_to_dht); + + // Check node one was added to the Kademlia table + let one_addded_to_dht = check_lines_for( + stdout2.clone(), + vec![ + "added configured node to kademlia routing table", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); + + // Check that DHT routing table was updated with node one + let one_in_dht_routing_table = check_lines_for( + stdout2.clone(), + vec![ + "kademlia routing table updated with peer", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); // Check that node two connected to node one. 
- assert!(predicate::str::contains("message=\"peer connection established\" peer_id=12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") - .eval(stdout2.as_str())); + let two_connected_to_one = check_lines_for( + stdout2, + vec![ + "peer connection established", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); + + assert!(one_addded_to_dht); + assert!(one_in_dht_routing_table); + assert!(two_connected_to_one); Ok(()) } @@ -172,7 +235,7 @@ fn test_libp2p_connect_after_mdns_discovery_serial() -> Result<()> { let _ = stop_homestar(); // Start two nodes each configured to listen at 0.0.0.0 with no known peers. - // The nodes are configured with port 0 to allow the OS to selectn a port. + // The nodes are configured with port 0 to allow the OS to select a port. let homestar_proc1 = Command::new(BIN.as_os_str()) .env( "RUST_LOG", @@ -180,7 +243,7 @@ fn test_libp2p_connect_after_mdns_discovery_serial() -> Result<()> { ) .arg("start") .arg("-c") - .arg("tests/fixtures/test_mdns_connect1.toml") + .arg("tests/fixtures/test_mdns1.toml") .arg("--db") .arg("homestar1.db") .stdout(Stdio::piped()) @@ -194,7 +257,7 @@ fn test_libp2p_connect_after_mdns_discovery_serial() -> Result<()> { ) .arg("start") .arg("-c") - .arg("tests/fixtures/test_mdns_connect2.toml") + .arg("tests/fixtures/test_mdns2.toml") .arg("--db") .arg("homestar2.db") .stdout(Stdio::piped()) @@ -210,12 +273,202 @@ fn test_libp2p_connect_after_mdns_discovery_serial() -> Result<()> { let stdout2 = retrieve_output(dead_proc2); // Check that node one connected to node two. 
/// Checks node one's logs after node two goes away: the connection to node
/// two must be reported closed, and node two must be reported as removed
/// from the Kademlia table.
#[test]
#[file_serial]
fn test_libp2p_disconnect_mdns_discovery_serial() -> Result<()> {
    const LOG_FILTER: &str =
        "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug";
    const NODE_TWO_PEER_ID: &str = "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc";

    let _ = stop_homestar();

    // Launch a node with debug logging enabled and stdout captured.
    let spawn_node = |config: &str, db: &str| {
        Command::new(BIN.as_os_str())
            .env("RUST_LOG", LOG_FILTER)
            .args(["start", "-c", config, "--db", db])
            .stdout(Stdio::piped())
            .spawn()
            .unwrap()
    };

    // Start two nodes each configured to listen at 0.0.0.0 with no known peers.
    // The nodes are configured with port 0 to allow the OS to select a port.
    let node_one = spawn_node("tests/fixtures/test_mdns1.toml", "homestar1.db");
    let node_two = spawn_node("tests/fixtures/test_mdns2.toml", "homestar2.db");

    // Kill node two after seven seconds.
    let _ = kill_homestar(node_two, Some(Duration::from_secs(7)));

    // Collect logs for eight seconds then kill node one, and retrieve its logs.
    let node_one_logs = retrieve_output(kill_homestar(node_one, Some(Duration::from_secs(8))));

    // True when a single log line mentions both the message and node two's peer ID.
    let logged_for_node_two =
        |message: &str| check_lines_for(node_one_logs.clone(), vec![message, NODE_TWO_PEER_ID]);

    // Check that node two disconnected from node one.
    let two_disconnected_from_one = logged_for_node_two("peer connection closed");

    // Check that node two was removed from the Kademlia table.
    let two_removed_from_dht_table = logged_for_node_two("removed peer from kademlia table");

    assert!(two_disconnected_from_one);
    assert!(two_removed_from_dht_table);

    Ok(())
}
+ // The nodes are configured to dial each other through the node_addresses config. + let homestar_proc1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network2.toml") + .arg("--db") + .arg("homestar2.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Kill node two after seven seconds. + let _ = kill_homestar(homestar_proc2, Some(Duration::from_secs(7))); + + // Collect logs for eight seconds then kill node one. + let dead_proc1 = kill_homestar(homestar_proc1, Some(Duration::from_secs(8))); + + // Retrieve logs. + let stdout = retrieve_output(dead_proc1); + + // Check that node two disconnected from node one. + let two_disconnected_from_one = check_lines_for( + stdout.clone(), + vec![ + "peer connection closed", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + // Check that node two was not removed from the Kademlia table. 
/// Check process output for a single line that contains every expected fragment.
///
/// `output` is split on `\n` and each line is examined on its own. Returns
/// `true` as soon as one line contains all of `predicates` as substrings (in
/// any order), and `false` when no line does. Fragments spread across
/// different lines do not count as a match.
///
/// With an empty `predicates` list every line matches trivially, so the
/// result is `true` even for empty output.
pub(crate) fn check_lines_for(output: String, predicates: Vec<&str>) -> bool {
    output
        .split('\n')
        // `any`/`all` short-circuit: stop at the first matching line, and
        // give up on a line at its first missing fragment.
        .any(|line| predicates.iter().all(|&fragment| line.contains(fragment)))
}