From 0c6a5676b45a1af8aee55b9c733ec0ffe38d9ee7 Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Fri, 27 Oct 2023 16:04:01 -0700 Subject: [PATCH] feat: Add rendezvous rediscovery and registration renewal Add cache with eviction listener to trigger discovery and renewal events on an interval and at TTL respectively. --- .envrc | 2 +- Cargo.lock | 100 ++++++++++++++++++ homestar-runtime/Cargo.toml | 4 + homestar-runtime/src/event_handler.rs | 16 ++- homestar-runtime/src/event_handler/cache.rs | 91 ++++++++++++++++ homestar-runtime/src/event_handler/event.rs | 45 ++++++++ .../src/event_handler/swarm_event.rs | 82 +++++++++++--- homestar-runtime/src/logger.rs | 1 + 8 files changed, 323 insertions(+), 18 deletions(-) create mode 100644 homestar-runtime/src/event_handler/cache.rs diff --git a/.envrc b/.envrc index a874fac6e..8ea2cc72d 100644 --- a/.envrc +++ b/.envrc @@ -1,5 +1,5 @@ use_flake -export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug +export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug,moka=debug export RUST_BACKTRACE=full export RUSTFLAGS="--cfg tokio_unstable" diff --git a/Cargo.lock b/Cargo.lock index 91482086b..11cac04fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -633,6 +633,15 @@ dependencies = [ "serde", ] +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + [[package]] name = "cap-fs-ext" version = "1.0.15" @@ -696,6 +705,28 @@ dependencies = [ "winx 0.35.1", ] +[[package]] +name = "cargo-platform" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12024c4645c97566567129c204f65d5815a8c9aecf30fcbe682b2fe034996d36" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -1776,6 +1807,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -2161,6 +2201,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.21" @@ -2394,6 +2440,7 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "miette", + "moka", "names", "nix 0.27.1", "once_cell", @@ -3806,6 +3853,29 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "moka" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8017ec3548ffe7d4cef7ac0e12b044c01164a74c0f3119420faeaf13490ad8b" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "once_cell", + "parking_lot", + "rustc_version", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multiaddr" version = "0.17.1" @@ -5276,6 +5346,9 @@ name = "semver" version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -5496,6 +5569,21 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark 0.9.3", + "tempfile", + "walkdir", +] + [[package]] name = "sketches-ddsketch" version = "0.2.1" @@ -5869,6 +5957,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "target-lexicon" version = "0.12.11" @@ -6315,6 +6409,12 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "trust-dns-proto" version = "0.22.0" diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index b73dbcabd..6b5d93485 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -111,6 +111,10 @@ metrics-exporter-prometheus = { version = "0.12.1", default-features = false, fe "http-listener", ], optional = true } miette = { version = "5.10", default-features = false, features = ["fancy"] } +moka = { version = "0.12.1", default-features = false, features = [ + "future", + "sync", +] } names = { version = "0.14", default-features = false, optional = true } proptest = { version = "1.2", optional = true } puffin = { version = "0.17", default-features = false, optional = true } diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 6a639330a..5351cf0aa 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -16,14 +16,17 @@ use libp2p::{ core::ConnectedPoint, futures::StreamExt, kad::QueryId, rendezvous::Cookie, request_response::RequestId, swarm::Swarm, PeerId, }; +use moka::future::Cache; use std::{sync::Arc, time::Duration}; use swarm_event::ResponseEvent; use tokio::select; +pub(crate) mod cache; pub mod channel; pub(crate) mod error; pub(crate) mod event; pub(crate) mod swarm_event; +pub(crate) use cache::{setup_cache, CacheValue}; pub(crate) use error::RequestResponseError; pub(crate) use event::Event; @@ -54,6 +57,7 @@ pub(crate) struct EventHandler { p2p_provider_timeout: Duration, db: DB, swarm: Swarm, + cache: Cache, sender: Arc>, receiver: channel::AsyncBoundedChannelReceiver, query_senders: FnvHashMap)>, @@ -80,6 +84,7 @@ pub(crate) struct EventHandler { p2p_provider_timeout: Duration, db: DB, swarm: Swarm, + cache: Cache, sender: Arc>, receiver: channel::AsyncBoundedChannelReceiver, query_senders: FnvHashMap)>, @@ -118,13 +123,15 @@ where ws_msg_sender: ws::Notifier, ) -> Self { let (sender, receiver) = Self::setup_channel(settings); + let sender = Arc::new(sender); Self { receipt_quorum: settings.network.receipt_quorum, workflow_quorum: settings.network.workflow_quorum, p2p_provider_timeout: settings.network.p2p_provider_timeout, db, swarm, - sender: Arc::new(sender), + cache: setup_cache(sender.clone()), + sender: sender.clone(), receiver, query_senders: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), @@ -146,13 +153,15 @@ where #[cfg(not(feature = "websocket-server"))] pub(crate) fn new(swarm: Swarm, db: DB, settings: &settings::Node) -> Self { let (sender, receiver) = Self::setup_channel(settings); + let sender = Arc::new(sender); Self { receipt_quorum: settings.network.receipt_quorum, workflow_quorum: settings.network.workflow_quorum, p2p_provider_timeout: settings.network.p2p_provider_timeout, db, swarm, - sender: Arc::new(sender), + cache: setup_cache(sender.clone()), + sender: sender.clone(), receiver, query_senders: FnvHashMap::default(), connected_peers: FnvHashMap::default(), @@ -226,6 +235,9 @@ where swarm_event.handle_event(&mut self, ipfs_clone).await; } } + + // Poll cache for expired entries + self.cache.run_pending_tasks().await; } } } diff --git a/homestar-runtime/src/event_handler/cache.rs b/homestar-runtime/src/event_handler/cache.rs new file mode 100644 index 000000000..a42fdac5e --- /dev/null +++ b/homestar-runtime/src/event_handler/cache.rs @@ -0,0 +1,91 @@ +use crate::{channel, event_handler::Event}; +use libp2p::PeerId; +use moka::{future::Cache, Expiry as ExpiryBase}; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; + +struct Expiry; + +impl ExpiryBase for Expiry { + fn expire_after_create( + &self, + _key: &String, + value: &CacheValue, + _current_time: Instant, + ) -> Option { + value.expiration.as_duration() + } +} + +#[derive(Clone, Debug)] +pub(crate) struct CacheValue { + expiration: Expiration, + data: HashMap, +} + +impl CacheValue { + pub(crate) fn new(expiration: Expiration, data: HashMap) -> Self { + Self { expiration, data } + } +} + +#[derive(Clone, Debug)] +pub(crate) enum CacheData { + Peer(PeerId), + OnEviction(DispatchEvent), +} + +#[derive(Clone, Debug)] +pub(crate) enum DispatchEvent { + RegisterPeer, + DiscoverPeers, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum Expiration { + Registration(Duration), + Discovery(Duration), +} + +impl Expiration { + pub(crate) fn as_duration(&self) -> Option { + match self { + Expiration::Registration(ttl) => Some(*ttl), + Expiration::Discovery(interval) => Some(*interval), + } + } +} + +pub(crate) fn setup_cache( + sender: Arc>, +) -> Cache { + let eviction_listener = move |_key: Arc, val: CacheValue, _cause| { + let tx = Arc::clone(&sender); + + if let Some(CacheData::OnEviction(event)) = val.data.get("on_eviction") { + match event { + DispatchEvent::RegisterPeer => { + if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node") + { + let _ = tx.send(Event::RegisterPeer(rendezvous_node.to_owned())); + }; + } + DispatchEvent::DiscoverPeers => { + if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node") + { + let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned())); + }; + } + } + } + }; + + Cache::builder() + .expire_after(Expiry) + .time_to_live(Duration::from_secs(5)) + .eviction_listener(eviction_listener) + .build() +} diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index e20444802..97fde1b0e 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -22,6 +22,7 @@ use homestar_core::{ipld::DagJson, workflow::Receipt as InvocationReceipt}; use libipld::{Cid, Ipld}; use libp2p::{ kad::{record::Key, Quorum, Record}, + rendezvous::Namespace, PeerId, }; use std::{collections::HashSet, num::NonZeroUsize, sync::Arc}; @@ -106,8 +107,14 @@ pub(crate) enum Event { ProvideRecord(Cid, Option, CapsuleTag), /// Found Providers/[PeerId]s on the DHT. Providers(Result<(HashSet, RequestResponseKey, P2PSender)>), + /// Register with a rendezvous node. + RegisterPeer(PeerId), + /// Discover peers from a rendezvous node. + DiscoverPeers(PeerId), } +const RENDEZVOUS_NAMESPACE: &str = "homestar"; + impl Event { async fn handle_info(self, event_handler: &mut EventHandler) -> Result<()> where @@ -164,6 +171,44 @@ impl Event { Event::Providers(Err(err)) => { error!("failed to find providers: {}", err); } + Event::RegisterPeer(peer_id) => { + if let Some(rendezvous_client) = event_handler + .swarm + .behaviour_mut() + .rendezvous_client + .as_mut() + { + // register self with remote + if let Err(err) = rendezvous_client.register( + Namespace::from_static(RENDEZVOUS_NAMESPACE), + peer_id, + Some(event_handler.rendezvous_registration_ttl.as_secs()), + ) { + warn!( + peer_id = peer_id.to_string(), + err = format!("{err}"), + "failed to register with rendezvous peer" + ) + } + } + } + Event::DiscoverPeers(peer_id) => { + if let Some(rendezvous_client) = event_handler + .swarm + .behaviour_mut() + .rendezvous_client + .as_mut() + { + let cookie = event_handler.rendezvous_cookies.get(&peer_id).cloned(); + + rendezvous_client.discover( + Some(Namespace::from_static(RENDEZVOUS_NAMESPACE)), + cookie, + None, + peer_id, + ); + } + } _ => {} } Ok(()) diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 1fa19dc3a..a74768f28 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -5,7 +5,11 @@ use super::EventHandler; use crate::network::IpfsCli; use crate::{ db::{Connection, Database}, - event_handler::{event::QueryRecord, Event, Handler, RequestResponseError}, + event_handler::{ + cache::{self, CacheData, CacheValue, Expiration}, + event::QueryRecord, + Event, Handler, RequestResponseError, + }, libp2p::multiaddr::MultiaddrExt, network::swarm::{ CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER, @@ -35,7 +39,10 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent}, PeerId, StreamProtocol, }; -use std::{collections::HashSet, fmt}; +use std::{ + collections::{HashMap, HashSet}, + fmt, +}; use tracing::{debug, error, info, warn}; const RENDEZVOUS_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0"); @@ -121,23 +128,22 @@ async fn handle_swarm_event( { info.observed_addr .iter() - // if _any_ part of the multiaddr includes a private IP, don't add it to our external address list + // If any part of the multiaddr includes a private IP, don't add it to our external address list .filter_map(|proto| match proto { Protocol::Ip4(ip) => Some(ip), _ => None, }) .all(|proto| !proto.is_private()) - // identify observed a potentially valid external address that we weren't aware of. - // add it to the addresses we announce to other peers + // Identify observed a potentially valid external address that we weren't aware of. + // Add it to the addresses we announce to other peers. // TODO: have a set of _maybe_ external addresses that we validate with other peers first before adding it .then(|| event_handler.swarm.add_external_address(info.observed_addr)); } let behavior = event_handler.swarm.behaviour_mut(); - // kademlia + // Add listen addresses to kademlia routing table if info.protocols.contains(&kad::PROTOCOL_NAME) { - // add listen addresses to kademlia routing table for addr in info.listen_addrs { behavior.kademlia.add_address(&peer_id, addr); debug!( @@ -147,8 +153,7 @@ async fn handle_swarm_event( } } - // rendezvous - // we are good to register self & discover with any node we contact. more peers = more better! + // Register and discover with nodes running the rendezvous protocol if info.protocols.contains(&RENDEZVOUS_PROTOCOL_NAME) { if let Some(rendezvous_client) = event_handler .swarm @@ -169,7 +174,7 @@ async fn handle_swarm_event( ) } - // discover other nodes + // Discover other nodes rendezvous_client.discover( Some(Namespace::from_static(RENDEZVOUS_NAMESPACE)), None, @@ -257,6 +262,31 @@ async fn handle_swarm_event( ) } } + + // Discover peers again at discovery interval + event_handler + .cache + .insert( + rendezvous_node.to_string(), + CacheValue::new( + Expiration::Discovery( + event_handler.rendezvous_discovery_interval, + ), + HashMap::from([ + ( + "on_eviction".to_string(), + CacheData::OnEviction( + cache::DispatchEvent::DiscoverPeers, + ), + ), + ( + "rendezvous_node".to_string(), + CacheData::Peer(rendezvous_node), + ), + ]), + ), + ) + .await; } else { // Do not dial peers that are not using our namespace warn!(peer_id=rendezvous_node.to_string(), namespace=?cookie.namespace(), "rendezvous peer gave records from an unexpected namespace"); @@ -273,11 +303,33 @@ async fn handle_swarm_event( rendezvous_node, ttl, .. - } => debug!( - peer_id = rendezvous_node.to_string(), - ttl = ttl, - "registered self with rendezvous peer" - ), + } => { + debug!( + peer_id = rendezvous_node.to_string(), + ttl = ttl, + "registered self with rendezvous node" + ); + + event_handler + .cache + .insert( + rendezvous_node.to_string(), + CacheValue::new( + Expiration::Registration(event_handler.rendezvous_registration_ttl), + HashMap::from([ + ( + "on_eviction".to_string(), + CacheData::OnEviction(cache::DispatchEvent::RegisterPeer), + ), + ( + "rendezvous_node".to_string(), + CacheData::Peer(rendezvous_node), + ), + ]), + ), + ) + .await; + } rendezvous::client::Event::RegisterFailed { rendezvous_node, error, diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 4e5dd8f3d..4deb4d872 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -58,6 +58,7 @@ fn init( ) .add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT)) .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT)) }); #[cfg(all(