Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Improve rendezvous protocol usage #399

Merged
merged 32 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7250f0a
chore: Add rendezvous config and PEM fixtures
bgins Oct 19, 2023
2e270e3
chore: Add peer registered debug log
bgins Oct 24, 2023
7fade98
test: Add rendezvous connect test
bgins Oct 24, 2023
e6d9e5e
chore: Add rendezvous server config with false default
bgins Oct 24, 2023
5145d58
chore: Add max connected peers config
bgins Oct 24, 2023
d328de7
chore: Add rendezvous server registration expired debug log
bgins Oct 24, 2023
7ea6ff5
test: Add rendezvous disconnect test
bgins Oct 24, 2023
1acc08d
chore: Check identified peer before adding to external addresses
bgins Oct 25, 2023
e79ee5f
chore: Add check to not dial self on rendezvous discovery
bgins Oct 25, 2023
24a1512
refactor: Improve dialing after rendezvous discovery
bgins Oct 25, 2023
4e9135d
chore: Add rendezvous ttl config
bgins Oct 25, 2023
0bab542
chore: Improve logs
bgins Oct 25, 2023
2fc7ee8
chore: Add expired rendezvous peer re-discovery
bgins Oct 26, 2023
6dfed94
chore: Add rendezvous discovery interval config
bgins Oct 27, 2023
15f4d32
feat: Add rendezvous rediscovery and registration renewal
bgins Oct 27, 2023
4e0f08d
chore: Add swarm dialing event debug log
bgins Oct 29, 2023
5a5266c
chore: Require expiration cause for rediscovery and registration renewal
bgins Oct 29, 2023
5b37c6c
chore: Add extract timestamps test utility
bgins Oct 29, 2023
89e114f
chore: Remove cache Expiration enum
bgins Oct 30, 2023
9f0b227
chore: Add cache polling loop
bgins Oct 30, 2023
4d7ad72
test: Add rendezvous registration renewal test
bgins Oct 30, 2023
e709f7c
test: Add rendezvous rediscovery test
bgins Oct 31, 2023
0c5308b
test: Add rediscover on expiration test
bgins Oct 31, 2023
189548e
chore: Update fixture ports and add peer ID comments
bgins Oct 31, 2023
fd04b65
chore: Add poll cache to event handler without ipfs feature
bgins Oct 31, 2023
b5f31df
chore: Rename our_registration to self_registration
bgins Oct 31, 2023
402a412
chore: Remove unneeded sender clone
bgins Oct 31, 2023
e02ce80
chore: Move event handler rendezvous fields into a substruct
bgins Oct 31, 2023
c4bbd5f
chore: Move event handler connection fields into a substruct
bgins Oct 31, 2023
0cec3b1
chore: Add poll cache interval config
bgins Oct 31, 2023
263a871
chore: Upgrade to libp2p 0.52.4
bgins Oct 31, 2023
0ea35d0
chore: updates
zeeshanlakhani Nov 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions homestar-runtime/fixtures/__testkey_ed25519_2.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-----BEGIN PRIVATE KEY-----
AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE=
-----END PRIVATE KEY-----
3 changes: 3 additions & 0 deletions homestar-runtime/fixtures/__testkey_ed25519_3.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-----BEGIN PRIVATE KEY-----
AgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgI=
-----END PRIVATE KEY-----
4 changes: 4 additions & 0 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub(crate) struct EventHandler<DB: Database> {
receiver: channel::AsyncBoundedChannelReceiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
connected_peers_limit: u32,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
pubsub_enabled: bool,
Expand All @@ -80,6 +81,7 @@ pub(crate) struct EventHandler<DB: Database> {
receiver: channel::AsyncBoundedChannelReceiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, Option<P2PSender>)>,
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
connected_peers_limit: u32,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
rendezvous_cookies: FnvHashMap<PeerId, Cookie>,
pubsub_enabled: bool,
Expand Down Expand Up @@ -121,6 +123,7 @@ where
query_senders: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
connected_peers: FnvHashMap::default(),
connected_peers_limit: settings.network.max_connected_peers,
rendezvous_cookies: FnvHashMap::default(),
pubsub_enabled: settings.network.enable_pubsub,
ws_msg_sender,
Expand All @@ -144,6 +147,7 @@ where
receiver,
query_senders: FnvHashMap::default(),
connected_peers: FnvHashMap::default(),
connected_peers_limit: settings.network.max_connected_peers,
request_response_senders: FnvHashMap::default(),
rendezvous_cookies: FnvHashMap::default(),
pubsub_enabled: settings.network.enable_pubsub,
Expand Down
53 changes: 43 additions & 10 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
if num_addresses < event_handler.external_address_limit as usize {
info.observed_addr
.iter()
// if _any_ part of the multiaddr includes a private IP, dont add it to our external address list
// if _any_ part of the multiaddr includes a private IP, don't add it to our external address list
.filter_map(|proto| match proto {
Protocol::Ip4(ip) => Some(ip),
_ => None,
Expand Down Expand Up @@ -164,6 +164,7 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
"failed to register with rendezvous peer"
)
}

// discover other nodes
rendezvous_client.discover(
Some(Namespace::from_static(RENDEZVOUS_NAMESPACE)),
Expand Down Expand Up @@ -194,15 +195,25 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
.insert(rendezvous_node, cookie);

// dial discovered peers
for registration in registrations {
for (index, registration) in registrations.iter().enumerate() {
// TODO: do anything with ttl here?
let opts = DialOpts::peer_id(registration.record.peer_id())
.addresses(registration.record.addresses().to_vec())
.condition(libp2p::swarm::dial_opts::PeerCondition::Disconnected)
.build();
// TODO: we might be dialing too many peers here. Add settings to configure when we stop dialing new peers
if let Err(err) = event_handler.swarm.dial(opts) {
warn!(peer_id=registration.record.peer_id().to_string(), err=?err, "failed to dial peer discovered through rendezvous")

// Dial discovered peer if not at connected peers limit
if event_handler.connected_peers.len() + index
zeeshanlakhani marked this conversation as resolved.
Show resolved Hide resolved
< event_handler.connected_peers_limit as usize
{
if let Err(err) = event_handler.swarm.dial(opts) {
warn!(peer_id=registration.record.peer_id().to_string(), err=?err, "failed to dial peer discovered through rendezvous")
}
} else {
warn!(
peer_id=registration.record.peer_id().to_string(),
"peer discovered through rendezvous not dialed because max connected peers limit reached"
)
}
}
} else {
Expand Down Expand Up @@ -261,13 +272,25 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
rendezvous::server::Event::DiscoverNotServed { enquirer, error } => {
warn!(peer_id=enquirer.to_string(), err=?error, "did not serve rendezvous discover request")
}
rendezvous::server::Event::PeerRegistered { peer, .. } => {
debug!(
peer_id = peer.to_string(),
"registered peer through rendezvous"
)
}
rendezvous::server::Event::PeerNotRegistered {
peer,
namespace,
error,
} => {
warn!(peer_id=peer.to_string(), err=?error, namespace=?namespace, "did not register peer with rendezvous")
}
rendezvous::server::Event::RegistrationExpired(registration) => {
debug!(
peer_id = registration.record.peer_id().to_string(),
"rendezvous peer registration expired on server"
)
}
_ => (),
}
}
Expand Down Expand Up @@ -551,11 +574,21 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
addr = multiaddr.to_string(),
"mDNS discovered a new peer"
);
let _ = event_handler.swarm.dial(
DialOpts::peer_id(peer_id)
.addresses(vec![multiaddr])
.build(),
);

if event_handler.connected_peers.len()
< event_handler.connected_peers_limit as usize
{
let _ = event_handler.swarm.dial(
DialOpts::peer_id(peer_id)
.addresses(vec![multiaddr])
.build(),
);
} else {
warn!(
peer_id = peer_id.to_string(),
"peer discovered by mDNS not dialed because max connected peers limit reached"
)
}
}
}
SwarmEvent::Behaviour(ComposedEvent::Mdns(mdns::Event::Expired(list))) => {
Expand Down
40 changes: 22 additions & 18 deletions homestar-runtime/src/network/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ pub(crate) async fn new(settings: &settings::Node) -> Result<Swarm<ComposedBehav
} else {
None
}),
rendezvous_client: Toggle::from(if settings.network.enable_rendezvous {
rendezvous_client: Toggle::from(if settings.network.enable_rendezvous_client {
Some(rendezvous::client::Behaviour::new(keypair.clone()))
} else {
None
}),
rendezvous_server: Toggle::from(if settings.network.enable_rendezvous {
rendezvous_server: Toggle::from(if settings.network.enable_rendezvous_server {
Some(rendezvous::server::Behaviour::new(
rendezvous::server::Config::default(),
))
Expand Down Expand Up @@ -158,23 +158,27 @@ pub(crate) fn init(
}

// Dial nodes specified in settings. Failure here shouldn't halt node startup.
for addr in &settings.node_addresses {
let _ = swarm
.dial(addr.clone())
// log dial failure and continue
.map_err(|e| warn!(err=?e, "failed to dial configured node"));

// add node to kademlia routing table
if let Some(Protocol::P2p(peer_id)) =
addr.iter().find(|proto| matches!(proto, Protocol::P2p(_)))
{
info!(addr=?addr, "added configured node to kademlia routing table");
swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr.clone());
for (index, addr) in settings.node_addresses.iter().enumerate() {
if index < settings.max_connected_peers as usize {
let _ = swarm
.dial(addr.clone())
// log dial failure and continue
.map_err(|e| warn!(err=?e, "failed to dial configured node"));

// add node to kademlia routing table
if let Some(Protocol::P2p(peer_id)) =
addr.iter().find(|proto| matches!(proto, Protocol::P2p(_)))
{
info!(addr=?addr, "added configured node to kademlia routing table");
swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr.clone());
} else {
warn!(addr=?addr, err="configured node address did not include a peer ID", "node not added to kademlia routing table")
}
} else {
warn!(addr=?addr, err="configured node address did not include a peer ID", "node not added to kademlia routing table")
warn!(addr=?addr, "address not dialed because node addresses count exceeds max connected peers configuration")
}
}

Expand Down
14 changes: 10 additions & 4 deletions homestar-runtime/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ pub struct Network {
/// [Swarm]: libp2p::swarm::Swarm
#[serde(with = "http_serde::uri")]
pub(crate) listen_address: Uri,
/// Enable Rendezvous protocol.
pub(crate) enable_rendezvous: bool,
/// Enable Rendezvous protocol client.
pub(crate) enable_rendezvous_client: bool,
/// Enable Rendezvous protocol server.
pub(crate) enable_rendezvous_server: bool,
/// Enable mDNS.
pub(crate) enable_mdns: bool,
/// mDNS IPv6 enable flag
Expand Down Expand Up @@ -141,7 +143,9 @@ pub struct Network {
/// network.
#[serde_as(as = "Vec<serde_with::DisplayFromStr>")]
pub(crate) announce_addresses: Vec<libp2p::Multiaddr>,
/// Limit on the number of external addresses we annoucne to other peers.
/// Maximum number of peers we will dial.
pub(crate) max_connected_peers: u32,
/// Limit on the number of external addresses we announce to other peers.
pub(crate) max_announce_addresses: u32,
}

Expand Down Expand Up @@ -186,8 +190,9 @@ impl Default for Network {
Self {
events_buffer_len: 1024,
listen_address: Uri::from_static("/ip4/0.0.0.0/tcp/0"),
enable_rendezvous_client: true,
enable_rendezvous_server: false,
// TODO: we would like to enable this by default, however this breaks mdns on at least some linux distros. Requires further investigation.
enable_rendezvous: true,
enable_mdns: true,
mdns_enable_ipv6: false,
mdns_query_interval: Duration::from_secs(5 * 60),
Expand All @@ -212,6 +217,7 @@ impl Default for Network {
node_addresses: Vec::new(),
announce_addresses: Vec::new(),
max_announce_addresses: 10,
max_connected_peers: 32,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion homestar-runtime/tests/fixtures/test_mdns1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ console_subscriber_port = 5560
rpc_port = 9800
websocket_port = 8000
listen_address = "/ip4/0.0.0.0/tcp/0"
enable_rendezvous = false
enable_rendezvous_client = false

[node.network.keypair_config]
existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }
2 changes: 1 addition & 1 deletion homestar-runtime/tests/fixtures/test_mdns2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ console_subscriber_port = 5561
rpc_port = 9801
websocket_port = 8001
listen_address = "/ip4/0.0.0.0/tcp/0"
enable_rendezvous = false
enable_rendezvous_client = false

[node.network.keypair_config]
existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }
2 changes: 1 addition & 1 deletion homestar-runtime/tests/fixtures/test_network1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ node_addresses = [
"/ip4/127.0.0.1/tcp/7001/p2p/16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc",
]
enable_mdns = false
enable_rendezvous = false
enable_rendezvous_client = false

[node.network.keypair_config]
existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }
2 changes: 1 addition & 1 deletion homestar-runtime/tests/fixtures/test_network2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ node_addresses = [
"/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN",
]
enable_mdns = false
enable_rendezvous = false
enable_rendezvous_client = false

[node.network.keypair_config]
existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }
16 changes: 16 additions & 0 deletions homestar-runtime/tests/fixtures/test_rendezvous1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[monitoring]
process_collector_interval = 500
metrics_port = 4035
console_subscriber_port = 5585

[node]

[node.network]
rpc_port = 9825
websocket_port = 8025
listen_address = "/ip4/127.0.0.1/tcp/7000"
enable_rendezvous_server = true
enable_mdns = false

[node.network.keypair_config]
existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }
21 changes: 21 additions & 0 deletions homestar-runtime/tests/fixtures/test_rendezvous2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[monitoring]
process_collector_interval = 500
metrics_port = 4036
console_subscriber_port = 5586

[node]

[node.network]
rpc_port = 9826
websocket_port = 8026
listen_address = "/ip4/127.0.0.1/tcp/7001"
announce_addresses = [
"/ip4/127.0.0.1/tcp/7001/p2p/16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc",
]
node_addresses = [
"/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN",
]
enable_mdns = false

[node.network.keypair_config]
existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }
18 changes: 18 additions & 0 deletions homestar-runtime/tests/fixtures/test_rendezvous3.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[monitoring]
process_collector_interval = 500
metrics_port = 4037
console_subscriber_port = 5587

[node]

[node.network]
rpc_port = 9827
websocket_port = 8027
listen_address = "/ip4/127.0.0.1/tcp/7002"
node_addresses = [
"/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN",
]
enable_mdns = false

[node.network.keypair_config]
existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519_2.pem" }
Loading
Loading