Skip to content

Commit

Permalink
refactor: Improve Kademlia DHT usage (#378)
Browse files Browse the repository at this point in the history
# Description

This PR implements the following:

- [x] Add Kademlia server mode always on
- [x] Add Kademlia table add and remove debug logs
- [x] Add Kademlia routing table updated debug log
- [x] Remove peers that were discovered from DHT (Known peers remain in
the DHT)
- [x] Fix check for matching Homestar protocol in Identify events
- [x] Add `check_lines_for` utility function to correlate log outputs on
a single line
- [x] Test that known peers are added to DHT
- [x] Test that known peers are not removed from DHT on closed
connection
- [x] Test that peers are added to DHT on identify event following mDNS
discovery
- [x] Test that peers that were discovered through mDNS are removed from
DHT on closed connection
- [x] Add utility function to extract a Peer ID from a Multiaddress

## Link to issue

Please add a link to any relevant issues/tickets.

## Type of change

- [x] Bug fix (non-breaking change that fixes an issue)
- [x] Refactor (non-breaking change that updates existing functionality)

## Test plan (required)

We are adding tests for existing functionality. In addition, we added
tests to check:

- Peers are added to the Kademlia table on connection when they been
discovered through mDNS
- Peers are removed from the Kademlia table on disconnection when they
were discovered through mDNS
- Peers are not removed from the Kademlia when they were configured in
`node_addresses`
- Unit tests to check the Peer ID extraction utility
  • Loading branch information
bgins authored Oct 17, 2023
1 parent 3c5cb7b commit 91aae7f
Show file tree
Hide file tree
Showing 10 changed files with 396 additions and 34 deletions.
12 changes: 10 additions & 2 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub(crate) struct EventHandler<DB: Database> {
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
ws_msg_sender: ws::Notifier,
node_addresses: Vec<libp2p::Multiaddr>,
announce_addresses: Vec<libp2p::Multiaddr>,
external_address_limit: u32,
}

Expand All @@ -77,8 +79,10 @@ pub(crate) struct EventHandler<DB: Database> {
receiver: channel::AsyncBoundedChannelReceiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
node_addresses: Vec<libp2p::Multiaddr>,
announce_addresses: Vec<libp2p::Multiaddr>,
external_address_limit: u32,
}

Expand Down Expand Up @@ -117,6 +121,8 @@ where
connected_peers: FnvHashMap::default(),
rendezvous_cookies: FnvHashMap::default(),
ws_msg_sender,
node_addresses: settings.network.node_addresses.clone(),
announce_addresses: settings.network.announce_addresses.clone(),
external_address_limit: settings.network.max_announce_addresses,
}
}
Expand All @@ -135,8 +141,10 @@ where
receiver,
query_senders: FnvHashMap::default(),
connected_peers: FnvHashMap::default(),
rendezvous_cookies: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
rendezvous_cookies: FnvHashMap::default(),
node_addresses: settings.network.node_addresses.clone(),
announce_addresses: settings.network.announce_addresses.clone(),
external_address_limit: settings.network.max_announce_addresses,
}
}
Expand Down
57 changes: 49 additions & 8 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::network::IpfsCli;
use crate::{
db::{Connection, Database},
event_handler::{event::QueryRecord, Event, Handler, RequestResponseError},
libp2p::multiaddr::MultiaddrExt,
network::swarm::{CapsuleTag, ComposedEvent, RequestResponseKey, HOMESTAR_PROTOCOL_VER},
receipt::{RECEIPT_TAG, VERSION_KEY},
workflow,
Expand Down Expand Up @@ -93,7 +94,6 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
event_handler: &mut EventHandler<DB>,
) {
match event {
// N.B. Labels should be ordered with peer_id first for log testing
SwarmEvent::Behaviour(ComposedEvent::Identify(identify_event)) => {
match identify_event {
identify::Event::Error { peer_id, error } => {
Expand All @@ -105,6 +105,12 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
identify::Event::Received { peer_id, info } => {
debug!(peer_id=peer_id.to_string(), info=?info, "identify info received from peer");

// Ignore peers that do not use the Homestar protocol
if info.protocol_version != HOMESTAR_PROTOCOL_VER {
info!(protocol_version=info.protocol_version, "peer was not using our homestar protocol version: {HOMESTAR_PROTOCOL_VER}");
return;
}

let num_addresses = event_handler.swarm.external_addresses().count();

// underlying structure is a hashset, so no worry on dupes
Expand All @@ -125,17 +131,15 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(

let behavior = event_handler.swarm.behaviour_mut();

// don't bother talking with nodes that aren't running our protocol
if info.protocol_version == HOMESTAR_PROTOCOL_VER {
debug!(protocol_version=info.protocol_version, "peer was not using our homestar protocol version: {HOMESTAR_PROTOCOL_VER}");
return;
}

// kademlia
if info.protocols.contains(&kad::PROTOCOL_NAME) {
// add listen addresses to kademlia routing table
for addr in info.listen_addrs {
behavior.kademlia.add_address(&peer_id, addr);
debug!(
peer_id = peer_id.to_string(),
"added identified node to kademlia routing table"
);
}
}

Expand Down Expand Up @@ -172,7 +176,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
}
identify::Event::Pushed { peer_id } => debug!(
peer_id = peer_id.to_string(),
"pushed identify info too peer"
"pushed identify info to peer"
),
}
}
Expand Down Expand Up @@ -295,6 +299,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
}
_ => {}
},

SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed { id, result, .. },
)) => {
Expand Down Expand Up @@ -432,6 +437,15 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
_ => {}
}
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(KademliaEvent::RoutingUpdated {
peer,
..
})) => {
debug!(
peer = peer.to_string(),
"kademlia routing table updated with peer"
)
}

SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
request_response::Event::Message {
Expand Down Expand Up @@ -555,6 +569,10 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
);
if mdns.has_node(&peer_id) {
behaviour.kademlia.remove_address(&peer_id, &multiaddr);
debug!(
peer_id = peer_id.to_string(),
"removed peer address from kademlia table"
);
}
}
}
Expand All @@ -580,6 +598,29 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
"peer connection closed, cause: {cause:?}"
);
event_handler.connected_peers.remove_entry(&peer_id);

// Remove peer from DHT if not in configured peers
if event_handler.node_addresses.iter().all(|multiaddr| {
if let Some(id) = multiaddr.peer_id() {
id != peer_id
} else {
// TODO: We may want to check the multiadress without relying on
// the peer ID. This would give more flexibility when configuring nodes.
warn!("Configured peer must include a peer ID: {multiaddr}");
true
}
}) {
event_handler
.swarm
.behaviour_mut()
.kademlia
.remove_peer(&peer_id);

debug!(
peer_id = peer_id.to_string(),
"removed peer from kademlia table"
);
}
}
SwarmEvent::OutgoingConnectionError {
connection_id,
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod test_utils;

pub use db::Db;
pub use event_handler::channel;
pub(crate) mod libp2p;
pub use logger::*;
pub(crate) mod metrics;
pub use receipt::{Receipt, RECEIPT_TAG, VERSION_KEY};
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/libp2p/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod multiaddr;
36 changes: 36 additions & 0 deletions homestar-runtime/src/libp2p/multiaddr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};

pub(crate) trait MultiaddrExt {
fn peer_id(&self) -> Option<PeerId>;
}

impl MultiaddrExt for Multiaddr {
fn peer_id(&self) -> Option<PeerId> {
match self.iter().last() {
Some(Protocol::P2p(peer_id)) => Some(peer_id),
_ => None,
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn contains_peer_id() {
let peer_id = PeerId::random();
let multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/7001/p2p/{}", peer_id.to_base58())
.parse()
.unwrap();

assert_eq!(Multiaddr::peer_id(&multiaddr).unwrap(), peer_id)
}

#[test]
fn missing_peer_id() {
let multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/7001").parse().unwrap();

assert_eq!(Multiaddr::peer_id(&multiaddr), None)
}
}
7 changes: 7 additions & 0 deletions homestar-runtime/src/network/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use libp2p::{
gossipsub::{self, MessageId, SubscriptionError, TopicHash},
identify,
kad::{
self,
record::store::{MemoryStore, MemoryStoreConfig},
Kademlia, KademliaConfig, KademliaEvent,
},
Expand Down Expand Up @@ -131,6 +132,12 @@ pub(crate) fn init(
// Listen-on given address
swarm.listen_on(settings.listen_address.to_string().parse()?)?;

// Set Kademlia server mode
swarm
.behaviour_mut()
.kademlia
.set_mode(Some(kad::Mode::Server));

// add external addresses from settings
if !settings.announce_addresses.is_empty() {
for addr in settings.announce_addresses.iter() {
Expand Down
Loading

0 comments on commit 91aae7f

Please sign in to comment.