From 5b5b2ffa840a42a27d65053fdd742fe6688409f3 Mon Sep 17 00:00:00 2001 From: Brian Ginsburg <7957636+bgins@users.noreply.github.com> Date: Mon, 6 Nov 2023 18:17:29 -0800 Subject: [PATCH] refactor: Improve rendezvous protocol usage (#399) # Description This pull request implements the following features: - [x] Add event handler cache implementation - [x] Renew rendezvous registration at expiration - [x] Re-discover from rendezvous servers on a set interval - [x] Renew discovered peer registrations when they expire - [x] Remove behavior where we add other peers to `external_addresses` - [x] Add check to not dial ourselves on rendezvous discovery - [x] Update dialing on rendezvous discovery to only dial peers we aren't already connected to - [x] Add rendezvous server `Dialing`, `PeerRegistered`, and `RegistrationExpired` event debug logs - [x] Separate `enable_rendezvous` config into `enable_rendezvous_server` and `enable_rendezvous_client` configs (server is opt-in, client is opt-out) - [x] Add `max_connected_peers` config - [x] Add `rendezvous_registration_ttl` and `rendezvous_discovery_interval` configs - [x] Test rendezvous register, discover, and connect - [x] Test disconnect after rendezvous connect - [x] Test registration expires and re-registration on expiration - [x] Test discovered registration expires and rediscovery attempted at expiration - [x] Test rediscovery on rendezvous discovery interval - [x] Add `extract_timestamps_where` and `count_lines_where` test utilities - [x] Update libp2p deprecated Kademlia and SwarmBuilder interfaces - [x] Update integration test fixtures to avoid overlapping ports ## Link to issue Implements #131. ## Type of change - [x] New feature (non-breaking change that adds functionality) - [x] Refactor (non-breaking change that updates existing functionality) - [x] Comments have been added/updated ## Test plan (required) We are adding tests to check existing functionality and a test to check registration renewals. --------- Co-authored-by: Zeeshan Lakhani --- .envrc | 2 +- Cargo.lock | 515 ++++++++++++------ flake.lock | 12 +- flake.nix | 1 - homestar-core/src/unit.rs | 2 +- .../src/workflow/instruction_result.rs | 6 +- homestar-runtime/Cargo.toml | 4 + .../fixtures/__testkey_ed25519_2.pem | 3 + .../fixtures/__testkey_ed25519_3.pem | 3 + .../fixtures/__testkey_ed25519_4.pem | 3 + .../fixtures/__testkey_ed25519_5.pem | 3 + homestar-runtime/src/event_handler.rs | 84 ++- homestar-runtime/src/event_handler/cache.rs | 83 +++ homestar-runtime/src/event_handler/event.rs | 45 ++ .../src/event_handler/swarm_event.rs | 247 +++++++-- homestar-runtime/src/logger.rs | 1 + homestar-runtime/src/network/swarm.rs | 88 +-- homestar-runtime/src/settings.rs | 30 +- homestar-runtime/tests/cli.rs | 18 +- .../tests/fixtures/test_mdns1.toml | 3 +- .../tests/fixtures/test_mdns2.toml | 3 +- .../tests/fixtures/test_metrics.toml | 1 + .../tests/fixtures/test_network1.toml | 3 +- .../tests/fixtures/test_network2.toml | 3 +- .../tests/fixtures/test_rendezvous1.toml | 17 + .../tests/fixtures/test_rendezvous2.toml | 22 + .../tests/fixtures/test_rendezvous3.toml | 19 + .../tests/fixtures/test_rendezvous4.toml | 23 + .../tests/fixtures/test_rendezvous5.toml | 20 + .../tests/fixtures/test_rendezvous6.toml | 23 + homestar-runtime/tests/fixtures/test_v4.toml | 6 +- .../tests/fixtures/test_v4_alt.toml | 6 +- homestar-runtime/tests/fixtures/test_v6.toml | 6 +- homestar-runtime/tests/network.rs | 435 ++++++++++++++- homestar-runtime/tests/utils.rs | 60 +- 35 files changed, 1483 insertions(+), 317 deletions(-) create mode 100644 homestar-runtime/fixtures/__testkey_ed25519_2.pem create mode 100644 homestar-runtime/fixtures/__testkey_ed25519_3.pem create mode 100644 homestar-runtime/fixtures/__testkey_ed25519_4.pem create mode 100644 homestar-runtime/fixtures/__testkey_ed25519_5.pem create mode 100644 homestar-runtime/src/event_handler/cache.rs create mode 100644 homestar-runtime/tests/fixtures/test_rendezvous1.toml create mode 100644 homestar-runtime/tests/fixtures/test_rendezvous2.toml create mode 100644 homestar-runtime/tests/fixtures/test_rendezvous3.toml create mode 100644 homestar-runtime/tests/fixtures/test_rendezvous4.toml create mode 100644 homestar-runtime/tests/fixtures/test_rendezvous5.toml create mode 100644 homestar-runtime/tests/fixtures/test_rendezvous6.toml diff --git a/.envrc b/.envrc index a874fac6..8ea2cc72 100644 --- a/.envrc +++ b/.envrc @@ -1,5 +1,5 @@ use_flake -export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug +export RUST_LOG=homestar=debug,homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug,moka=debug export RUST_BACKTRACE=full export RUSTFLAGS="--cfg tokio_unstable" diff --git a/Cargo.lock b/Cargo.lock index d23633f2..51eecfda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "ambient-authority" version = "0.0.2" @@ -357,6 +363,17 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76f2bfe491d41d45507b8431da8274f7feeca64a49e86d980eed2937ec2ff020" +[[package]] +name = "attohttpc" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" +dependencies = [ + "http", + "log", + "url", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -620,9 +637,9 @@ checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" @@ -633,6 +650,15 @@ dependencies = [ "serde", ] +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + [[package]] name = "cap-fs-ext" version = "1.0.15" @@ -696,6 +722,28 @@ dependencies = [ "winx 0.35.1", ] +[[package]] +name = "cargo-platform" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12024c4645c97566567129c204f65d5815a8c9aecf30fcbe682b2fe034996d36" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -1289,22 +1337,9 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "3.2.0" +version = "4.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" -dependencies = [ - "byteorder", - "digest 0.9.0", - "rand_core 0.5.1", - "subtle", - "zeroize", -] - -[[package]] -name = "curve25519-dalek" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622178105f911d937a42cdb140730ba4a3ed2becd8ae6ce39c7d28b5d75d4588" +checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" dependencies = [ "cfg-if", "cpufeatures", @@ -1635,11 +1670,11 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7277392b266383ef8396db7fdeb1e77b6c52fed775f5df15bb24f35b72156980" dependencies = [ - "curve25519-dalek 4.1.0", + "curve25519-dalek", "ed25519", - "rand_core 0.6.4", + "rand_core", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "zeroize", ] @@ -1776,6 +1811,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -1946,6 +1990,16 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-bounded" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b07bbbe7d7e78809544c6f718d875627addc73a7c3582447abc052cd3dc67e0" +dependencies = [ + "futures-timer", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -2105,17 +2159,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - [[package]] name = "getrandom" version = "0.2.10" @@ -2125,7 +2168,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] @@ -2161,6 +2204,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.21" @@ -2217,6 +2266,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" dependencies = [ "ahash", + "allocator-api2", ] [[package]] @@ -2283,6 +2333,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" +[[package]] +name = "hkdf" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +dependencies = [ + "hmac 0.12.1", +] + [[package]] name = "hmac" version = "0.8.1" @@ -2293,6 +2352,15 @@ dependencies = [ "digest 0.9.0", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.7", +] + [[package]] name = "hmac-drbg" version = "0.3.0" @@ -2301,7 +2369,7 @@ checksum = "17ea0a1394df5b6574da6e0c1ade9e78868c9fb0a4e5ef4428e32da4676b85b1" dependencies = [ "digest 0.9.0", "generic-array 0.14.7", - "hmac", + "hmac 0.8.1", ] [[package]] @@ -2394,6 +2462,7 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "miette", + "moka", "names", "nix 0.27.1", "once_cell", @@ -2650,9 +2719,9 @@ dependencies = [ [[package]] name = "if-watch" -version = "3.0.1" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9465340214b296cd17a0009acdb890d6160010b8adf8f78a00d0d7ab270f79f" +checksum = "bbb892e5777fe09e16f3d44de7802f4daa7267ecbe8c466f19d94e25bb0c303e" dependencies = [ "async-io", "core-foundation", @@ -2664,7 +2733,26 @@ dependencies = [ "rtnetlink", "system-configuration", "tokio", - "windows 0.34.0", + "windows 0.51.1", +] + +[[package]] +name = "igd-next" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e065e90a518ab5fedf79aa1e4b784e10f8e484a834f6bda85c42633a2cb7af" +dependencies = [ + "async-trait", + "attohttpc", + "bytes", + "futures", + "http", + "hyper", + "log", + "rand", + "tokio", + "url", + "xmltree", ] [[package]] @@ -2934,9 +3022,9 @@ checksum = "03087c2bad5e1034e8cace5926dec053fb3790248370865f5117a7d0213354c8" [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libipld" @@ -3036,14 +3124,15 @@ checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" [[package]] name = "libp2p" -version = "0.52.3" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d07d1502a027366d55afe187621c2d7895dc111a3df13b35fed698049681d7" +checksum = "e94495eb319a85b70a68b85e2389a95bb3555c71c49025b78c691a854a7e6464" dependencies = [ "bytes", + "either", "futures", "futures-timer", - "getrandom 0.2.10", + "getrandom", "instant", "libp2p-allow-block-list", "libp2p-connection-limits", @@ -3061,9 +3150,12 @@ dependencies = [ "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", + "libp2p-upnp", "libp2p-yamux", "multiaddr 0.18.0", "pin-project", + "rw-stream-sink", + "thiserror", ] [[package]] @@ -3092,9 +3184,9 @@ dependencies = [ [[package]] name = "libp2p-core" -version = "0.40.0" +version = "0.40.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef7dd7b09e71aac9271c60031d0e558966cdb3253ba0308ab369bb2de80630d0" +checksum = "dd44289ab25e4c9230d9246c475a22241e301b23e8f4061d3bdef304a1a99713" dependencies = [ "either", "fnv", @@ -3120,10 +3212,11 @@ dependencies = [ [[package]] name = "libp2p-dns" -version = "0.40.0" +version = "0.40.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd4394c81c0c06d7b4a60f3face7e8e8a9b246840f98d2c80508d0721b032147" +checksum = "e6a18db73084b4da2871438f6239fef35190b05023de7656e877c18a00541a3b" dependencies = [ + "async-trait", "futures", "libp2p-core", "libp2p-identity", @@ -3135,9 +3228,9 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.45.1" +version = "0.45.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d157562dba6017193e5285acf6b1054759e83540bfd79f75b69d6ce774c88da" +checksum = "f1f9624e2a843b655f1c1b8262b8d5de6f309413fca4d66f01bb0662429f84dc" dependencies = [ "asynchronous-codec", "base64 0.21.4", @@ -3147,7 +3240,7 @@ dependencies = [ "fnv", "futures", "futures-ticker", - "getrandom 0.2.10", + "getrandom", "hex_fmt", "instant", "libp2p-core", @@ -3159,7 +3252,7 @@ dependencies = [ "quick-protobuf-codec", "rand", "regex", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "unsigned-varint", "void", @@ -3167,13 +3260,14 @@ dependencies = [ [[package]] name = "libp2p-identify" -version = "0.43.0" +version = "0.43.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a29675a32dbcc87790db6cf599709e64308f1ae9d5ecea2d259155889982db8" +checksum = "45a96638a0a176bec0a4bcaebc1afa8cf909b114477209d7456ade52c61cd9cd" dependencies = [ "asynchronous-codec", "either", "futures", + "futures-bounded", "futures-timer", "libp2p-core", "libp2p-identity", @@ -3189,28 +3283,29 @@ dependencies = [ [[package]] name = "libp2p-identity" -version = "0.2.3" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "686e73aff5e23efbb99bc85340ea6fd8686986aa7b283a881ba182cfca535ca9" +checksum = "cdd6317441f361babc74c2989c6484eb0726045399b6648de039e1805ea96972" dependencies = [ "asn1_der", "bs58", "ed25519-dalek", + "hkdf", "libsecp256k1", "log", "multihash 0.19.1", "quick-protobuf", "rand", - "sha2 0.10.7", + "sha2 0.10.8", "thiserror", "zeroize", ] [[package]] name = "libp2p-kad" -version = "0.44.4" +version = "0.44.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc125f83d8f75322c79e4ade74677d299b34aa5c9d9b5251c03ec28c683cb765" +checksum = "16ea178dabba6dde6ffc260a8e0452ccdc8f79becf544946692fff9d412fc29d" dependencies = [ "arrayvec", "asynchronous-codec", @@ -3225,8 +3320,9 @@ dependencies = [ "libp2p-swarm", "log", "quick-protobuf", + "quick-protobuf-codec", "rand", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "thiserror", "uint", @@ -3251,7 +3347,7 @@ dependencies = [ "smallvec", "socket2 0.5.4", "tokio", - "trust-dns-proto", + "trust-dns-proto 0.22.0", "void", ] @@ -3274,12 +3370,12 @@ dependencies = [ [[package]] name = "libp2p-noise" -version = "0.43.1" +version = "0.43.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71ce70757f2c0d82e9a3ef738fb10ea0723d16cec37f078f719e2c247704c1bb" +checksum = "d2eeec39ad3ad0677551907dd304b2f13f17208ccebe333bef194076cd2e8921" dependencies = [ "bytes", - "curve25519-dalek 4.1.0", + "curve25519-dalek", "futures", "libp2p-core", "libp2p-identity", @@ -3289,7 +3385,7 @@ dependencies = [ "once_cell", "quick-protobuf", "rand", - "sha2 0.10.7", + "sha2 0.10.8", "snow", "static_assertions", "thiserror", @@ -3299,9 +3395,9 @@ dependencies = [ [[package]] name = "libp2p-quic" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cb763e88f9a043546bfebd3575f340e7dd3d6c1b2cf2629600ec8965360c63a" +checksum = "130d451d83f21b81eb7b35b360bc7972aeafb15177784adc56528db082e6b927" dependencies = [ "bytes", "futures", @@ -3314,6 +3410,7 @@ dependencies = [ "parking_lot", "quinn", "rand", + "ring", "rustls", "socket2 0.5.4", "thiserror", @@ -3322,9 +3419,9 @@ dependencies = [ [[package]] name = "libp2p-rendezvous" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20f92973a954f8b9dfb809a41dc1a8c05bba52c067361763649f6f5ce04dc47c" +checksum = "00aec50ed436e23945cb842e0a7212b1a1504461d50310a5ba8c3bc0b4c17682" dependencies = [ "async-trait", "asynchronous-codec", @@ -3346,9 +3443,9 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.25.1" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49e2cb9befb57e55f53d9463a6ea9b1b8a09a48174ad7be149c9cbebaa5e8e9b" +checksum = "d8e3b4d67870478db72bac87bfc260ee6641d0734e0e3e275798f089c3fecfd4" dependencies = [ "async-trait", "cbor4ii 0.3.1", @@ -3366,9 +3463,9 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.43.3" +version = "0.43.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28016944851bd73526d3c146aabf0fa9bbe27c558f080f9e5447da3a1772c01a" +checksum = "580189e0074af847df90e75ef54f3f30059aedda37ea5a1659e8b9fca05c0141" dependencies = [ "either", "fnv", @@ -3402,9 +3499,9 @@ dependencies = [ [[package]] name = "libp2p-tcp" -version = "0.40.0" +version = "0.40.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09bfdfb6f945c5c014b87872a0bdb6e0aef90e92f380ef57cd9013f118f9289d" +checksum = "b558dd40d1bcd1aaaed9de898e9ec6a436019ecc2420dd0016e712fbb61c5508" dependencies = [ "futures", "futures-timer", @@ -3436,6 +3533,22 @@ dependencies = [ "yasna", ] +[[package]] +name = "libp2p-upnp" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82775a47b34f10f787ad3e2a22e2c1541e6ebef4fe9f28f3ac553921554c94c1" +dependencies = [ + "futures", + "futures-timer", + "igd-next", + "libp2p-core", + "libp2p-swarm", + "log", + "tokio", + "void", +] + [[package]] name = "libp2p-yamux" version = "0.44.1" @@ -3544,11 +3657,11 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "lru" -version = "0.10.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "718e8fae447df0c7e1ba7f5189829e63fd536945c8988d61444c19039f16b670" +checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" dependencies = [ - "hashbrown 0.13.2", + "hashbrown 0.14.1", ] [[package]] @@ -3802,10 +3915,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "windows-sys", ] +[[package]] +name = "moka" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8017ec3548ffe7d4cef7ac0e12b044c01164a74c0f3119420faeaf13490ad8b" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "once_cell", + "parking_lot", + "rustc_version", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multiaddr" version = "0.17.1" @@ -3893,7 +4029,7 @@ dependencies = [ "multihash-derive", "serde", "serde-big-array", - "sha2 0.10.7", + "sha2 0.10.8", "sha3", "unsigned-varint", ] @@ -3951,7 +4087,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" dependencies = [ - "getrandom 0.2.10", + "getrandom", ] [[package]] @@ -4708,7 +4844,7 @@ dependencies = [ "mach2", "once_cell", "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "web-sys", "winapi", ] @@ -4817,7 +4953,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -4827,16 +4963,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", + "rand_core", ] [[package]] @@ -4845,7 +4972,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.10", + "getrandom", ] [[package]] @@ -4854,7 +4981,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" dependencies = [ - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -4924,7 +5051,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" dependencies = [ - "getrandom 0.2.10", + "getrandom", "redox_syscall 0.2.16", "thiserror", ] @@ -4944,14 +5071,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -4968,10 +5095,16 @@ name = "regex-automata" version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" + +[[package]] +name = "regex-automata" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -4986,6 +5119,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "reqwest" version = "0.11.22" @@ -5277,6 +5416,9 @@ name = "semver" version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -5439,9 +5581,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -5497,6 +5639,21 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark 0.9.3", + "tempfile", + "walkdir", +] + [[package]] name = "sketches-ddsketch" version = "0.2.1" @@ -5520,9 +5677,9 @@ checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "smawk" @@ -5561,11 +5718,11 @@ dependencies = [ "aes-gcm", "blake2", "chacha20poly1305", - "curve25519-dalek 4.1.0", - "rand_core 0.6.4", + "curve25519-dalek", + "rand_core", "ring", "rustc_version", - "sha2 0.10.7", + "sha2 0.10.8", "subtle", ] @@ -5870,6 +6027,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "target-lexicon" version = "0.12.11" @@ -6315,6 +6478,12 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "trust-dns-proto" version = "0.22.0" @@ -6341,24 +6510,50 @@ dependencies = [ "url", ] +[[package]] +name = "trust-dns-proto" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3119112651c157f4488931a01e586aa459736e9d6046d3bd9105ffb69352d374" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner 0.6.0", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.4.0", + "ipnet", + "once_cell", + "rand", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "tracing", + "url", +] + [[package]] name = "trust-dns-resolver" -version = "0.22.0" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aff21aa4dcefb0a1afbfac26deb0adc93888c7d295fb63ab273ef276ba2b7cfe" +checksum = "10a3e6c3aff1718b3c73e395d1f35202ba2ffa847c6a62eea0db8fb4cfe30be6" dependencies = [ "cfg-if", "futures-util", "ipconfig", - "lazy_static", "lru-cache", + "once_cell", "parking_lot", + "rand", "resolv-conf", "smallvec", "thiserror", "tokio", "tracing", - "trust-dns-proto", + "trust-dns-proto 0.23.2", ] [[package]] @@ -6433,7 +6628,7 @@ dependencies = [ "bs58", "cid 0.10.1", "futures", - "getrandom 0.2.10", + "getrandom", "instant", "libipld-core", "libipld-json", @@ -6581,7 +6776,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ - "getrandom 0.2.10", + "getrandom", "rand", ] @@ -6669,12 +6864,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -7004,7 +7193,7 @@ dependencies = [ "rustix 0.38.13", "serde", "serde_derive", - "sha2 0.10.7", + "sha2 0.10.8", "toml 0.5.11", "windows-sys", "zstd", @@ -7570,22 +7759,28 @@ dependencies = [ [[package]] name = "windows" -version = "0.34.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45296b64204227616fdbf2614cefa4c236b98ee64dfaaaa435207ed99fe7829f" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows_aarch64_msvc 0.34.0", - "windows_i686_gnu 0.34.0", - "windows_i686_msvc 0.34.0", - "windows_x86_64_gnu 0.34.0", - "windows_x86_64_msvc 0.34.0", + "windows-targets", ] [[package]] name = "windows" -version = "0.48.0" +version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" +dependencies = [ + "windows-core", + "windows-targets", +] + +[[package]] +name = "windows-core" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ "windows-targets", ] @@ -7606,12 +7801,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ "windows_aarch64_gnullvm", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", "windows_x86_64_gnullvm", - "windows_x86_64_msvc 0.48.5", + "windows_x86_64_msvc", ] [[package]] @@ -7620,48 +7815,24 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" -[[package]] -name = "windows_aarch64_msvc" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" -[[package]] -name = "windows_i686_gnu" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" - [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" -[[package]] -name = "windows_i686_msvc" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" - [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" -[[package]] -name = "windows_x86_64_gnu" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -7674,12 +7845,6 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" -[[package]] -name = "windows_x86_64_msvc" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -7899,19 +8064,20 @@ dependencies = [ "libipld", "multihash 0.18.1", "once_cell", - "rand_core 0.6.4", + "rand_core", "serde", "thiserror", ] [[package]] name = "x25519-dalek" -version = "1.1.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a0c105152107e3b96f6a00a65e86ce82d9b125230e1c4302940eca58ff71f4f" +checksum = "fb66477291e7e8d2b0ff1bcb900bf29489a9692816d79874bea351e7a8b6de96" dependencies = [ - "curve25519-dalek 3.2.0", - "rand_core 0.5.1", + "curve25519-dalek", + "rand_core", + "serde", "zeroize", ] @@ -7948,6 +8114,21 @@ dependencies = [ "winreg 0.8.0", ] +[[package]] +name = "xml-rs" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fcb9cbac069e033553e8bb871be2fbdffcab578eb25bd0f7c508cedc6dcd75a" + +[[package]] +name = "xmltree" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d8a75eaf6557bb84a65ace8609883db44a29951042ada9b393151532e41fcb" +dependencies = [ + "xml-rs", +] + [[package]] name = "yamux" version = "0.12.0" diff --git a/flake.lock b/flake.lock index edcb403f..76142117 100644 --- a/flake.lock +++ b/flake.lock @@ -36,11 +36,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1696577711, - "narHash": "sha256-94VRjvClIKDym1QRqPkX5LTQoAwZ1E6QE/3dWtOXSIQ=", + "lastModified": 1699186365, + "narHash": "sha256-Pxrw5U8mBsL3NlrJ6q1KK1crzvSUcdfwb9083sKDrcU=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "a2eb207f45e4a14a1e3019d9e3863d1e208e2295", + "rev": "a0b3b06b7a82c965ae0bb1d59f6e386fe755001d", "type": "github" }, "original": { @@ -68,11 +68,11 @@ ] }, "locked": { - "lastModified": 1696558324, - "narHash": "sha256-TnnP4LGwDB8ZGE7h2n4nA9Faee8xPkMdNcyrzJ57cbw=", + "lastModified": 1699236891, + "narHash": "sha256-J0uhoYlufJncIFbM/pAoggzHK/qERB9KfQRkmYD56yo=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "fdb37574a04df04aaa8cf7708f94a9309caebe2b", + "rev": "a7f9bf91dc5065d470cd57169a9f2ebdbdfe1f24", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index bbd189f4..f4fd8bb3 100644 --- a/flake.nix +++ b/flake.nix @@ -56,7 +56,6 @@ cargo-cross cargo-expand cargo-nextest - cargo-outdated cargo-sort cargo-spellcheck cargo-unused-features diff --git a/homestar-core/src/unit.rs b/homestar-core/src/unit.rs index 0d035c23..9bb4c0ea 100644 --- a/homestar-core/src/unit.rs +++ b/homestar-core/src/unit.rs @@ -34,7 +34,7 @@ impl From for Unit { // Default implementation. impl input::Parse for Input { fn parse(&self) -> Result, InputParseError> { - let args = match Ipld::try_from(self.to_owned())? { + let args = match Ipld::from(self.to_owned()) { Ipld::List(v) => Ipld::List(v).try_into()?, ipld => Args::new(vec![ipld.try_into()?]), }; diff --git a/homestar-core/src/workflow/instruction_result.rs b/homestar-core/src/workflow/instruction_result.rs index d9ef4cd8..6ba7e7f5 100644 --- a/homestar-core/src/workflow/instruction_result.rs +++ b/homestar-core/src/workflow/instruction_result.rs @@ -82,13 +82,13 @@ where if let Ipld::List(v) = ipld { match &v[..] { [Ipld::String(result), res] if result == OK => { - Ok(InstructionResult::Ok(res.to_owned().try_into()?)) + Ok(InstructionResult::Ok(res.to_owned().into())) } [Ipld::String(result), res] if result == ERR => { - Ok(InstructionResult::Error(res.to_owned().try_into()?)) + Ok(InstructionResult::Error(res.to_owned().into())) } [Ipld::String(result), res] if result == JUST => { - Ok(InstructionResult::Just(res.to_owned().try_into()?)) + Ok(InstructionResult::Just(res.to_owned().into())) } other_ipld => Err(workflow::Error::unexpected_ipld( other_ipld.to_owned().into(), diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index b73dbcab..6b5d9348 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -111,6 +111,10 @@ metrics-exporter-prometheus = { version = "0.12.1", default-features = false, fe "http-listener", ], optional = true } miette = { version = "5.10", default-features = false, features = ["fancy"] } +moka = { version = "0.12.1", default-features = false, features = [ + "future", + "sync", +] } names = { version = "0.14", default-features = false, optional = true } proptest = { version = "1.2", optional = true } puffin = { version = "0.17", default-features = false, optional = true } diff --git a/homestar-runtime/fixtures/__testkey_ed25519_2.pem b/homestar-runtime/fixtures/__testkey_ed25519_2.pem new file mode 100644 index 00000000..cef18c35 --- /dev/null +++ b/homestar-runtime/fixtures/__testkey_ed25519_2.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE= +-----END PRIVATE KEY----- diff --git a/homestar-runtime/fixtures/__testkey_ed25519_3.pem b/homestar-runtime/fixtures/__testkey_ed25519_3.pem new file mode 100644 index 00000000..d7925361 --- /dev/null +++ b/homestar-runtime/fixtures/__testkey_ed25519_3.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +AgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgI= +-----END PRIVATE KEY----- diff --git a/homestar-runtime/fixtures/__testkey_ed25519_4.pem b/homestar-runtime/fixtures/__testkey_ed25519_4.pem new file mode 100644 index 00000000..1bae1c12 --- /dev/null +++ b/homestar-runtime/fixtures/__testkey_ed25519_4.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +AwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwM= +-----END PRIVATE KEY----- diff --git a/homestar-runtime/fixtures/__testkey_ed25519_5.pem b/homestar-runtime/fixtures/__testkey_ed25519_5.pem new file mode 100644 index 00000000..f8c191e3 --- /dev/null +++ b/homestar-runtime/fixtures/__testkey_ed25519_5.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +BAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQ= +-----END PRIVATE KEY----- diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 542c7b4f..5af1044c 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -6,7 +6,7 @@ use crate::network::ws; use crate::network::IpfsCli; use crate::{ db::Database, - network::swarm::{ComposedBehaviour, RequestResponseKey}, + network::swarm::{ComposedBehaviour, PeerDiscoveryInfo, RequestResponseKey}, settings, }; use anyhow::Result; @@ -16,14 +16,17 @@ use libp2p::{ core::ConnectedPoint, futures::StreamExt, kad::QueryId, rendezvous::Cookie, request_response::RequestId, swarm::Swarm, PeerId, }; +use moka::future::Cache; use std::{sync::Arc, time::Duration}; use swarm_event::ResponseEvent; -use tokio::select; +use tokio::{runtime::Handle, select}; +pub(crate) mod cache; pub mod channel; pub(crate) mod error; pub(crate) mod event; pub(crate) mod swarm_event; +pub(crate) use cache::{setup_cache, CacheValue}; pub(crate) use error::RequestResponseError; pub(crate) use event::Event; @@ -54,17 +57,19 @@ pub(crate) struct EventHandler { p2p_provider_timeout: Duration, db: DB, swarm: Swarm, + cache: Arc>, sender: Arc>, receiver: channel::AsyncBoundedChannelReceiver, query_senders: FnvHashMap)>, - connected_peers: FnvHashMap, + connections: Connections, request_response_senders: FnvHashMap, - rendezvous_cookies: FnvHashMap, + rendezvous: Rendezvous, pubsub_enabled: bool, ws_msg_sender: ws::Notifier, node_addresses: Vec, announce_addresses: Vec, external_address_limit: u32, + poll_cache_interval: Duration, } /// Event loop handler for [libp2p] network events and commands. @@ -76,16 +81,32 @@ pub(crate) struct EventHandler { p2p_provider_timeout: Duration, db: DB, swarm: Swarm, + cache: Cache, sender: Arc>, receiver: channel::AsyncBoundedChannelReceiver, query_senders: FnvHashMap)>, - connected_peers: FnvHashMap, + connections: Connections, request_response_senders: FnvHashMap, - rendezvous_cookies: FnvHashMap, + rendezvous: Rendezvous, pubsub_enabled: bool, node_addresses: Vec, announce_addresses: Vec, external_address_limit: u32, + poll_cache_interval: Duration, +} + +/// Rendezvous protocol configurations and state +struct Rendezvous { + registration_ttl: Duration, + discovery_interval: Duration, + discovered_peers: FnvHashMap, + cookies: FnvHashMap, +} + +// Connected peers configuration and state +struct Connections { + peers: FnvHashMap, + max_peers: u32, } impl EventHandler @@ -110,23 +131,34 @@ where ws_msg_sender: ws::Notifier, ) -> Self { let (sender, receiver) = Self::setup_channel(settings); + let sender = Arc::new(sender); Self { receipt_quorum: settings.network.receipt_quorum, workflow_quorum: settings.network.workflow_quorum, p2p_provider_timeout: settings.network.p2p_provider_timeout, db, swarm, - sender: Arc::new(sender), + cache: Arc::new(setup_cache(sender.clone())), + sender, receiver, query_senders: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), - connected_peers: FnvHashMap::default(), - rendezvous_cookies: FnvHashMap::default(), + connections: Connections { + peers: FnvHashMap::default(), + max_peers: settings.network.max_connected_peers, + }, + rendezvous: Rendezvous { + registration_ttl: settings.network.rendezvous_registration_ttl, + discovery_interval: settings.network.rendezvous_discovery_interval, + discovered_peers: FnvHashMap::default(), + cookies: FnvHashMap::default(), + }, pubsub_enabled: settings.network.enable_pubsub, ws_msg_sender, node_addresses: settings.network.node_addresses.clone(), announce_addresses: settings.network.announce_addresses.clone(), external_address_limit: settings.network.max_announce_addresses, + poll_cache_interval: settings.network.poll_cache_interval, } } @@ -134,22 +166,33 @@ where #[cfg(not(feature = "websocket-server"))] pub(crate) fn new(swarm: Swarm, db: DB, settings: &settings::Node) -> Self { let (sender, receiver) = Self::setup_channel(settings); + let sender = Arc::new(sender); Self { receipt_quorum: settings.network.receipt_quorum, workflow_quorum: settings.network.workflow_quorum, p2p_provider_timeout: settings.network.p2p_provider_timeout, db, swarm, - sender: Arc::new(sender), + cache: Arc::new(setup_cache(sender.clone())), + sender, receiver, query_senders: FnvHashMap::default(), - connected_peers: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), - rendezvous_cookies: FnvHashMap::default(), + connections: Connections { + peers: FnvHashMap::default(), + max_peers: settings.network.max_connected_peers, + }, + rendezvous: Rendezvous { + registration_ttl: settings.network.rendezvous_registration_ttl, + discovery_interval: settings.network.rendezvous_discovery_interval, + discovered_peers: FnvHashMap::default(), + cookies: FnvHashMap::default(), + }, pubsub_enabled: settings.network.enable_pubsub, node_addresses: settings.network.node_addresses.clone(), announce_addresses: settings.network.announce_addresses.clone(), external_address_limit: settings.network.max_announce_addresses, + poll_cache_interval: settings.network.poll_cache_interval, } } @@ -178,6 +221,9 @@ where /// [events]: libp2p::swarm::SwarmEvent #[cfg(not(feature = "ipfs"))] pub(crate) async fn start(mut self) -> Result<()> { + let handle = Handle::current(); + handle.spawn(poll_cache(self.cache.clone())); + loop { select! { runtime_event = self.receiver.recv_async() => { @@ -189,7 +235,6 @@ where swarm_event.handle_event(&mut self).await; } - } } } @@ -198,6 +243,9 @@ where /// [events]: libp2p::swarm::SwarmEvent #[cfg(feature = "ipfs")] pub(crate) async fn start(mut self, ipfs: IpfsCli) -> Result<()> { + let handle = Handle::current(); + handle.spawn(poll_cache(self.cache.clone(), self.poll_cache_interval)); + loop { select! { runtime_event = self.receiver.recv_async() => { @@ -213,3 +261,13 @@ where } } } + +/// Poll cache for expired entries +async fn poll_cache(cache: Arc>, poll_interval: Duration) { + let mut interval = tokio::time::interval(poll_interval); + + loop { + interval.tick().await; + cache.run_pending_tasks().await; + } +} diff --git a/homestar-runtime/src/event_handler/cache.rs b/homestar-runtime/src/event_handler/cache.rs new file mode 100644 index 00000000..c05386c0 --- /dev/null +++ b/homestar-runtime/src/event_handler/cache.rs @@ -0,0 +1,83 @@ +use crate::{channel, event_handler::Event}; +use libp2p::PeerId; +use moka::{ + future::Cache, + notification::RemovalCause::{self, Expired}, + Expiry as ExpiryBase, +}; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; + +struct Expiry; + +impl ExpiryBase for Expiry { + fn expire_after_create( + &self, + _key: &String, + value: &CacheValue, + _current_time: Instant, + ) -> Option { + Some(value.expiration) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct CacheValue { + expiration: Duration, + data: HashMap, +} + +impl CacheValue { + pub(crate) fn new(expiration: Duration, data: HashMap) -> Self { + Self { expiration, data } + } +} + +#[derive(Clone, Debug)] +pub(crate) enum CacheData { + Peer(PeerId), + OnExpiration(DispatchEvent), +} + +#[derive(Clone, Debug)] +pub(crate) enum DispatchEvent { + RegisterPeer, + DiscoverPeers, +} + +pub(crate) fn setup_cache( + sender: Arc>, +) -> Cache { + let eviction_listener = move |_key: Arc, val: CacheValue, cause: RemovalCause| { + let tx = Arc::clone(&sender); + + if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") { + if cause != Expired { + return; + } + + match event { + DispatchEvent::RegisterPeer => { + if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node") + { + let _ = tx.send(Event::RegisterPeer(rendezvous_node.to_owned())); + }; + } + DispatchEvent::DiscoverPeers => { + if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node") + { + let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned())); + }; + } + } + } + }; + + Cache::builder() + .expire_after(Expiry) + .eviction_listener(eviction_listener) + .build() +} diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index e2044480..9a770efe 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -22,6 +22,7 @@ use homestar_core::{ipld::DagJson, workflow::Receipt as InvocationReceipt}; use libipld::{Cid, Ipld}; use libp2p::{ kad::{record::Key, Quorum, Record}, + rendezvous::Namespace, PeerId, }; use std::{collections::HashSet, num::NonZeroUsize, sync::Arc}; @@ -106,8 +107,14 @@ pub(crate) enum Event { ProvideRecord(Cid, Option, CapsuleTag), /// Found Providers/[PeerId]s on the DHT. Providers(Result<(HashSet, RequestResponseKey, P2PSender)>), + /// Register with a rendezvous node. + RegisterPeer(PeerId), + /// Discover peers from a rendezvous node. + DiscoverPeers(PeerId), } +const RENDEZVOUS_NAMESPACE: &str = "homestar"; + impl Event { async fn handle_info(self, event_handler: &mut EventHandler) -> Result<()> where @@ -164,6 +171,44 @@ impl Event { Event::Providers(Err(err)) => { error!("failed to find providers: {}", err); } + Event::RegisterPeer(peer_id) => { + if let Some(rendezvous_client) = event_handler + .swarm + .behaviour_mut() + .rendezvous_client + .as_mut() + { + // register self with remote + if let Err(err) = rendezvous_client.register( + Namespace::from_static(RENDEZVOUS_NAMESPACE), + peer_id, + Some(event_handler.rendezvous.registration_ttl.as_secs()), + ) { + warn!( + peer_id = peer_id.to_string(), + err = format!("{err}"), + "failed to register with rendezvous peer" + ) + } + } + } + Event::DiscoverPeers(peer_id) => { + if let Some(rendezvous_client) = event_handler + .swarm + .behaviour_mut() + .rendezvous_client + .as_mut() + { + let cookie = event_handler.rendezvous.cookies.get(&peer_id).cloned(); + + rendezvous_client.discover( + Some(Namespace::from_static(RENDEZVOUS_NAMESPACE)), + cookie, + None, + peer_id, + ); + } + } _ => {} } Ok(()) diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index c64cf194..3bed8c4a 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -5,9 +5,15 @@ use super::EventHandler; use crate::network::IpfsCli; use crate::{ db::{Connection, Database}, - event_handler::{event::QueryRecord, Event, Handler, RequestResponseError}, + event_handler::{ + cache::{self, CacheData, CacheValue}, + event::QueryRecord, + Event, Handler, RequestResponseError, + }, libp2p::multiaddr::MultiaddrExt, - network::swarm::{CapsuleTag, ComposedEvent, RequestResponseKey, HOMESTAR_PROTOCOL_VER}, + network::swarm::{ + CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER, + }, receipt::{RECEIPT_TAG, VERSION_KEY}, workflow, workflow::WORKFLOW_TAG, @@ -23,17 +29,20 @@ use libipld::{Cid, Ipld}; use libp2p::{ gossipsub, identify, kad, kad::{ - AddProviderOk, BootstrapOk, GetProvidersOk, GetRecordOk, KademliaEvent, PeerRecord, - PutRecordOk, QueryResult, + AddProviderOk, BootstrapOk, GetProvidersOk, GetRecordOk, PeerRecord, PutRecordOk, + QueryResult, }, mdns, multiaddr::Protocol, - rendezvous::{self, Namespace}, + rendezvous::{self, Namespace, Registration}, request_response, swarm::{dial_opts::DialOpts, SwarmEvent}, PeerId, StreamProtocol, }; -use std::{collections::HashSet, fmt}; +use std::{ + collections::{HashMap, HashSet}, + fmt, +}; use tracing::{debug, error, info, warn}; const RENDEZVOUS_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0"); @@ -113,27 +122,28 @@ async fn handle_swarm_event( let num_addresses = event_handler.swarm.external_addresses().count(); - // underlying structure is a hashset, so no worry on dupes - if num_addresses < event_handler.external_address_limit as usize { + // Add observed address as an external address if we are identifying ourselves + if &peer_id == event_handler.swarm.local_peer_id() + && num_addresses < event_handler.external_address_limit as usize + { info.observed_addr .iter() - // if _any_ part of the multiaddr includes a private IP, dont add it to our external address list + // If any part of the multiaddr includes a private IP, don't add it to our external address list .filter_map(|proto| match proto { Protocol::Ip4(ip) => Some(ip), _ => None, }) .all(|proto| !proto.is_private()) - // identify observed a potentially valid external address that we weren't aware of. - // add it to the addresses we announce to other peers + // Identify observed a potentially valid external address that we weren't aware of. + // Add it to the addresses we announce to other peers. // TODO: have a set of _maybe_ external addresses that we validate with other peers first before adding it .then(|| event_handler.swarm.add_external_address(info.observed_addr)); } let behavior = event_handler.swarm.behaviour_mut(); - // kademlia + // Add listen addresses to kademlia routing table if info.protocols.contains(&kad::PROTOCOL_NAME) { - // add listen addresses to kademlia routing table for addr in info.listen_addrs { behavior.kademlia.add_address(&peer_id, addr); debug!( @@ -143,8 +153,7 @@ async fn handle_swarm_event( } } - // rendezvous - // we are good to register self & discover with any node we contact. more peers = more better! + // Register and discover with nodes running the rendezvous protocol if info.protocols.contains(&RENDEZVOUS_PROTOCOL_NAME) { if let Some(rendezvous_client) = event_handler .swarm @@ -156,7 +165,7 @@ async fn handle_swarm_event( if let Err(err) = rendezvous_client.register( Namespace::from_static(RENDEZVOUS_NAMESPACE), peer_id, - None, + Some(event_handler.rendezvous.registration_ttl.as_secs()), ) { warn!( peer_id = peer_id.to_string(), @@ -164,7 +173,8 @@ async fn handle_swarm_event( "failed to register with rendezvous peer" ) } - // discover other nodes + + // Discover other nodes rendezvous_client.discover( Some(Namespace::from_static(RENDEZVOUS_NAMESPACE)), None, @@ -187,26 +197,98 @@ async fn handle_swarm_event( registrations, cookie, } => { - // save cookie for later (when we are hungry for snacks again. yummy.) if cookie.namespace() == Some(&Namespace::from_static(RENDEZVOUS_NAMESPACE)) { + debug!( + peer_id = rendezvous_node.to_string(), + "received discovery from rendezvous server" + ); + + // Store cookie event_handler - .rendezvous_cookies + .rendezvous + .cookies .insert(rendezvous_node, cookie); - // dial discovered peers - for registration in registrations { - // TODO: do anything with ttl here? - let opts = DialOpts::peer_id(registration.record.peer_id()) - .addresses(registration.record.addresses().to_vec()) - .condition(libp2p::swarm::dial_opts::PeerCondition::Disconnected) - .build(); - // TODO: we might be dialing too many peers here. Add settings to configure when we stop dialing new peers - if let Err(err) = event_handler.swarm.dial(opts) { - warn!(peer_id=registration.record.peer_id().to_string(), err=?err, "failed to dial peer discovered through rendezvous") + let connected_peers_count = event_handler.connections.peers.len(); + + // Skip dialing peers if at connected peers limit + if connected_peers_count >= event_handler.connections.max_peers as usize { + warn!("peers discovered through rendezvous not dialed because max connected peers limit reached"); + return; + } + + // Filter out already connected peers + let new_registrations: Vec<&Registration> = registrations + .iter() + .filter(|registration| { + !event_handler + .connections + .peers + .contains_key(®istration.record.peer_id()) + }) + .collect(); + + // Dial newly discovered peers + for (index, registration) in new_registrations.iter().enumerate() { + let self_registration = ®istration.record.peer_id() + == event_handler.swarm.local_peer_id(); + + // Dial discovered peer if not us and not at connected peers limit + if !self_registration + && connected_peers_count + index + < event_handler.connections.max_peers as usize + { + let peer_id = registration.record.peer_id(); + let opts = DialOpts::peer_id(peer_id) + .addresses(registration.record.addresses().to_vec()) + .condition( + libp2p::swarm::dial_opts::PeerCondition::Disconnected, + ) + .build(); + + match event_handler.swarm.dial(opts) { + Ok(_) => { + event_handler.rendezvous.discovered_peers.insert( + peer_id, + PeerDiscoveryInfo::new(rendezvous_node), + ); + } + Err(err) => { + warn!(peer_id=peer_id.to_string(), err=?err, "failed to dial peer discovered through rendezvous"); + } + }; + } else if !self_registration { + warn!( + peer_id=registration.record.peer_id().to_string(), + "peer discovered through rendezvous not dialed because the max connected peers limit was reached" + ) } } + + // Discover peers again at discovery interval + event_handler + .cache + .insert( + format!("{}-discover", rendezvous_node), + CacheValue::new( + event_handler.rendezvous.discovery_interval, + HashMap::from([ + ( + "on_expiration".to_string(), + CacheData::OnExpiration( + cache::DispatchEvent::DiscoverPeers, + ), + ), + ( + "rendezvous_node".to_string(), + CacheData::Peer(rendezvous_node), + ), + ]), + ), + ) + .await; } else { - // don't add peers that aren't from our namespace + // Do not dial peers that are not using our namespace warn!(peer_id=rendezvous_node.to_string(), namespace=?cookie.namespace(), "rendezvous peer gave records from an unexpected namespace"); } } @@ -221,11 +303,33 @@ async fn handle_swarm_event( rendezvous_node, ttl, .. - } => debug!( - peer_id = rendezvous_node.to_string(), - ttl = ttl, - "registered self with rendezvous peer" - ), + } => { + debug!( + peer_id = rendezvous_node.to_string(), + ttl = ttl, + "registered self with rendezvous node" + ); + + event_handler + .cache + .insert( + format!("{}-register", rendezvous_node), + CacheValue::new( + event_handler.rendezvous.registration_ttl, + HashMap::from([ + ( + "on_expiration".to_string(), + CacheData::OnExpiration(cache::DispatchEvent::RegisterPeer), + ), + ( + "rendezvous_node".to_string(), + CacheData::Peer(rendezvous_node), + ), + ]), + ), + ) + .await; + } rendezvous::client::Event::RegisterFailed { rendezvous_node, error, @@ -241,13 +345,18 @@ async fn handle_swarm_event( .rendezvous_client .as_mut() { - let cookie = event_handler.rendezvous_cookies.get(&peer).cloned(); - rendezvous_client.discover( - Some(Namespace::from_static(RENDEZVOUS_NAMESPACE)), - cookie, - None, - peer, - ); + let cookie = event_handler.rendezvous.cookies.get(&peer).cloned(); + + if let Some(discovery_info) = + event_handler.rendezvous.discovered_peers.remove(&peer) + { + rendezvous_client.discover( + Some(Namespace::from_static(RENDEZVOUS_NAMESPACE)), + cookie, + None, + discovery_info.rendezvous_point, + ); + } } } } @@ -261,6 +370,12 @@ async fn handle_swarm_event( rendezvous::server::Event::DiscoverNotServed { enquirer, error } => { warn!(peer_id=enquirer.to_string(), err=?error, "did not serve rendezvous discover request") } + rendezvous::server::Event::PeerRegistered { peer, .. } => { + debug!( + peer_id = peer.to_string(), + "registered peer through rendezvous" + ) + } rendezvous::server::Event::PeerNotRegistered { peer, namespace, @@ -268,6 +383,12 @@ async fn handle_swarm_event( } => { warn!(peer_id=peer.to_string(), err=?error, namespace=?namespace, "did not register peer with rendezvous") } + rendezvous::server::Event::RegistrationExpired(registration) => { + debug!( + peer_id = registration.record.peer_id().to_string(), + "rendezvous peer registration expired on server" + ) + } _ => (), } } @@ -300,9 +421,11 @@ async fn handle_swarm_event( _ => {} }, - SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryProgressed { id, result, .. }, - )) => { + SwarmEvent::Behaviour(ComposedEvent::Kademlia(kad::Event::OutboundQueryProgressed { + id, + result, + .. + })) => { match result { QueryResult::Bootstrap(Ok(BootstrapOk { peer, .. })) => { debug!("successfully bootstrapped peer: {peer}") @@ -437,9 +560,8 @@ async fn handle_swarm_event( _ => {} } } - SwarmEvent::Behaviour(ComposedEvent::Kademlia(KademliaEvent::RoutingUpdated { - peer, - .. + SwarmEvent::Behaviour(ComposedEvent::Kademlia(kad::Event::RoutingUpdated { + peer, .. })) => { debug!( peer = peer.to_string(), @@ -551,11 +673,21 @@ async fn handle_swarm_event( addr = multiaddr.to_string(), "mDNS discovered a new peer" ); - let _ = event_handler.swarm.dial( - DialOpts::peer_id(peer_id) - .addresses(vec![multiaddr]) - .build(), - ); + + if event_handler.connections.peers.len() + < event_handler.connections.max_peers as usize + { + let _ = event_handler.swarm.dial( + DialOpts::peer_id(peer_id) + .addresses(vec![multiaddr]) + .build(), + ); + } else { + warn!( + peer_id = peer_id.to_string(), + "peer discovered by mDNS not dialed because max connected peers limit reached" + ) + } } } SwarmEvent::Behaviour(ComposedEvent::Mdns(mdns::Event::Expired(list))) => { @@ -590,14 +722,14 @@ async fn handle_swarm_event( } => { debug!(peer_id=peer_id.to_string(), endpoint=?endpoint, "peer connection established"); // add peer to connected peers list - event_handler.connected_peers.insert(peer_id, endpoint); + event_handler.connections.peers.insert(peer_id, endpoint); } SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { debug!( peer_id = peer_id.to_string(), "peer connection closed, cause: {cause:?}" ); - event_handler.connected_peers.remove_entry(&peer_id); + event_handler.connections.peers.remove_entry(&peer_id); // Remove peer from DHT if not in configured peers if event_handler.node_addresses.iter().all(|multiaddr| { @@ -651,7 +783,10 @@ async fn handle_swarm_event( SwarmEvent::ListenerError { listener_id, error } => { error!(err=?error, listener_id=?listener_id, "listener error") } - SwarmEvent::Dialing { .. } => todo!(), + SwarmEvent::Dialing { peer_id, .. } => match peer_id { + Some(id) => debug!(peer_id = id.to_string(), "dialing peer"), + None => debug!("dialing an unknown peer"), + }, e => debug!(e=?e, "uncaught event"), } } diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 4e5dd8f3..4deb4d87 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -58,6 +58,7 @@ fn init( ) .add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT)) .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT)) }); #[cfg(all( diff --git a/homestar-runtime/src/network/swarm.rs b/homestar-runtime/src/network/swarm.rs index 145e44e6..825b221d 100644 --- a/homestar-runtime/src/network/swarm.rs +++ b/homestar-runtime/src/network/swarm.rs @@ -20,14 +20,13 @@ use libp2p::{ kad::{ self, record::store::{MemoryStore, MemoryStoreConfig}, - Kademlia, KademliaConfig, KademliaEvent, }, mdns, multiaddr::Protocol, noise, rendezvous, request_response::{self, ProtocolSupport}, - swarm::{behaviour::toggle::Toggle, NetworkBehaviour, Swarm, SwarmBuilder}, - tcp, yamux, StreamProtocol, Transport, + swarm::{self, behaviour::toggle::Toggle, NetworkBehaviour, Swarm}, + tcp, yamux, PeerId, StreamProtocol, Transport, }; use serde::{Deserialize, Serialize}; use std::fmt; @@ -53,7 +52,7 @@ pub(crate) async fn new(settings: &settings::Node) -> Result Result Result Result Result Self { + Self { rendezvous_point } + } +} + /// Key data structure for [request_response::Event] messages. #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct RequestResponseKey { @@ -230,8 +252,8 @@ impl fmt::Display for CapsuleTag { pub(crate) enum ComposedEvent { /// [gossipsub::Event] event. Gossipsub(Box), - /// [KademliaEvent] event. - Kademlia(KademliaEvent), + /// [kad::Event] event. + Kademlia(kad::Event), /// [request_response::Event] event. RequestResponse(request_response::Event>), /// [mdns::Event] event. @@ -258,8 +280,8 @@ pub(crate) enum TopicMessage { pub(crate) struct ComposedBehaviour { /// [gossipsub::Behaviour] behaviour. pub(crate) gossipsub: Toggle, - /// In-memory [kademlia: Kademlia] behaviour. - pub(crate) kademlia: Kademlia, + /// In-memory [kademlia: kad::Behaviour] behaviour. + pub(crate) kademlia: kad::Behaviour, /// [request_response::Behaviour] CBOR-flavored behaviour. pub(crate) request_response: request_response::cbor::Behaviour>, /// [mdns::tokio::Behaviour] behaviour. @@ -319,8 +341,8 @@ impl From for ComposedEvent { } } -impl From for ComposedEvent { - fn from(event: KademliaEvent) -> Self { +impl From for ComposedEvent { + fn from(event: kad::Event) -> Self { ComposedEvent::Kademlia(event) } } diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index 27c9bf0e..2b286276 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -36,10 +36,10 @@ impl Settings { /// Process monitoring settings. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct Monitoring { - /// Monitoring collection interval in milliseconds. - pub process_collector_interval: u64, /// Metrics port for prometheus scraping. pub metrics_port: u16, + /// Monitoring collection interval in milliseconds. + pub process_collector_interval: u64, /// Tokio console port. pub console_subscriber_port: u16, } @@ -76,8 +76,16 @@ pub struct Network { /// [Swarm]: libp2p::swarm::Swarm #[serde(with = "http_serde::uri")] pub(crate) listen_address: Uri, - /// Enable Rendezvous protocol. - pub(crate) enable_rendezvous: bool, + /// Enable Rendezvous protocol client. + pub(crate) enable_rendezvous_client: bool, + /// Enable Rendezvous protocol server. + pub(crate) enable_rendezvous_server: bool, + /// Rendezvous registration TTL. + #[serde_as(as = "DurationSeconds")] + pub(crate) rendezvous_registration_ttl: Duration, + /// Rendezvous discovery interval. + #[serde_as(as = "DurationSeconds")] + pub(crate) rendezvous_discovery_interval: Duration, /// Enable mDNS. pub(crate) enable_mdns: bool, /// mDNS IPv6 enable flag @@ -141,8 +149,13 @@ pub struct Network { /// network. #[serde_as(as = "Vec")] pub(crate) announce_addresses: Vec, - /// Limit on the number of external addresses we annoucne to other peers. + /// Maximum number of peers we will dial. + pub(crate) max_connected_peers: u32, + /// Limit on the number of external addresses we announce to other peers. pub(crate) max_announce_addresses: u32, + /// Event handler poll cache interval in milliseconds. + #[serde_as(as = "DurationMilliSeconds")] + pub(crate) poll_cache_interval: Duration, } /// Database-related settings for a homestar node. @@ -186,8 +199,11 @@ impl Default for Network { Self { events_buffer_len: 1024, listen_address: Uri::from_static("/ip4/0.0.0.0/tcp/0"), + enable_rendezvous_client: true, + enable_rendezvous_server: false, + rendezvous_registration_ttl: Duration::from_secs(2 * 60 * 60), + rendezvous_discovery_interval: Duration::from_secs(10 * 60), // TODO: we would like to enable this by default, however this breaks mdns on at least some linux distros. Requires further investigation. - enable_rendezvous: true, enable_mdns: true, mdns_enable_ipv6: false, mdns_query_interval: Duration::from_secs(5 * 60), @@ -211,7 +227,9 @@ impl Default for Network { keypair_config: PubkeyConfig::Random, node_addresses: Vec::new(), announce_addresses: Vec::new(), + max_connected_peers: 32, max_announce_addresses: 10, + poll_cache_interval: Duration::from_millis(1000), } } } diff --git a/homestar-runtime/tests/cli.rs b/homestar-runtime/tests/cli.rs index 204b1feb..0687aac5 100644 --- a/homestar-runtime/tests/cli.rs +++ b/homestar-runtime/tests/cli.rs @@ -141,7 +141,7 @@ fn test_server_serial() -> Result<()> { .spawn() .unwrap(); - let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9832); + let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9837); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -156,7 +156,7 @@ fn test_server_serial() -> Result<()> { .arg("--host") .arg("::1") .arg("-p") - .arg("9832") + .arg("9837") .assert() .success() .stdout(predicate::str::contains("::1")) @@ -167,7 +167,7 @@ fn test_server_serial() -> Result<()> { .arg("--host") .arg("::1") .arg("-p") - .arg("9830") + .arg("9835") .assert() .failure() .stderr( @@ -274,7 +274,7 @@ fn test_daemon_serial() -> Result<()> { .assert() .success(); - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9831); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9836); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -288,7 +288,7 @@ fn test_daemon_serial() -> Result<()> { .arg("--host") .arg("127.0.0.1") .arg("-p") - .arg("9831") + .arg("9836") .assert() .success() .stdout(predicate::str::contains("127.0.0.1")) @@ -362,7 +362,7 @@ fn test_server_v4_serial() -> Result<()> { .spawn() .unwrap(); - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9830); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9835); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -377,7 +377,7 @@ fn test_server_v4_serial() -> Result<()> { .arg("--host") .arg("127.0.0.1") .arg("-p") - .arg("9830") + .arg("9835") .assert() .success() .stdout(predicate::str::contains("127.0.0.1")) @@ -410,7 +410,7 @@ fn test_daemon_v4_serial() -> Result<()> { .assert() .success(); - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9830); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9835); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -424,7 +424,7 @@ fn test_daemon_v4_serial() -> Result<()> { .arg("--host") .arg("127.0.0.1") .arg("-p") - .arg("9830") + .arg("9835") .assert() .success() .stdout(predicate::str::contains("127.0.0.1")) diff --git a/homestar-runtime/tests/fixtures/test_mdns1.toml b/homestar-runtime/tests/fixtures/test_mdns1.toml index d870de70..30e9a742 100644 --- a/homestar-runtime/tests/fixtures/test_mdns1.toml +++ b/homestar-runtime/tests/fixtures/test_mdns1.toml @@ -9,7 +9,8 @@ console_subscriber_port = 5560 rpc_port = 9800 websocket_port = 8000 listen_address = "/ip4/0.0.0.0/tcp/0" -enable_rendezvous = false +enable_rendezvous_client = false +# Peer ID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN [node.network.keypair_config] existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/test_mdns2.toml b/homestar-runtime/tests/fixtures/test_mdns2.toml index fe0fe2b3..1eb3a72b 100644 --- a/homestar-runtime/tests/fixtures/test_mdns2.toml +++ b/homestar-runtime/tests/fixtures/test_mdns2.toml @@ -9,7 +9,8 @@ console_subscriber_port = 5561 rpc_port = 9801 websocket_port = 8001 listen_address = "/ip4/0.0.0.0/tcp/0" -enable_rendezvous = false +enable_rendezvous_client = false +# Peer ID 16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc [node.network.keypair_config] existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" } diff --git a/homestar-runtime/tests/fixtures/test_metrics.toml b/homestar-runtime/tests/fixtures/test_metrics.toml index 726d6c60..9c066443 100644 --- a/homestar-runtime/tests/fixtures/test_metrics.toml +++ b/homestar-runtime/tests/fixtures/test_metrics.toml @@ -9,5 +9,6 @@ console_subscriber_port = 5570 rpc_port = 9810 websocket_port = 8010 +# Peer ID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN [node.network.keypair_config] existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/test_network1.toml b/homestar-runtime/tests/fixtures/test_network1.toml index 1477ae03..f3db1601 100644 --- a/homestar-runtime/tests/fixtures/test_network1.toml +++ b/homestar-runtime/tests/fixtures/test_network1.toml @@ -13,7 +13,8 @@ node_addresses = [ "/ip4/127.0.0.1/tcp/7001/p2p/16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", ] enable_mdns = false -enable_rendezvous = false +enable_rendezvous_client = false +# Peer ID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN [node.network.keypair_config] existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/test_network2.toml b/homestar-runtime/tests/fixtures/test_network2.toml index d2aa2675..f2081fbf 100644 --- a/homestar-runtime/tests/fixtures/test_network2.toml +++ b/homestar-runtime/tests/fixtures/test_network2.toml @@ -13,7 +13,8 @@ node_addresses = [ "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", ] enable_mdns = false -enable_rendezvous = false +enable_rendezvous_client = false +# Peer ID 16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc [node.network.keypair_config] existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" } diff --git a/homestar-runtime/tests/fixtures/test_rendezvous1.toml b/homestar-runtime/tests/fixtures/test_rendezvous1.toml new file mode 100644 index 00000000..6070f8dd --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_rendezvous1.toml @@ -0,0 +1,17 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4035 +console_subscriber_port = 5585 + +[node] + +[node.network] +rpc_port = 9825 +websocket_port = 8025 +listen_address = "/ip4/127.0.0.1/tcp/7000" +enable_rendezvous_server = true +enable_mdns = false + +# Peer ID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/test_rendezvous2.toml b/homestar-runtime/tests/fixtures/test_rendezvous2.toml new file mode 100644 index 00000000..94fa3109 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_rendezvous2.toml @@ -0,0 +1,22 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4036 +console_subscriber_port = 5586 + +[node] + +[node.network] +rpc_port = 9826 +websocket_port = 8026 +listen_address = "/ip4/127.0.0.1/tcp/7001" +announce_addresses = [ + "/ip4/127.0.0.1/tcp/7001/p2p/16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", +] +node_addresses = [ + "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +] +enable_mdns = false + +# Peer ID 16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc +[node.network.keypair_config] +existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" } diff --git a/homestar-runtime/tests/fixtures/test_rendezvous3.toml b/homestar-runtime/tests/fixtures/test_rendezvous3.toml new file mode 100644 index 00000000..31553836 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_rendezvous3.toml @@ -0,0 +1,19 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4037 +console_subscriber_port = 5587 + +[node] + +[node.network] +rpc_port = 9827 +websocket_port = 8027 +listen_address = "/ip4/127.0.0.1/tcp/7002" +node_addresses = [ + "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +] +enable_mdns = false + +# Peer ID 12D3KooWK99VoVxNE7XzyBwXEzW7xhK7Gpv85r9F3V3fyKSUKPH5 +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519_2.pem" } diff --git a/homestar-runtime/tests/fixtures/test_rendezvous4.toml b/homestar-runtime/tests/fixtures/test_rendezvous4.toml new file mode 100644 index 00000000..c09f5741 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_rendezvous4.toml @@ -0,0 +1,23 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4038 +console_subscriber_port = 5588 + +[node] + +[node.network] +rpc_port = 9828 +websocket_port = 8028 +listen_address = "/ip4/127.0.0.1/tcp/7003" +announce_addresses = [ + "/ip4/127.0.0.1/tcp/7003/p2p/12D3KooWJWoaqZhDaoEFshF7Rh1bpY9ohihFhzcW6d69Lr2NASuq", +] +node_addresses = [ + "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +] +rendezvous_registration_ttl = 1 +enable_mdns = false + +# Peer ID 12D3KooWJWoaqZhDaoEFshF7Rh1bpY9ohihFhzcW6d69Lr2NASuq +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519_3.pem" } diff --git a/homestar-runtime/tests/fixtures/test_rendezvous5.toml b/homestar-runtime/tests/fixtures/test_rendezvous5.toml new file mode 100644 index 00000000..876bf37d --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_rendezvous5.toml @@ -0,0 +1,20 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4039 +console_subscriber_port = 5589 + +[node] + +[node.network] +rpc_port = 9829 +websocket_port = 8029 +listen_address = "/ip4/127.0.0.1/tcp/7004" +node_addresses = [ + "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +] +rendezvous_discovery_interval = 1 +enable_mdns = false + +# Peer ID 12D3KooWRndVhVZPCiQwHBBBdg769GyrPUW13zxwqQyf9r3ANaba +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519_4.pem" } diff --git a/homestar-runtime/tests/fixtures/test_rendezvous6.toml b/homestar-runtime/tests/fixtures/test_rendezvous6.toml new file mode 100644 index 00000000..8b0b5ed2 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_rendezvous6.toml @@ -0,0 +1,23 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4038 +console_subscriber_port = 5588 + +[node] + +[node.network] +rpc_port = 9828 +websocket_port = 8028 +listen_address = "/ip4/127.0.0.1/tcp/7005" +announce_addresses = [ + "/ip4/127.0.0.1/tcp/7005/p2p/12D3KooWPT98FXMfDQYavZm66EeVjTqP9Nnehn1gyaydqV8L8BQw", +] +node_addresses = [ + "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +] +rendezvous_registration_ttl = 5 +enable_mdns = false + +# Peer ID 12D3KooWPT98FXMfDQYavZm66EeVjTqP9Nnehn1gyaydqV8L8BQw +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519_5.pem" } diff --git a/homestar-runtime/tests/fixtures/test_v4.toml b/homestar-runtime/tests/fixtures/test_v4.toml index 86d4a97c..7748c44e 100644 --- a/homestar-runtime/tests/fixtures/test_v4.toml +++ b/homestar-runtime/tests/fixtures/test_v4.toml @@ -1,11 +1,11 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4040 -console_subscriber_port = 5590 +metrics_port = 4045 +console_subscriber_port = 5595 [node] [node.network] events_buffer_len = 1000 -rpc_port = 9830 +rpc_port = 9835 rpc_host = "127.0.0.1" diff --git a/homestar-runtime/tests/fixtures/test_v4_alt.toml b/homestar-runtime/tests/fixtures/test_v4_alt.toml index d8088378..d9ccc9c6 100644 --- a/homestar-runtime/tests/fixtures/test_v4_alt.toml +++ b/homestar-runtime/tests/fixtures/test_v4_alt.toml @@ -1,11 +1,11 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4041 -console_subscriber_port = 5591 +metrics_port = 4046 +console_subscriber_port = 5596 [node] [node.network] events_buffer_len = 1000 -rpc_port = 9831 +rpc_port = 9836 rpc_host = "127.0.0.1" diff --git a/homestar-runtime/tests/fixtures/test_v6.toml b/homestar-runtime/tests/fixtures/test_v6.toml index 07711b16..7a4a15e4 100644 --- a/homestar-runtime/tests/fixtures/test_v6.toml +++ b/homestar-runtime/tests/fixtures/test_v6.toml @@ -1,12 +1,12 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4042 -console_subscriber_port = 5592 +metrics_port = 4047 +console_subscriber_port = 5597 [node] [node.network] events_buffer_len = 1000 -rpc_port = 9832 +rpc_port = 9837 rpc_host = "::1" diff --git a/homestar-runtime/tests/network.rs b/homestar-runtime/tests/network.rs index 27fce6df..8a4a346c 100644 --- a/homestar-runtime/tests/network.rs +++ b/homestar-runtime/tests/network.rs @@ -1,10 +1,13 @@ -use crate::utils::{check_lines_for, kill_homestar, retrieve_output, stop_homestar, BIN_NAME}; +use crate::utils::{ + check_lines_for, count_lines_where, kill_homestar, retrieve_output, stop_homestar, BIN_NAME, +}; use anyhow::Result; use once_cell::sync::Lazy; use serial_test::file_serial; use std::{ path::PathBuf, process::{Command, Stdio}, + thread, time::Duration, }; @@ -337,6 +340,124 @@ fn test_libp2p_connect_after_mdns_discovery_serial() -> Result<()> { Ok(()) } +#[test] +#[file_serial] +fn test_libp2p_connect_rendezvous_discovery_serial() -> Result<()> { + let _ = stop_homestar(); + + // Start a rendezvous server + let rendezvous_server = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Start a peer that will register with the rendezvous server + let rendezvous_client1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous2.toml") + .arg("--db") + .arg("homestar2.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Wait for registration to complete + // TODO When we have websocket push events, listen on a registration event instead of using an arbitrary sleep + thread::sleep(Duration::from_secs(2)); + + // Start a peer that will discover the registrant through the rendezvous server + let rendezvous_client2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous3.toml") + .arg("--db") + .arg("homestar3.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Collect logs for five seconds then kill proceses. + let dead_server = kill_homestar(rendezvous_server, Some(Duration::from_secs(5))); + let _ = kill_homestar(rendezvous_client1, Some(Duration::from_secs(5))); + let dead_client2 = kill_homestar(rendezvous_client2, Some(Duration::from_secs(5))); + + // Retrieve logs. + let stdout_server = retrieve_output(dead_server); + let stdout_client2 = retrieve_output(dead_client2); + + // Check rendezvous server registered the client one + let registered_client_one = check_lines_for( + stdout_server.clone(), + vec![ + "registered peer through rendezvous", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + // Check rendezvous served a discover request to client two + let served_discovery_to_client_two = check_lines_for( + stdout_server.clone(), + vec![ + "served rendezvous discover request to peer", + "12D3KooWK99VoVxNE7XzyBwXEzW7xhK7Gpv85r9F3V3fyKSUKPH5", + ], + ); + + assert!(registered_client_one); + assert!(served_discovery_to_client_two); + + // Check that client two connected to client one. + let two_connected_to_one = check_lines_for( + stdout_client2.clone(), + vec![ + "peer connection established", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + // Check client one was added to the Kademlia table + let one_addded_to_dht = check_lines_for( + stdout_client2.clone(), + vec![ + "added identified node to kademlia routing table", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + // Check that DHT routing table was updated with client one + let one_in_dht_routing_table = check_lines_for( + stdout_client2.clone(), + vec![ + "kademlia routing table updated with peer", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + assert!(one_addded_to_dht); + assert!(one_in_dht_routing_table); + assert!(two_connected_to_one); + + Ok(()) +} + #[test] #[file_serial] fn test_libp2p_disconnect_mdns_discovery_serial() -> Result<()> { @@ -472,3 +593,315 @@ fn test_libp2p_disconnect_known_peers_serial() -> Result<()> { Ok(()) } + +#[test] +#[file_serial] +fn test_libp2p_disconnect_rendezvous_discovery_serial() -> Result<()> { + let _ = stop_homestar(); + + // Start a rendezvous server + let rendezvous_server = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Start a peer that will register with the rendezvous server + let rendezvous_client1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous2.toml") + .arg("--db") + .arg("homestar2.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Wait for registration to complete. + // TODO When we have websocket push events, listen on a registration event instead of using an arbitrary sleep. + thread::sleep(Duration::from_secs(2)); + + // Start a peer that will discover the registrant through the rendezvous server + let rendezvous_client2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous3.toml") + .arg("--db") + .arg("homestar3.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Kill server and client one after five seconds + let _ = kill_homestar(rendezvous_server, Some(Duration::from_secs(5))); + let _ = kill_homestar(rendezvous_client1, Some(Duration::from_secs(5))); + + // Collect logs for seven seconds then kill process. + let dead_client2 = kill_homestar(rendezvous_client2, Some(Duration::from_secs(7))); + + // Retrieve logs. + let stdout = retrieve_output(dead_client2); + + // Check that client two disconnected from client one. + let two_disconnected_from_one = check_lines_for( + stdout.clone(), + vec![ + "peer connection closed", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + // Check that client two was removed from the Kademlia table + let two_removed_from_dht_table = check_lines_for( + stdout.clone(), + vec![ + "removed peer from kademlia table", + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", + ], + ); + + assert!(two_disconnected_from_one); + assert!(two_removed_from_dht_table); + + Ok(()) +} + +#[test] +#[file_serial] +fn test_libp2p_rendezvous_renew_registration_serial() -> Result<()> { + let _ = stop_homestar(); + + // Start a rendezvous server + let rendezvous_server = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Start a peer that will renew registrations with the rendezvous server once per second + let rendezvous_client1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous4.toml") + .arg("--db") + .arg("homestar4.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Collect logs for five seconds then kill proceses. + let dead_server = kill_homestar(rendezvous_server, Some(Duration::from_secs(5))); + let dead_client = kill_homestar(rendezvous_client1, Some(Duration::from_secs(5))); + + // Retrieve logs. + let stdout_server = retrieve_output(dead_server); + let stdout_client = retrieve_output(dead_client); + + // Count registrations on the server + let server_registration_count = count_lines_where( + stdout_server, + vec![ + "registered peer through rendezvous", + "12D3KooWJWoaqZhDaoEFshF7Rh1bpY9ohihFhzcW6d69Lr2NASuq", + ], + ); + + // Count registrations on the client + let client_registration_count = count_lines_where( + stdout_client, + vec![ + "registered self with rendezvous node", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); + + assert!(server_registration_count > 1); + assert!(client_registration_count > 1); + + Ok(()) +} + +#[test] +#[file_serial] +fn test_libp2p_rendezvous_rediscovery_serial() -> Result<()> { + let _ = stop_homestar(); + + // Start a rendezvous server + let rendezvous_server = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Start a peer that will discover with the rendezvous server once per second + let rendezvous_client1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous5.toml") + .arg("--db") + .arg("homestar5.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Collect logs for five seconds then kill proceses. + let dead_server = kill_homestar(rendezvous_server, Some(Duration::from_secs(5))); + let dead_client = kill_homestar(rendezvous_client1, Some(Duration::from_secs(5))); + + // Retrieve logs. + let stdout_server = retrieve_output(dead_server); + let stdout_client = retrieve_output(dead_client); + + // Count discover requests on the server + let server_discovery_count = count_lines_where( + stdout_server, + vec![ + "served rendezvous discover request to peer", + "12D3KooWRndVhVZPCiQwHBBBdg769GyrPUW13zxwqQyf9r3ANaba", + ], + ); + + // Count discovery responses the client + let client_discovery_count = count_lines_where( + stdout_client, + vec![ + "received discovery from rendezvous server", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); + + assert!(server_discovery_count > 1); + assert!(client_discovery_count > 1); + + Ok(()) +} + +#[test] +#[file_serial] +fn test_libp2p_rendezvous_rediscover_on_expiration_serial() -> Result<()> { + let _ = stop_homestar(); + + // Start a rendezvous server + let rendezvous_server = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Start a peer that will renew registrations with the rendezvous server every five seconds + let rendezvous_client1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous6.toml") + .arg("--db") + .arg("homestar6.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Wait for registration to complete. + // TODO When we have websocket push events, listen on a registration event instead of using an arbitrary sleep. + thread::sleep(Duration::from_secs(2)); + + // Start a peer that will discover with the rendezvous server when + // a discovered registration expires. Note that by default discovery only + // occurs every ten minutes, so discovery requests in this test are driven + // by expirations. + let rendezvous_client2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_rendezvous3.toml") + .arg("--db") + .arg("homestar3.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Collect logs for seven seconds then kill proceses. + let dead_server = kill_homestar(rendezvous_server, Some(Duration::from_secs(7))); + let _ = kill_homestar(rendezvous_client1, Some(Duration::from_secs(7))); + let dead_client2 = kill_homestar(rendezvous_client2, Some(Duration::from_secs(7))); + + // Retrieve logs. + let stdout_server = retrieve_output(dead_server); + let stdout_client2 = retrieve_output(dead_client2); + + // Count discover requests on the server + let server_discovery_count = count_lines_where( + stdout_server, + vec![ + "served rendezvous discover request to peer", + "12D3KooWK99VoVxNE7XzyBwXEzW7xhK7Gpv85r9F3V3fyKSUKPH5", + ], + ); + + // Count discovery responses the client + let client_discovery_count = count_lines_where( + stdout_client2, + vec![ + "received discovery from rendezvous server", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); + + assert!(server_discovery_count > 1); + assert!(client_discovery_count > 1); + + Ok(()) +} diff --git a/homestar-runtime/tests/utils.rs b/homestar-runtime/tests/utils.rs index 57c8711a..7c754b94 100644 --- a/homestar-runtime/tests/utils.rs +++ b/homestar-runtime/tests/utils.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; #[cfg(not(windows))] use assert_cmd::prelude::*; +use chrono::{DateTime, FixedOffset}; #[cfg(not(windows))] use nix::{ sys::signal::{self, Signal}, @@ -95,16 +96,61 @@ pub(crate) fn retrieve_output(proc: Child) -> String { pub(crate) fn check_lines_for(output: String, predicates: Vec<&str>) -> bool { output .split("\n") - .map(|line| { - // Line contains all predicates - predicates - .iter() - .map(|pred| predicate::str::contains(*pred).eval(line)) - .fold(true, |acc, curr| acc && curr) - }) + .map(|line| line_contains(line, &predicates)) .fold(false, |acc, curr| acc || curr) } +pub(crate) fn count_lines_where(output: String, predicates: Vec<&str>) -> i32 { + output.split("\n").fold(0, |count, line| { + if line_contains(line, &predicates) { + count + 1 + } else { + count + } + }) +} + +/// Extract timestamps for process output lines with matching predicates +#[allow(dead_code)] +pub(crate) fn extract_timestamps_where( + output: String, + predicates: Vec<&str>, +) -> Vec> { + output.split("\n").fold(vec![], |mut timestamps, line| { + if line_contains(line, &predicates) { + match extract_label(&line, "ts").and_then(|val| DateTime::parse_from_rfc3339(val).ok()) + { + Some(datetime) => { + timestamps.push(datetime); + timestamps + } + None => { + println!("Encountered a log entry that was missing a timestamp label: {line}"); + timestamps + } + } + } else { + timestamps + } + }) +} + +/// Check process output line for all predicates +fn line_contains(line: &str, predicates: &Vec<&str>) -> bool { + predicates + .iter() + .map(|pred| predicate::str::contains(*pred).eval(line)) + .fold(true, |acc, curr| acc && curr) +} + +/// Extract label value from process output line +#[allow(dead_code)] +fn extract_label<'a>(line: &'a str, key: &str) -> Option<&'a str> { + line.split(' ') + .find(|label| predicate::str::contains(format!("{key}=")).eval(label)) + .and_then(|label| label.split('=').next_back()) +} + /// Wait for process to exit or kill after timeout. pub(crate) fn kill_homestar(mut homestar_proc: Child, timeout: Option) -> Child { if let Ok(None) = homestar_proc.try_wait() {