diff --git a/Cargo.lock b/Cargo.lock index 331d1c3ef0..95227a4017 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -190,18 +190,6 @@ dependencies = [ "syn 2.0.89", ] -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-recursion" version = "1.1.1" @@ -364,23 +352,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "bao-tree" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f7a89a8ee5889d2593ae422ce6e1bb03e48a0e8a16e4fa0882dfcbe7e182ef" -dependencies = [ - "bytes", - "futures-lite 2.5.0", - "genawaiter", - "iroh-blake3", - "iroh-io", - "positioned-io", - "range-collections", - "self_cell", - "smallvec", -] - [[package]] name = "base16ct" version = "0.2.0" @@ -408,21 +379,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "binary-merge" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" - -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bit-set" version = "0.5.3" @@ -483,37 +439,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" -[[package]] -name = "camino" -version = "1.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo-platform" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver", - "serde", - "serde_json", -] - [[package]] name = "cast" version = "0.3.0" @@ -1401,27 +1326,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "event-listener" -version = "5.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" -dependencies = [ - "event-listener 5.3.1", - "pin-project-lite", -] - [[package]] name = "fallible-iterator" version = "0.3.0" @@ -2465,15 +2369,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "inplace-vec-builder" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf64c2edc8226891a71f127587a2861b132d2b942310843814d5001d99a1d307" -dependencies = [ - "smallvec", -] - [[package]] name = "instant" version = "0.1.13" @@ -2509,52 +2404,17 @@ name = "iroh" version = "0.28.1" dependencies = [ "anyhow", - "async-channel", - "bao-tree", "bytes", "clap", - "derive_more", - "futures-buffered", "futures-lite 2.5.0", - "futures-util", - "genawaiter", "indicatif", - "iroh", "iroh-base", - "iroh-io", - "iroh-metrics", "iroh-net", - "iroh-node-util", - "iroh-quinn", - "iroh-relay", "iroh-router", - "iroh-test", - "nested_enum_utils", - "num_cpus", - "parking_lot", "parse-size", - "postcard", - "proptest", - "quic-rpc", - "quic-rpc-derive", - "rand", - "rand_chacha", - "ref-cast", - "regex", - "serde", - "serde-error", - "serde_json", - "strum", - "tempfile", - "testdir", - "testresult", - "thiserror 2.0.3", "tokio", - "tokio-stream", - "tokio-util", "tracing", "tracing-subscriber", - "url", ] [[package]] @@ -2648,19 +2508,6 @@ dependencies = [ "z32", ] -[[package]] -name = "iroh-io" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17e302c5ad649c6a7aa9ae8468e1c4dc2469321af0c6de7341c1be1bdaab434b" -dependencies = [ - "bytes", - "futures-lite 2.5.0", - "pin-project", - "smallvec", - "tokio", -] - [[package]] name = "iroh-metrics" version = "0.28.0" @@ -2694,7 +2541,6 @@ dependencies = [ "crypto_box", "der", "derive_more", - "duct", "futures-buffered", "futures-concurrency", "futures-lite 2.5.0", @@ -3468,15 +3314,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" -[[package]] -name = "ntapi" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" -dependencies = [ - "winapi", -] - [[package]] name = "ntest" version = "0.9.3" @@ -3583,16 +3420,6 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.9", - "libc", -] - [[package]] name = "num_enum" version = "0.7.3" @@ -4032,16 +3859,6 @@ dependencies = [ "url", ] -[[package]] -name = "positioned-io" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccabfeeb89c73adf4081f0dca7f8e28dbda90981a222ceea37f619e93ea6afe9" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "postcard" version = "1.0.10" @@ -4259,7 +4076,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc623a188942fc875926f7baeb2cb08ed4288b64f29072656eb051e360ee7623" dependencies = [ "anyhow", - "bincode", "derive_more", "educe", "flume", @@ -4267,12 +4083,10 @@ dependencies = [ "futures-sink", "futures-util", "hex", - "iroh-quinn", "pin-project", "serde", "slab", "tokio", - "tokio-serde", "tokio-util", "tracing", ] @@ -4415,18 +4229,6 @@ dependencies = [ "rand_core", ] -[[package]] -name = "range-collections" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca9edd21e2db51000ac63eccddabba622f826e631a60be7bade9bd6a76b69537" -dependencies = [ - "binary-merge", - "inplace-vec-builder", - "ref-cast", - "smallvec", -] - [[package]] name = "raw-cpuid" version = "11.2.0" @@ -4498,26 +4300,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "ref-cast" -version = "1.0.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccf0a6f84d5f1d581da8b41b47ec8600871962f2a528115b542b362d4b744931" -dependencies = [ - "ref-cast-impl", -] - -[[package]] -name = "ref-cast-impl" -version = "1.0.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.89", -] - [[package]] name = "regex" version = "1.11.1" @@ -4964,9 +4746,6 @@ name = "semver" version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" -dependencies = [ - "serde", -] [[package]] name = "serde" @@ -5479,20 +5258,6 @@ dependencies = [ "syn 2.0.89", ] -[[package]] -name = "sysinfo" -version = "0.26.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c18a6156d1f27a9592ee18c1a846ca8dd5c258b7179fc193ae87c74ebb666f5" -dependencies = [ - "cfg-if", - "core-foundation-sys", - "libc", - "ntapi", - "once_cell", - "winapi", -] - [[package]] name = "system-configuration" version = "0.6.1" @@ -5527,20 +5292,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "testdir" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee79e927b64d193f5abb60d20a0eb56be0ee5a242fdeb8ce3bf054177006de52" -dependencies = [ - "anyhow", - "backtrace", - "cargo_metadata", - "once_cell", - "sysinfo", - "whoami", -] - [[package]] name = "testresult" version = "0.4.1" @@ -5734,21 +5485,6 @@ dependencies = [ "x509-parser", ] -[[package]] -name = "tokio-serde" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466" -dependencies = [ - "bincode", - "bytes", - "educe", - "futures-core", - "futures-sink", - "pin-project", - "serde", -] - [[package]] name = "tokio-stream" version = "0.1.16" @@ -5814,7 +5550,6 @@ dependencies = [ "futures-util", "hashbrown 0.14.5", "pin-project-lite", - "slab", "tokio", ] @@ -6245,12 +5980,6 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "wasite" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" - [[package]] name = "wasm-bindgen" version = "0.2.95" @@ -6324,7 +6053,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45b42a2f611916b5965120a9cde2b60f2db4454826dd9ad5e6f47c24a5b3b259" dependencies = [ - "event-listener 4.0.3", + "event-listener", "futures-util", "parking_lot", "thiserror 1.0.69", @@ -6359,17 +6088,6 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = "whoami" -version = "1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" -dependencies = [ - "redox_syscall", - "wasite", - "web-sys", -] - [[package]] name = "widestring" version = "1.1.0" diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 12776a7a91..7e4ed8836c 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -172,9 +172,6 @@ iroh-relay = { version = "0.28", path = "../iroh-relay", features = ["test-utils name = "key" harness = false -[build-dependencies] -duct = "0.13.6" - [features] default = ["metrics", "discovery-pkarr-dht"] metrics = ["iroh-metrics/metrics"] diff --git a/iroh/src/util/fs.rs b/iroh-node-util/src/fs.rs similarity index 99% rename from iroh/src/util/fs.rs rename to iroh-node-util/src/fs.rs index d16d07be8e..a773797085 100644 --- a/iroh/src/util/fs.rs +++ b/iroh-node-util/src/fs.rs @@ -1,4 +1,5 @@ //! Utilities for filesystem operations. + use std::path::PathBuf; use anyhow::Context; diff --git a/iroh-node-util/src/lib.rs b/iroh-node-util/src/lib.rs index 0f21b059f1..709b430bad 100644 --- a/iroh-node-util/src/lib.rs +++ b/iroh-node-util/src/lib.rs @@ -12,6 +12,8 @@ pub mod config; pub mod logging; pub mod rpc; +pub mod fs; + use std::path::PathBuf; use anyhow::Context; diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index cb2818980a..33c797151c 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -16,92 +16,44 @@ rust-version = "1.76" workspace = true [dependencies] -anyhow = { version = "1" } -async-channel = "2.3.1" -bytes = "1.7" -derive_more = { version = "1.0.0", features = [ - "debug", - "display", - "from", - "try_into", - "from_str", -] } -futures-lite = "2.5" -futures-util = "0.3" iroh-base = { version = "0.28.0", features = ["key"] } -iroh-io = { version = "0.6.0", features = ["stats"] } -iroh-metrics = { version = "0.28.0", optional = true } -iroh-net = { version = "0.28.1", features = ["discovery-local-network"] } -iroh-node-util = { version = "0.28.0", path = "../iroh-node-util" } +iroh-net = { version = "0.28.1", default-features = false } iroh-router = { version = "0.28.0" } -nested_enum_utils = "0.1.0" -num_cpus = { version = "1.15.0" } -parking_lot = "0.12.1" -postcard = { version = "1", default-features = false, features = [ - "alloc", - "use-std", - "experimental-derive", -] } -quic-rpc = { version = "0.15", default-features = false, features = [ - "flume-transport", - "quinn-transport", -] } -quic-rpc-derive = { version = "0.15" } -quinn = { package = "iroh-quinn", version = "0.12" } -serde = { version = "1", features = ["derive"] } -strum = { version = "0.26", features = ["derive"] } -thiserror = "2" -tempfile = "3.4" -tokio = { version = "1", features = ["io-util", "rt"] } -tokio-util = { version = "0.7", features = ["codec", "io-util", "io", "time"] } -tracing = "0.1" -iroh-relay = { version = "0.28", path = "../iroh-relay" } -ref-cast = "1.0.23" # Examples +anyhow = { version = "1", optional = true } clap = { version = "4", features = ["derive"], optional = true } indicatif = { version = "0.17", features = ["tokio"], optional = true } parse-size = { version = "=1.0.0", optional = true } # pinned version to avoid bumping msrv to 1.81 +tokio = { version = "1", features = ["full"], optional = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true } +futures-lite = { version = "2.5", optional = true } +tracing = { version = "0.1", optional = true } +bytes = { version = "1.8", optional = true } -# Documentation tests -url = { version = "2.5", features = ["serde"] } -serde-error = "0.1.3" [features] -default = ["metrics"] -metrics = ["iroh-metrics"] -test = [] +default = ["metrics", "discovery-pkarr-dht"] +metrics = ["iroh-net/metrics"] +discovery-local-network = ["iroh-net/discovery-local-network"] discovery-pkarr-dht = ["iroh-net/discovery-pkarr-dht"] -test-utils = ["iroh-net/test-utils"] -examples = ["dep:clap", "dep:indicatif", "dep:parse-size"] - -[dev-dependencies] -anyhow = { version = "1" } -bao-tree = { version = "0.13", features = ["tokio_fsm"] } -futures-buffered = "0.2.4" -genawaiter = { version = "0.99", features = ["futures03"] } -iroh = { path = ".", features = ["test-utils"] } -iroh-test = "0.28.0" -proptest = "1.2.0" -rand = "0.8" -rand_chacha = "0.3.1" -regex = { version = "1.7.1", features = ["std"] } -serde_json = "1" -testdir = "0.9.1" -testresult = "0.4.0" -tokio = { version = "1", features = ["macros", "io-util", "rt"] } -tokio-stream = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +examples = [ + "dep:anyhow", + "dep:clap", + "dep:indicatif", + "dep:parse-size", + "dep:tokio", + "dep:tracing-subscriber", + "dep:futures-lite", + "dep:tracing", + "dep:bytes", +] [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "iroh_docsrs"] -[[example]] -name = "rpc" -required-features = ["examples"] - [[example]] name = "transfer" required-features = ["examples"] diff --git a/iroh/examples/rpc.rs b/iroh/examples/rpc.rs deleted file mode 100644 index 4eb3bac6a1..0000000000 --- a/iroh/examples/rpc.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! An example that runs an iroh node that can be controlled via RPC. -//! -//! Run this example with -//! $ cargo run --features=examples --example rpc -//! This will print the rpc address of the node. Copy it to use it to connect from the CLI. -//! Then in another terminal, run any of the normal iroh CLI commands supplying the rpc address, -//! which you can run from cargo as well, -//! $ cargo run -- --rpc-addr net node-addr -//! The `net node-addr` command will reach out over RPC to the node constructed in the example. - -use clap::Parser; -use tracing_subscriber::{prelude::*, EnvFilter}; - -// set the RUST_LOG env var to one of {debug,info,warn} to see logging info -pub fn setup_logging() { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); -} - -async fn run(builder: iroh::node::Builder) -> anyhow::Result<()> { - let node = builder - .enable_rpc() - .await? // enable the RPC endpoint - .spawn() - .await?; - - // print some info about the node - let peer = node.node_id(); - let addrs = node.local_endpoint_addresses().await?; - println!("node PeerID: {peer}"); - println!("node listening addresses:"); - for addr in addrs { - println!(" {}", addr); - } - let rpc_addr = node.my_rpc_addr().expect("rpc enabled"); - println!("Started node with RPC enabled ({rpc_addr}). Exit with Ctrl+C"); - // wait for the node to finish, this will block indefinitely - // stop with SIGINT (ctrl+c) - tokio::signal::ctrl_c().await?; - node.shutdown().await?; - - Ok(()) -} - -#[derive(Parser, Debug)] -struct Args { - /// Path to use to store the iroh database. - /// - /// If this is not set, an in memory database will be used. - #[clap(long)] - path: Option, -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - setup_logging(); - - let args = Args::parse(); - match args.path { - Some(path) => { - tokio::fs::create_dir_all(&path).await?; - let builder = iroh::node::Node::persistent(path).await?; - run(builder).await - } - None => { - let builder = iroh::node::Node::memory(); - run(builder).await - } - } -} diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 4a9025cb1c..c1ca5c90e2 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -9,7 +9,8 @@ use clap::{Parser, Subcommand}; use futures_lite::StreamExt; use indicatif::HumanBytes; use iroh_net::{ - key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, + endpoint::ConnectionError, key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMap, + RelayMode, RelayUrl, }; use tracing::info; @@ -132,7 +133,7 @@ async fn provide(size: u64, relay_url: Option) -> anyhow::Result<()> { // it received this message. let res = tokio::time::timeout(Duration::from_secs(3), async move { let closed = conn.closed().await; - if !matches!(closed, quinn::ConnectionError::ApplicationClosed(_)) { + if !matches!(closed, ConnectionError::ApplicationClosed(_)) { println!("node {node_id} disconnected with an error: {closed:#}"); } }) diff --git a/iroh/src/client.rs b/iroh/src/client.rs deleted file mode 100644 index 2b9a752d4e..0000000000 --- a/iroh/src/client.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Client to an Iroh node. -//! -//! See the documentation for [`Iroh`] for more information. -#[doc(inline)] -pub use crate::rpc_protocol::RpcService; - -mod quic; - -pub use iroh_node_util::rpc::client::{net, node}; - -pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; - -// Keep this type exposed, otherwise every occurrence of `RpcClient` in the API -// will show up as `RpcClient>` in the docs. -/// Iroh rpc client - boxed so that we can have a concrete type. -pub type RpcClient = quic_rpc::RpcClient; - -/// An iroh client. -/// -/// There are three ways to obtain this client, depending on which context -/// you're running in relative to the main [`Node`](crate::node::Node): -/// -/// 1. If you just spawned the client in rust the same process and have a reference to it: -/// Use [`Node::client()`](crate::node::Node::client). -/// 2. If the main node wasn't spawned in the same process, but on the same machine: -/// Use [`Iroh::connect_path`]. -/// 3. If the main node was spawned somewhere else and has been made accessible via IP: -/// Use [`Iroh::connect_addr`]. -#[derive(Debug, Clone)] -pub struct Iroh { - rpc: RpcClient, -} - -impl Iroh { - /// Creates a new high-level client to a Iroh node from the low-level RPC client. - /// - /// Prefer using [`Node::client()`](crate::node::Node::client), [`Iroh::connect_path`] - /// or [`Iroh::connect_addr`] instead of calling this function. - /// - /// See also the [`Iroh`] struct documentation. - pub fn new(rpc: RpcClient) -> Self { - Self { rpc } - } - - /// Returns the actual [`RpcClient`]. - pub fn client(&self) -> RpcClient { - self.rpc.clone() - } - - /// Returns the net client. - pub fn net(&self) -> net::Client { - net::Client::new(self.rpc.clone().map().boxed()) - } - - /// Returns the net client. - pub fn node(&self) -> node::Client { - node::Client::new(self.rpc.clone().map().boxed()) - } -} diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs deleted file mode 100644 index a00b101de5..0000000000 --- a/iroh/src/client/quic.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! Type declarations and utility functions for an RPC client to an iroh node running in a separate process. - -use std::{ - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - path::Path, - sync::Arc, - time::Duration, -}; - -use anyhow::{bail, Context}; -use iroh_node_util::rpc::proto::node::StatusRequest; -use quic_rpc::transport::{boxed::BoxedConnector, quinn::QuinnConnector}; - -use super::{Iroh, RpcClient}; -use crate::node::RpcStatus; - -/// ALPN used by irohs RPC mechanism. -// TODO: Change to "/iroh-rpc/1" -pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1"; - -impl Iroh { - /// Connects to an iroh node running on the same computer, but in a different process. - pub async fn connect_path(root: impl AsRef) -> anyhow::Result { - let rpc_status = RpcStatus::load(root).await?; - match rpc_status { - RpcStatus::Stopped => { - bail!("iroh is not running, please start it"); - } - RpcStatus::Running { client, port: _ } => Ok(Iroh::new(client)), - } - } - - /// Connects to an iroh node at the given RPC address. - pub async fn connect_addr(addr: SocketAddr) -> anyhow::Result { - let client = connect_raw(addr).await?; - Ok(Iroh::new(client)) - } -} - -/// Create a raw RPC client to an iroh node running on the same computer, but in a different -/// process. -pub(crate) async fn connect_raw(addr: SocketAddr) -> anyhow::Result { - let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into(); - let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?; - - let server_name = "localhost".to_string(); - let connection = QuinnConnector::new(endpoint, addr, server_name); - let connection = BoxedConnector::new(connection); - let client = RpcClient::new(connection); - let node_client = client - .clone() - .map::(); - // Do a status request to check if the server is running. - let _version = tokio::time::timeout(Duration::from_secs(1), node_client.rpc(StatusRequest)) - .await - .context("Iroh node is not running")??; - Ok(client) -} - -fn create_quinn_client( - bind_addr: SocketAddr, - alpn_protocols: Vec>, - keylog: bool, -) -> anyhow::Result { - let secret_key = iroh_net::key::SecretKey::generate(); - let tls_client_config = - iroh_net::tls::make_client_config(&secret_key, None, alpn_protocols, keylog)?; - let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); - let mut endpoint = quinn::Endpoint::client(bind_addr)?; - let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(Duration::from_secs(1))); - client_config.transport_config(Arc::new(transport_config)); - endpoint.set_default_client_config(client_config); - Ok(endpoint) -} diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 4b21148ea3..c1fae16f90 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -1,63 +1,12 @@ //! Send data over the internet. //! -//! # Getting started -//! -//! ## Example -//! -//! Create a new node. -//! -//! ```rust -//! # async fn run() -> anyhow::Result<()> { -//! let _node = iroh::node::Node::memory().spawn().await?; -//! # Ok(()) -//! # } -//! ``` -//! -//! ## Explanation -//! -//! ### Iroh node -//! -//! To create an iroh [Node](crate::node::Node), you use the -//! [Builder](crate::node::Builder) to configure the node and to spawn it. -//! -//! There are also shortcuts to create an in [memory](crate::node::Node::memory) -//! or [persistent](crate::node::Node::persistent) node with default settings. -//! -//! ## Iroh client -//! -//! A node is controlled via a **client**. The client provides the main API to -//! interact with a node, no matter if it is a local in-process node or a node -//! in a different process. All clients are cheaply cloneable and can be shared -//! across threads. -//! -//! A handle to the client is available via the -//! [client](crate::node::Node::client) method on the node. -//! -//! Node also implements [Deref](std::ops::Deref) to the client, so you can call -//! client methods directly on the node. -//! -//! ## Subsystems -//! -//! The client provides access to various subsystems: -//! - [net](crate::client::net): -//! information and control of the iroh network -//! -//! The subsystem clients can be obtained cheaply from the main iroh client. -//! They are also cheaply cloneable and can be shared across threads. -//! -//! So if you have code that only needs to interact with one subsystem, pass -//! it just the subsystem client. -//! //! //! ## Reexports //! //! The iroh crate re-exports the following crates: //! - [iroh_base] as [`base`] //! - [iroh_net] as [`net`] -//! -//! ## Feature Flags -//! -//! - `metrics`: Enable metrics collection. Enabled by default. +//! - [iroh_router] as [`router`] #![cfg_attr(iroh_docsrs, feature(doc_cfg))] #![deny(missing_docs, rustdoc::broken_intra_doc_links)] @@ -68,14 +17,3 @@ pub use iroh_base as base; pub use iroh_net as net; #[doc(inline)] pub use iroh_router as router; - -pub mod client; -pub mod node; -pub mod util; - -mod rpc_protocol; - -/// Expose metrics module -#[cfg(feature = "metrics")] -#[cfg_attr(iroh_docsrs, doc(cfg(feature = "metrics")))] -pub mod metrics; diff --git a/iroh/src/metrics.rs b/iroh/src/metrics.rs deleted file mode 100644 index 7ce6a65f72..0000000000 --- a/iroh/src/metrics.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::collections::BTreeMap; - -use iroh_metrics::{ - core::{Counter, Metric}, - struct_iterable::Iterable, -}; -use iroh_node_util::rpc::proto::node::CounterStats; - -/// Enum of metrics for the module -#[allow(missing_docs)] -#[derive(Debug, Clone, Iterable)] -pub struct Metrics { - pub doc_gossip_tick_main: Counter, - pub doc_gossip_tick_event: Counter, - pub doc_gossip_tick_actor: Counter, - pub doc_gossip_tick_pending_join: Counter, - - pub doc_live_tick_main: Counter, - pub doc_live_tick_actor: Counter, - pub doc_live_tick_replica_event: Counter, - pub doc_live_tick_running_sync_connect: Counter, - pub doc_live_tick_running_sync_accept: Counter, - pub doc_live_tick_pending_downloads: Counter, -} - -impl Default for Metrics { - fn default() -> Self { - Self { - doc_gossip_tick_main: Counter::new("Number of times the main gossip actor loop ticked"), - doc_gossip_tick_event: Counter::new( - "Number of times the gossip actor ticked for an event", - ), - doc_gossip_tick_actor: Counter::new( - "Number of times the gossip actor ticked for an actor message", - ), - doc_gossip_tick_pending_join: Counter::new( - "Number of times the gossip actor ticked pending join", - ), - - doc_live_tick_main: Counter::new("Number of times the main live actor loop ticked"), - doc_live_tick_actor: Counter::new( - "Number of times the live actor ticked for an actor message", - ), - doc_live_tick_replica_event: Counter::new( - "Number of times the live actor ticked for a replica event", - ), - doc_live_tick_running_sync_connect: Counter::new( - "Number of times the live actor ticked for a running sync connect", - ), - doc_live_tick_running_sync_accept: Counter::new( - "Number of times the live actor ticked for a running sync accept", - ), - doc_live_tick_pending_downloads: Counter::new( - "Number of times the live actor ticked for a pending download", - ), - } - } -} - -impl Metric for Metrics { - fn name() -> &'static str { - "iroh" - } -} - -/// Initialize the global metrics collection. -/// -/// Will return an error if the global metrics collection was already initialized. -pub fn try_init_metrics_collection() -> std::io::Result<()> { - iroh_metrics::core::Core::try_init(|reg, metrics| { - metrics.insert(crate::metrics::Metrics::new(reg)); - metrics.insert(iroh_net::metrics::MagicsockMetrics::new(reg)); - metrics.insert(iroh_net::metrics::NetReportMetrics::new(reg)); - metrics.insert(iroh_net::metrics::PortmapMetrics::new(reg)); - }) -} - -/// Collect the current metrics into a hash map. -/// -/// TODO: Only counters are supported for now, other metrics will be skipped without error. -pub fn get_metrics() -> anyhow::Result> { - let mut map = BTreeMap::new(); - let core = - iroh_metrics::core::Core::get().ok_or_else(|| anyhow::anyhow!("metrics are disabled"))?; - collect( - core.get_collector::(), - &mut map, - ); - collect( - core.get_collector::(), - &mut map, - ); - collect( - core.get_collector::(), - &mut map, - ); - Ok(map) -} - -// TODO: support other things than counters -fn collect(metrics: Option<&impl Iterable>, map: &mut BTreeMap) { - let Some(metrics) = metrics else { - return; - }; - for (name, counter) in metrics.iter() { - if let Some(counter) = counter.downcast_ref::() { - let value = counter.get(); - let description = counter.description.to_string(); - map.insert(name.to_string(), CounterStats { value, description }); - } - } -} diff --git a/iroh/src/node.rs b/iroh/src/node.rs deleted file mode 100644 index 2911617aef..0000000000 --- a/iroh/src/node.rs +++ /dev/null @@ -1,402 +0,0 @@ -//! Node API -//! -//! An iroh node is a server that is identified by an Ed25519 keypair and is -//! globally reachable via the [node id](crate::net::NodeId), which is the -//! public key of the keypair. -//! -//! By default, an iroh node speaks a number of built-in protocols. You can -//! *extend* the node with custom protocols or *disable* built-in protocols. -//! -//! # Building a node -//! -//! Nodes get created using the [`Builder`] which provides a very powerful API -//! to configure every aspect of the node. -//! -//! When using the default set of protocols, use [spawn](Builder::spawn) -//! to spawn a node directly from the builder. -//! -//! When adding custom protocols, use [build](Builder::build) to get a -//! [`ProtocolBuilder`] that allows to add custom protocols, then call -//! [spawn](ProtocolBuilder::spawn) to spawn the fully configured node. -//! -//! To implement a custom protocol, implement the [`ProtocolHandler`] trait -//! and use [`ProtocolBuilder::accept`] to add it to the node. -//! -//! # Using a node -//! -//! Once created, a node offers a small number of methods to interact with it, -//! most notably the iroh-net [endpoint](Node::endpoint) it is bound to. -//! -//! The main way to interact with a node is through the -//! [`client`](crate::client::Iroh). -//! -//! (The Node implements [Deref](std::ops::Deref) for client, which means that -//! methods defined on [Client](crate::client::Iroh) can be called on Node as -//! well, without going through [`client`](crate::client::Iroh)) -//! -//! To shut down the node, call [`Node::shutdown`]. -use std::{ - collections::BTreeSet, - fmt::Debug, - net::SocketAddr, - path::{Path, PathBuf}, - sync::Arc, - time::Duration, -}; - -use anyhow::{anyhow, Result}; -use futures_lite::StreamExt; -use futures_util::future::{MapErr, Shared}; -use iroh_base::key::PublicKey; -use iroh_net::{ - endpoint::{DirectAddrsStream, RemoteInfo}, - AddrInfo, Endpoint, NodeAddr, -}; -use iroh_router::{ProtocolHandler, Router}; -use quic_rpc::{transport::Listener as _, RpcServer}; -use tokio::task::{JoinError, JoinSet}; -use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; -use tracing::{debug, error, info, info_span, trace, warn, Instrument}; - -use crate::node::nodes_storage::store_node_addrs; - -mod builder; -mod nodes_storage; -mod rpc; -mod rpc_status; - -pub use self::{ - builder::{Builder, DiscoveryConfig, ProtocolBuilder, StorageConfig, DEFAULT_RPC_ADDR}, - rpc_status::RpcStatus, -}; - -/// How often to save node data. -const SAVE_NODES_INTERVAL: Duration = Duration::from_secs(30); - -/// The quic-rpc server endpoint for the iroh node. -/// -/// We use a boxed endpoint here to allow having a concrete type for the server endpoint. -pub type IrohServerEndpoint = quic_rpc::transport::boxed::BoxedListener< - crate::rpc_protocol::Request, - crate::rpc_protocol::Response, ->; - -/// A server which implements the iroh node. -/// -/// Clients can connect to this server and requests hashes from it. -/// -/// The only way to create this is by using the [`Builder::spawn`]. You can use [`Node::memory`] -/// or [`Node::persistent`] to create a suitable [`Builder`]. -/// -/// This runs a tokio task which can be aborted and joined if desired. To join the task -/// await the [`Node`] struct directly, it will complete when the task completes. If -/// this is dropped the node task is not stopped but keeps running. -#[derive(Debug, Clone)] -pub struct Node { - inner: Arc, - // `Node` needs to be `Clone + Send`, and we need to `task.await` in its `shutdown()` impl. - // So we need - // - `Shared` so we can `task.await` from all `Node` clones - // - `MapErr` to map the `JoinError` to a `String`, because `JoinError` is `!Clone` - // - `AbortOnDropHandle` to make sure that the `task` is cancelled when all `Node`s are dropped - // (`Shared` acts like an `Arc` around its inner future). - task: Shared, JoinErrToStr>>, - router: Router, -} - -pub(crate) type JoinErrToStr = Box String + Send + Sync + 'static>; - -#[derive(derive_more::Debug)] -struct NodeInner { - rpc_addr: Option, - endpoint: Endpoint, - cancel_token: CancellationToken, - client: crate::client::Iroh, -} - -impl Node { - /// Returns a new builder for the [`Node`], by default configured to run in memory. - /// - /// Once done with the builder call [`Builder::spawn`] to create the node. - pub fn memory() -> Builder { - Builder::memory() - } - - /// Returns a new builder for the [`Node`], configured to persist all data - /// from the given path. - /// - /// Once done with the builder call [`Builder::spawn`] to create the node. - pub async fn persistent(root: impl AsRef) -> Result { - Builder::memory().persist(root).await - } - - /// Returns the [`Endpoint`] of the node. - /// - /// This can be used to establish connections to other nodes under any - /// ALPNs other than the iroh internal ones. This is useful for some advanced - /// use cases. - pub fn endpoint(&self) -> &Endpoint { - &self.inner.endpoint - } - - /// The address on which the node socket is bound. - /// - /// Note that this could be an unspecified address, if you need an address on which you - /// can contact the node consider using [`Node::local_endpoint_addresses`]. However the - /// port will always be the concrete port. - pub fn local_address(&self) -> Vec { - let (v4, v6) = self.inner.endpoint.bound_sockets(); - let mut addrs = vec![v4]; - if let Some(v6) = v6 { - addrs.push(v6); - } - addrs - } - - /// Lists the local endpoint of this node. - pub fn local_endpoints(&self) -> DirectAddrsStream { - self.inner.endpoint.direct_addresses() - } - - /// Convenience method to get just the addr part of [`Node::local_endpoints`]. - pub async fn local_endpoint_addresses(&self) -> Result> { - self.inner.local_endpoint_addresses().await - } - - /// Returns the [`PublicKey`] of the node. - pub fn node_id(&self) -> PublicKey { - self.inner.endpoint.secret_key().public() - } - - /// Return a client to control this node over an in-memory channel. - pub fn client(&self) -> &crate::client::Iroh { - &self.inner.client - } - - /// Get the relay server we are connected to. - pub fn home_relay(&self) -> Option { - self.inner.endpoint.home_relay() - } - - /// Returns `Some(addr)` if an RPC endpoint is running, `None` otherwise. - pub fn my_rpc_addr(&self) -> Option { - self.inner.rpc_addr - } - - /// Shutdown the node. - /// - /// This does not gracefully terminate currently: all connections are closed and - /// anything in-transit is lost. The shutdown behaviour will become more graceful - /// in the future. - /// - /// Returns a future that completes once all tasks terminated and all resources are closed. - /// The future resolves to an error if the main task panicked. - pub async fn shutdown(self) -> Result<()> { - // Trigger shutdown of the main run task by activating the cancel token. - self.inner.cancel_token.cancel(); - - // Wait for the main task to terminate. - self.task.await.map_err(|err| anyhow!(err))?; - - Ok(()) - } - - /// Returns a token that can be used to cancel the node. - pub fn cancel_token(&self) -> CancellationToken { - self.inner.cancel_token.clone() - } - - /// Returns a protocol handler for an ALPN. - /// - /// This downcasts to the concrete type and returns `None` if the handler registered for `alpn` - /// does not match the passed type. - pub fn get_protocol(&self, alpn: &[u8]) -> Option> { - self.router.get_protocol(alpn) - } -} - -impl std::ops::Deref for Node { - type Target = crate::client::Iroh; - - fn deref(&self) -> &Self::Target { - &self.inner.client - } -} - -impl NodeInner { - async fn local_endpoint_addresses(&self) -> Result> { - let endpoints = self - .endpoint - .direct_addresses() - .next() - .await - .ok_or(anyhow!("no endpoints found"))?; - Ok(endpoints.into_iter().map(|x| x.addr).collect()) - } - - #[allow(clippy::too_many_arguments)] - async fn run( - self: Arc, - external_rpc: IrohServerEndpoint, - internal_rpc: IrohServerEndpoint, - router: Router, - nodes_data_path: Option, - ) { - let (ipv4, ipv6) = self.endpoint.bound_sockets(); - debug!( - "listening at: {}{}", - ipv4, - ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default() - ); - debug!("rpc listening at: {:?}", external_rpc.local_addr()); - - let mut join_set = JoinSet::new(); - - // Setup the RPC servers. - let external_rpc = RpcServer::new(external_rpc); - let internal_rpc = RpcServer::new(internal_rpc); - - if let Some(nodes_data_path) = nodes_data_path { - let ep = self.endpoint.clone(); - let token = self.cancel_token.clone(); - - join_set.spawn( - async move { - let mut save_timer = tokio::time::interval_at( - tokio::time::Instant::now() + SAVE_NODES_INTERVAL, - SAVE_NODES_INTERVAL, - ); - - loop { - tokio::select! { - biased; - _ = token.cancelled() => { - trace!("save known node addresses shutdown"); - let addrs = node_addresses_for_storage(&ep); - if let Err(err) = store_node_addrs(&nodes_data_path, &addrs).await { - warn!("failed to store known node addresses: {:?}", err); - } - break; - } - _ = save_timer.tick() => { - trace!("save known node addresses tick"); - let addrs = node_addresses_for_storage(&ep); - if let Err(err) = store_node_addrs(&nodes_data_path, &addrs).await { - warn!("failed to store known node addresses: {:?}", err); - } - } - } - } - - Ok(()) - } - .instrument(info_span!("known-addrs")), - ); - } - - loop { - tokio::select! { - biased; - _ = self.cancel_token.cancelled() => { - break; - }, - // handle rpc requests. This will do nothing if rpc is not configured, since - // accept is just a pending future. - request = external_rpc.accept() => { - match request { - Ok(accepting) => { - rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, router.clone()); - } - Err(e) => { - info!("rpc request error: {:?}", e); - } - } - }, - // handle internal rpc requests. - request = internal_rpc.accept() => { - match request { - Ok(accepting) => { - rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, router.clone()); - } - Err(e) => { - info!("internal rpc request error: {:?}", e); - } - } - }, - // handle task terminations and quit on panics. - res = join_set.join_next(), if !join_set.is_empty() => { - match res { - Some(Err(outer)) => { - if outer.is_panic() { - error!("Task panicked: {outer:?}"); - break; - } else if outer.is_cancelled() { - debug!("Task cancelled: {outer:?}"); - } else { - error!("Task failed: {outer:?}"); - break; - } - } - Some(Ok(Err(inner))) => { - debug!("Task errored: {inner:?}"); - } - _ => {} - } - }, - } - } - - if let Err(err) = router.shutdown().await { - tracing::warn!("Error when shutting down router: {:?}", err); - }; - - // Abort remaining tasks. - join_set.shutdown().await; - tracing::info!("Shutting down remaining tasks"); - - // Abort remaining local tasks. - tracing::info!("Shutting down local pool"); - } -} - -fn node_addresses_for_storage(ep: &Endpoint) -> Vec { - ep.remote_info_iter() - .filter_map(node_address_for_storage) - .collect() -} -/// Get the addressing information of this endpoint that should be stored. -/// -/// If the endpoint was not used at all in this session, all known addresses will be returned. -/// If the endpoint was used, only the paths that were in use will be returned. -/// -/// Returns `None` if the resulting [`NodeAddr`] would be empty. -fn node_address_for_storage(info: RemoteInfo) -> Option { - let direct_addresses = if info.last_used.is_none() { - info.addrs - .into_iter() - .map(|info| info.addr) - .collect::>() - } else { - info.addrs - .iter() - .filter_map(|info| { - if info.last_alive.is_some() { - Some(info.addr) - } else { - None - } - }) - .collect::>() - }; - if direct_addresses.is_empty() && info.relay_url.is_none() { - None - } else { - Some(NodeAddr { - node_id: info.node_id, - info: AddrInfo { - relay_url: info.relay_url.map(|u| u.into()), - direct_addresses, - }, - }) - } -} diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs deleted file mode 100644 index a5b29cc795..0000000000 --- a/iroh/src/node/builder.rs +++ /dev/null @@ -1,715 +0,0 @@ -use std::{ - net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, - path::{Path, PathBuf}, - sync::Arc, - time::Duration, -}; - -use anyhow::{Context, Result}; -use futures_lite::StreamExt; -use futures_util::{FutureExt as _, TryFutureExt as _}; -use iroh_base::key::SecretKey; -#[cfg(not(test))] -use iroh_net::discovery::local_swarm_discovery::LocalSwarmDiscovery; -use iroh_net::{ - discovery::{dns::DnsDiscovery, pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery}, - dns::DnsResolver, - endpoint::{force_staging_infra, TransportConfig}, - Endpoint, RelayMode, -}; -use iroh_router::{ProtocolHandler, RouterBuilder}; -use quic_rpc::transport::{boxed::BoxableListener, quinn::QuinnListener}; -use tokio::task::JoinError; -use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; -use tracing::{error_span, trace, Instrument}; - -use super::{rpc_status::RpcStatus, IrohServerEndpoint, JoinErrToStr, Node, NodeInner}; -use crate::{ - client::RPC_ALPN, - node::nodes_storage::load_node_addrs, - util::{fs::load_secret_key, path::IrohPaths}, -}; - -/// Default bind address for the node. -/// 11204 is "iroh" in leetspeak -pub const DEFAULT_BIND_PORT: u16 = 11204; - -/// How long we wait at most for some endpoints to be discovered. -const ENDPOINT_WAIT: Duration = Duration::from_secs(5); - -/// The default bind address for the iroh IPv4 socket. -pub const DEFAULT_BIND_ADDR_V4: SocketAddrV4 = - SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_BIND_PORT); - -/// The default bind address for the iroh IPv6 socket. -pub const DEFAULT_BIND_ADDR_V6: SocketAddrV6 = - SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, DEFAULT_BIND_PORT + 1, 0, 0); - -/// Builder for the [`Node`]. -/// -/// The default **relay servers** are hosted by [number 0] on the `iroh.network` domain. To -/// customise this use the [`Builder::relay_mode`] function. -/// -/// For **node discovery** the default is to use the [number 0] hosted DNS server hosted on -/// `iroh.link`. To customise this use the [`Builder::node_discovery`] function. -/// -/// Note that some defaults change when running using `cfg(test)`, see the individual -/// methods for details. -/// -/// Finally you can create and run the node by calling [`Builder::spawn`]. -/// -/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated -/// using [`Node::shutdown`]. -/// -/// [number 0]: https://n0.computer -#[derive(derive_more::Debug)] -pub struct Builder { - storage: StorageConfig, - addr_v4: SocketAddrV4, - addr_v6: SocketAddrV6, - secret_key: SecretKey, - rpc_endpoint: IrohServerEndpoint, - rpc_addr: Option, - keylog: bool, - relay_mode: RelayMode, - dns_resolver: Option, - node_discovery: DiscoveryConfig, - #[cfg(any(test, feature = "test-utils"))] - insecure_skip_relay_cert_verify: bool, - transport_config: Option, -} - -/// Configuration for storage. -#[derive(Debug)] -pub enum StorageConfig { - /// In memory - Mem, - /// On disk persistet, at this location. - Persistent(PathBuf), -} - -/// Configuration for node discovery. -/// -/// Node discovery enables connecting to other peers by only the [`NodeId`]. This usually -/// works by the nodes publishing their [`RelayUrl`] and/or their direct addresses to some -/// publicly available service. -/// -/// [`NodeId`]: crate::base::key::NodeId -/// [`RelayUrl`]: crate::base::node_addr::RelayUrl -#[derive(Debug, Default)] -pub enum DiscoveryConfig { - /// Use no node discovery mechanism. - None, - /// Use the default discovery mechanism. - /// - /// This uses two discovery services concurrently: - /// - /// - It publishes to a pkarr service operated by [number 0] which makes the information - /// available via DNS in the `iroh.link` domain. - /// - /// - It uses an mDNS-like system to announce itself on the local network. - /// - /// # Usage during tests - /// - /// Note that the default changes when compiling with `cfg(test)` or the `test-utils` - /// cargo feature from [iroh-net] is enabled. In this case only the Pkarr/DNS service - /// is used, but on the `iroh.test` domain. This domain is not integrated with the - /// global DNS network and thus node discovery is effectively disabled. To use node - /// discovery in a test use the `iroh_net::test_utils::DnsPkarrServer` in the test and - /// configure it here as a custom discovery mechanism ([`DiscoveryConfig::Custom`]). - /// - /// [number 0]: https://n0.computer - /// [iroh-net]: crate::net - #[default] - Default, - /// Use a custom discovery mechanism. - Custom(Box), -} - -impl From> for DiscoveryConfig { - fn from(value: Box) -> Self { - Self::Custom(value) - } -} - -/// A server endpoint that does nothing. Accept will never resolve. -/// -/// This is used unless an external rpc endpoint is configured. -#[derive(Debug, Default)] -struct DummyServerEndpoint; - -impl BoxableListener - for DummyServerEndpoint -{ - fn clone_box( - &self, - ) -> Box> { - Box::new(DummyServerEndpoint) - } - - fn accept_bi_boxed( - &self, - ) -> quic_rpc::transport::boxed::AcceptFuture< - crate::rpc_protocol::Request, - crate::rpc_protocol::Response, - > { - quic_rpc::transport::boxed::AcceptFuture::boxed(futures_lite::future::pending()) - } - - fn local_addr(&self) -> &[quic_rpc::transport::LocalAddr] { - &[] - } -} - -fn mk_external_rpc() -> IrohServerEndpoint { - quic_rpc::transport::boxed::BoxedListener::new(DummyServerEndpoint) -} - -impl Builder { - /// Creates a default node builder with in memory configuration. - pub fn memory() -> Self { - // Use staging in testing - let relay_mode = match force_staging_infra() { - true => RelayMode::Staging, - false => RelayMode::Default, - }; - - Self { - storage: StorageConfig::Mem, - addr_v4: DEFAULT_BIND_ADDR_V4, - addr_v6: DEFAULT_BIND_ADDR_V6, - secret_key: SecretKey::generate(), - keylog: false, - relay_mode, - dns_resolver: None, - rpc_endpoint: mk_external_rpc(), - rpc_addr: None, - node_discovery: Default::default(), - #[cfg(any(test, feature = "test-utils"))] - insecure_skip_relay_cert_verify: false, - transport_config: None, - } - } - - /// Creates a new builder for [`Node`] using the given databases. - pub fn with_db_and_store(storage: StorageConfig) -> Self { - // Use staging in testing - let relay_mode = match force_staging_infra() { - true => RelayMode::Staging, - false => RelayMode::Default, - }; - - Self { - storage, - addr_v4: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_BIND_PORT), - addr_v6: SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, DEFAULT_BIND_PORT + 1, 0, 0), - secret_key: SecretKey::generate(), - keylog: false, - relay_mode, - dns_resolver: None, - rpc_endpoint: mk_external_rpc(), - rpc_addr: None, - node_discovery: Default::default(), - #[cfg(any(test, feature = "test-utils"))] - insecure_skip_relay_cert_verify: false, - transport_config: None, - } - } -} - -impl Builder { - /// Persist all node data in the provided directory. - pub async fn persist(self, root: impl AsRef) -> Result { - let root = root.as_ref(); - let secret_key_path = IrohPaths::SecretKey.with_root(root); - let secret_key = load_secret_key(secret_key_path).await?; - - Ok(Builder { - storage: StorageConfig::Persistent(root.into()), - addr_v4: self.addr_v4, - addr_v6: self.addr_v6, - secret_key, - keylog: self.keylog, - rpc_endpoint: self.rpc_endpoint, - rpc_addr: self.rpc_addr, - relay_mode: self.relay_mode, - dns_resolver: self.dns_resolver, - node_discovery: self.node_discovery, - #[cfg(any(test, feature = "test-utils"))] - insecure_skip_relay_cert_verify: false, - transport_config: self.transport_config, - }) - } - - /// Configure rpc endpoint. - pub fn rpc_endpoint(self, value: IrohServerEndpoint, rpc_addr: Option) -> Self { - Self { - rpc_endpoint: value, - rpc_addr, - ..self - } - } - - /// Configure the default iroh rpc endpoint, on the default address. - pub async fn enable_rpc(self) -> Result { - self.enable_rpc_with_addr(DEFAULT_RPC_ADDR).await - } - - /// Configure the default iroh rpc endpoint. - pub async fn enable_rpc_with_addr(self, mut rpc_addr: SocketAddr) -> Result { - let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, rpc_addr)?; - rpc_addr.set_port(actual_rpc_port); - - let ep = quic_rpc::transport::boxed::BoxedListener::new(ep); - if let StorageConfig::Persistent(ref root) = self.storage { - // store rpc endpoint - RpcStatus::store(root, actual_rpc_port).await?; - } - - Ok(Self { - rpc_endpoint: ep, - rpc_addr: Some(rpc_addr), - ..self - }) - } - - /// Sets the relay servers to assist in establishing connectivity. - /// - /// Relay servers are used to discover other nodes by `PublicKey` and also help - /// establish connections between peers by being an initial relay for traffic while - /// assisting in holepunching to establish a direct connection between peers. - /// - /// When using [`RelayMode::Custom`], the provided `relay_map` must contain at least one - /// configured relay node. If an invalid [`iroh_net::RelayMode`] is provided - /// [`Self::spawn`] will result in an error. - /// - /// # Usage during tests - /// - /// Note that while the default is [`RelayMode::Default`], when using `cfg(test)` or - /// when the `test-utils` cargo feature [`RelayMode::Staging`] is the default. - pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self { - self.relay_mode = relay_mode; - self - } - - /// Sets the node discovery mechanism. - /// - /// Node discovery enables connecting to other peers by only the [`NodeId`]. This - /// usually works by the nodes publishing their [`RelayUrl`] and/or their direct - /// addresses to some publicly available service. - /// - /// See [`DiscoveryConfig::default`] for the defaults, note that the defaults change - /// when using `cfg(test)`. - /// - /// [`NodeId`]: crate::base::key::NodeId - /// [`RelayUrl`]: crate::base::node_addr::RelayUrl - pub fn node_discovery(mut self, config: DiscoveryConfig) -> Self { - self.node_discovery = config; - self - } - - /// Optionally set a custom DNS resolver to use for the magic endpoint. - /// - /// The DNS resolver is used to resolve relay hostnames, and node addresses if - /// [`DnsDiscovery`] is configured (which is the default). - /// - /// By default, all magic endpoints share a DNS resolver, which is configured to use the - /// host system's DNS configuration. You can pass a custom instance of [`DnsResolver`] - /// here to use a differently configured DNS resolver for this endpoint. - pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self { - self.dns_resolver = Some(dns_resolver); - self - } - - /// Binds the node service to a specific socket IPv4 address. - /// - /// By default this will be set to `0.0.0.0:11204` - /// - /// Setting the port to `0` will assign a random port. - /// If the port used is not free, a random different port will be used. - pub fn bind_addr_v4(mut self, addr: SocketAddrV4) -> Self { - self.addr_v4 = addr; - self - } - - /// Binds the node service to a specific socket IPv6 address. - /// - /// By default this will be set to `[::]:11205` - /// - /// Setting the port to `0` will assign a random port. - /// If the port used is not free, a random different port will be used. - pub fn bind_addr_v6(mut self, addr: SocketAddrV6) -> Self { - self.addr_v6 = addr; - self - } - - /// Use a random port for both IPv4 and IPv6. - /// - /// This is a convenience function useful when you do not need a specific port - /// and want to avoid conflicts when running multiple instances, e.g. in tests. - /// - /// This overrides the ports of the socket addresses provided by [`Builder::bind_addr_v4`] - /// and [`Builder::bind_addr_v6`]. By default both of those bind to the - /// unspecified address, which would result in `0.0.0.0:11204` and `[::]:11205` as bind - /// addresses unless they are changed. - pub fn bind_random_port(mut self) -> Self { - self.addr_v4.set_port(0); - self.addr_v6.set_port(0); - self - } - - /// Uses the given [`SecretKey`] for the `PublicKey` instead of a newly generated one. - pub fn secret_key(mut self, secret_key: SecretKey) -> Self { - self.secret_key = secret_key; - self - } - - /// Sets a custom [`TransportConfig`] to be used by the [`Endpoint`]. - /// - /// If not set, the [`Endpoint`] will use its default [`TransportConfig`]. See - /// [`crate::net::endpoint::Builder::transport_config`] for details. - pub fn transport_config(mut self, config: TransportConfig) -> Self { - self.transport_config = Some(config); - self - } - - /// Skip verification of SSL certificates from relay servers - /// - /// May only be used in tests. - #[cfg(any(test, feature = "test-utils"))] - #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))] - pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self { - self.insecure_skip_relay_cert_verify = skip_verify; - self - } - - /// Whether to log the SSL pre-master key. - /// - /// If `true` and the `SSLKEYLOGFILE` environment variable is the path to a file this - /// file will be used to log the SSL pre-master key. This is useful to inspect captured - /// traffic. - pub fn keylog(mut self, keylog: bool) -> Self { - self.keylog = keylog; - self - } - - /// Spawns the [`Node`] in a tokio task. - /// - /// This will create the underlying network server and spawn a tokio task accepting - /// connections. The returned [`Node`] can be used to control the task as well as - /// get information about it. - pub async fn spawn(self) -> Result { - let unspawned_node = self.build().await?; - unspawned_node.spawn().await - } - - /// Builds a node without spawning it. - /// - /// Returns a [`ProtocolBuilder`], on which custom protocols can be registered with - /// [`ProtocolBuilder::accept`]. To spawn the node, call [`ProtocolBuilder::spawn`]. - pub async fn build(self) -> Result { - trace!("building node"); - let (endpoint, nodes_data_path) = { - let discovery: Option> = match self.node_discovery { - DiscoveryConfig::None => None, - DiscoveryConfig::Custom(discovery) => Some(discovery), - DiscoveryConfig::Default => { - #[cfg(not(test))] - let discovery = { - let mut discovery_services: Vec> = vec![ - // Enable DNS discovery by default - Box::new(DnsDiscovery::n0_dns()), - // Enable pkarr publishing by default - Box::new(PkarrPublisher::n0_dns(self.secret_key.clone())), - ]; - // Enable local swarm discovery by default, but fail silently if it errors - match LocalSwarmDiscovery::new(self.secret_key.public()) { - Err(e) => { - tracing::error!("unable to start LocalSwarmDiscoveryService: {e:?}") - } - Ok(service) => { - discovery_services.push(Box::new(service)); - } - } - ConcurrentDiscovery::from_services(discovery_services) - }; - #[cfg(test)] - let discovery = ConcurrentDiscovery::from_services(vec![ - // Enable DNS discovery by default - Box::new(DnsDiscovery::n0_dns()), - // Enable pkarr publishing by default - Box::new(PkarrPublisher::n0_dns(self.secret_key.clone())), - ]); - Some(Box::new(discovery)) - } - }; - - let endpoint = Endpoint::builder() - .secret_key(self.secret_key.clone()) - .proxy_from_env() - .keylog(self.keylog) - .relay_mode(self.relay_mode); - - let endpoint = match self.transport_config { - Some(config) => endpoint.transport_config(config), - None => endpoint, - }; - let endpoint = match discovery { - Some(discovery) => endpoint.discovery(discovery), - None => endpoint, - }; - let mut endpoint = match self.dns_resolver { - Some(resolver) => endpoint.dns_resolver(resolver), - None => endpoint, - }; - - #[cfg(any(test, feature = "test-utils"))] - { - endpoint = - endpoint.insecure_skip_relay_cert_verify(self.insecure_skip_relay_cert_verify); - } - - let nodes_data_path = match self.storage { - StorageConfig::Persistent(ref root) => { - let nodes_data_path = IrohPaths::PeerData.with_root(root); - let node_addrs = load_node_addrs(&nodes_data_path) - .await - .context("loading known node addresses")?; - endpoint = endpoint.known_nodes(node_addrs); - Some(nodes_data_path) - } - StorageConfig::Mem => None, - }; - - ( - endpoint - .bind_addr_v4(self.addr_v4) - .bind_addr_v6(self.addr_v6) - .bind() - .await?, - nodes_data_path, - ) - }; - trace!("created endpoint"); - - let addr = endpoint.node_addr().await?; - trace!("endpoint address: {addr:?}"); - - // Initialize the internal RPC connection. - let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); - let internal_rpc = quic_rpc::transport::boxed::BoxedListener::new(internal_rpc); - // box the controller. Boxing has a special case for the flume channel that avoids allocations, - // so this has zero overhead. - let controller = quic_rpc::transport::boxed::BoxedConnector::new(controller); - let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); - - let inner = Arc::new(NodeInner { - rpc_addr: self.rpc_addr, - endpoint: endpoint.clone(), - client, - cancel_token: CancellationToken::new(), - }); - - let protocol_builder = ProtocolBuilder { - inner, - router: RouterBuilder::new(endpoint), - internal_rpc, - external_rpc: self.rpc_endpoint, - nodes_data_path, - }; - - Ok(protocol_builder) - } -} - -/// A node that is initialized but not yet spawned. -/// -/// This is returned from [`Builder::build`] and may be used to register custom protocols with -/// [`Self::accept`]. It provides access to the services which are already started, the node's -/// endpoint and a client to the node. -/// -/// Note that RPC calls performed with client returned from [`Self::client`] will not complete -/// until the node is spawned. -#[derive(derive_more::Debug)] -pub struct ProtocolBuilder { - inner: Arc, - internal_rpc: IrohServerEndpoint, - external_rpc: IrohServerEndpoint, - router: RouterBuilder, - nodes_data_path: Option, -} - -impl ProtocolBuilder { - /// Registers a protocol handler for incoming connections. - /// - /// Use this to register custom protocols onto the iroh node. Whenever a new connection for - /// `alpn` comes in, it is passed to this protocol handler. - /// - /// See the [`ProtocolHandler`] trait for details. - /// - /// Example usage: - /// - /// ```rust - /// # use std::sync::Arc; - /// # use anyhow::Result; - /// # use futures_lite::future::Boxed as BoxedFuture; - /// # use iroh::{node::{Node}, net::endpoint::Connecting, client::Iroh, router::ProtocolHandler}; - /// # - /// # #[tokio::main] - /// # async fn main() -> Result<()> { - /// - /// const MY_ALPN: &[u8] = b"my-protocol/1"; - /// - /// #[derive(Debug)] - /// struct MyProtocol { - /// client: Iroh - /// } - /// - /// impl ProtocolHandler for MyProtocol { - /// fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { - /// todo!(); - /// } - /// } - /// - /// let unspawned_node = Node::memory() - /// .build() - /// .await?; - /// - /// let client = unspawned_node.client().clone(); - /// let handler = MyProtocol { client }; - /// - /// let node = unspawned_node - /// .accept(MY_ALPN, Arc::new(handler)) - /// .spawn() - /// .await?; - /// # node.shutdown().await?; - /// # Ok(()) - /// # } - /// ``` - pub fn accept(mut self, alpn: impl AsRef<[u8]>, handler: Arc) -> Self { - self.router = self.router.accept(alpn, handler); - self - } - - /// Returns a client to control this node over an in-memory channel. - /// - /// Note that RPC calls performed with the client will not complete until the node is - /// spawned. - pub fn client(&self) -> &crate::client::Iroh { - &self.inner.client - } - - /// Returns the [`Endpoint`] of the node. - pub fn endpoint(&self) -> &Endpoint { - &self.inner.endpoint - } - - /// Returns a protocol handler for an ALPN. - /// - /// This downcasts to the concrete type and returns `None` if the handler registered for `alpn` - /// does not match the passed type. - pub fn get_protocol(&self, alpn: &[u8]) -> Option> { - self.router.get_protocol::

(alpn) - } - - /// Spawns the node and starts accepting connections. - pub async fn spawn(self) -> Result { - let Self { - inner, - internal_rpc, - external_rpc, - router, - nodes_data_path, - } = self; - let node_id = inner.endpoint.node_id(); - - let router = router.spawn().await?; - - // Spawn the main task and store it in the node for structured termination in shutdown. - let fut = inner - .clone() - .run(external_rpc, internal_rpc, router.clone(), nodes_data_path) - .instrument(error_span!("node", me=%node_id.fmt_short())); - let task = tokio::task::spawn(fut); - - let node = Node { - inner, - router, - task: AbortOnDropHandle::new(task) - .map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr) - .shared(), - }; - - // Wait for a single direct address update, to make sure - // we found at least one direct address. - let wait_for_endpoints = { - let node = node.clone(); - async move { - tokio::time::timeout(ENDPOINT_WAIT, node.endpoint().direct_addresses().next()) - .await - .context("waiting for endpoint")? - .context("no endpoints")?; - Ok(()) - } - }; - - if let Err(err) = wait_for_endpoints.await { - node.shutdown().await.ok(); - return Err(err); - } - - Ok(node) - } -} - -const DEFAULT_RPC_PORT: u16 = 0x1337; -const MAX_RPC_STREAMS: u32 = 1024; - -/// The default bind addr of the RPC . -pub const DEFAULT_RPC_ADDR: SocketAddr = - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_RPC_PORT)); - -/// Makes a an RPC endpoint that uses a QUIC transport. -/// -/// Note that this uses the Quinn version used by quic-rpc. -fn make_rpc_endpoint( - secret_key: &SecretKey, - mut rpc_addr: SocketAddr, -) -> Result<( - QuinnListener, - u16, -)> { - let mut transport_config = quinn::TransportConfig::default(); - transport_config - .max_concurrent_bidi_streams(MAX_RPC_STREAMS.into()) - .max_concurrent_uni_streams(0u32.into()); - let server_config = iroh_net::endpoint::make_server_config( - secret_key, - vec![RPC_ALPN.to_vec()], - Arc::new(transport_config), - false, - )?; - - let rpc_quinn_endpoint = quinn::Endpoint::server(server_config.clone(), rpc_addr); - let rpc_quinn_endpoint = match rpc_quinn_endpoint { - Ok(ep) => ep, - Err(err) => { - if err.kind() == std::io::ErrorKind::AddrInUse { - tracing::warn!( - "RPC port: {} already in use, switching to random port", - rpc_addr, - ); - // Use a random port - rpc_addr.set_port(0); - quinn::Endpoint::server(server_config, rpc_addr)? - } else { - return Err(err.into()); - } - } - }; - - let actual_rpc_port = rpc_quinn_endpoint.local_addr()?.port(); - let rpc_endpoint = QuinnListener::new(rpc_quinn_endpoint)?; - - Ok((rpc_endpoint, actual_rpc_port)) -} diff --git a/iroh/src/node/nodes_storage.rs b/iroh/src/node/nodes_storage.rs deleted file mode 100644 index af35617caf..0000000000 --- a/iroh/src/node/nodes_storage.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::path::Path; - -use anyhow::{ensure, Context, Result}; -use iroh_net::NodeAddr; -use tokio::io::AsyncWriteExt; - -pub(super) async fn load_node_addrs>(path: P) -> Result> { - let path = path.as_ref(); - let mut out = Vec::new(); - - if tokio::fs::try_exists(&path).await.unwrap_or(false) { - ensure!(path.is_file(), "{} is not a file", path.display()); - let contents = tokio::fs::read(path).await?; - let mut slice: &[u8] = &contents; - while !slice.is_empty() { - let (node_addr, next_contents) = - postcard::take_from_bytes(slice).context("failed to load node data")?; - out.push(node_addr); - slice = next_contents; - } - } - Ok(out) -} - -pub(super) async fn store_node_addrs>( - path: P, - known_nodes: &[NodeAddr], -) -> Result { - let path = path.as_ref(); - - if tokio::fs::try_exists(&path).await.unwrap_or(false) { - ensure!(path.is_file(), "{} must be a file", path.display()); - } - - // persist only the nodes which were - // * not used at all (so we don't forget everything we loaded) - // * were attempted to be used, and have at least one usable path - if known_nodes.is_empty() { - // prevent file handling if unnecessary - return Ok(0); - } - - let mut ext = path.extension().map(|s| s.to_owned()).unwrap_or_default(); - ext.push(".tmp"); - let tmp_path = path.with_extension(ext); - - if tokio::fs::try_exists(&tmp_path).await.unwrap_or(false) { - tokio::fs::remove_file(&tmp_path) - .await - .context("failed deleting existing tmp file")?; - } - if let Some(parent) = tmp_path.parent() { - tokio::fs::create_dir_all(parent).await?; - } - let mut tmp = tokio::fs::File::create(&tmp_path) - .await - .context("failed creating tmp file")?; - - let mut count = 0; - for node_addr in known_nodes { - let ser = postcard::to_stdvec(&node_addr).context("failed to serialize node data")?; - tmp.write_all(&ser) - .await - .context("failed to persist node data")?; - count += 1; - } - tmp.flush().await.context("failed to flush node data")?; - drop(tmp); - - // move the file - tokio::fs::rename(tmp_path, path) - .await - .context("failed renaming node data file")?; - Ok(count) -} - -#[cfg(test)] -mod tests { - use std::net::{Ipv4Addr, SocketAddr}; - - use iroh_net::{key::SecretKey, relay::RelayUrl}; - - use super::*; - - /// Test persisting and loading of known nodes. - #[tokio::test] - async fn load_save_node_data() { - let _guard = iroh_test::logging::setup(); - - let mut node_map = Vec::new(); - - let node_a = SecretKey::generate().public(); - let node_b = SecretKey::generate().public(); - let node_c = SecretKey::generate().public(); - let node_d = SecretKey::generate().public(); - - let relay_x: RelayUrl = "https://my-relay-1.com".parse().unwrap(); - let relay_y: RelayUrl = "https://my-relay-2.com".parse().unwrap(); - - let direct_addresses_a = [addr(4000), addr(4001)]; - let direct_addresses_c = [addr(5000)]; - - let node_addr_a = NodeAddr::new(node_a) - .with_relay_url(relay_x) - .with_direct_addresses(direct_addresses_a); - let node_addr_b = NodeAddr::new(node_b).with_relay_url(relay_y); - let node_addr_c = NodeAddr::new(node_c).with_direct_addresses(direct_addresses_c); - let node_addr_d = NodeAddr::new(node_d); - - node_map.push(node_addr_a); - node_map.push(node_addr_b); - node_map.push(node_addr_c); - node_map.push(node_addr_d); - - let root = testdir::testdir!(); - let path = root.join("nodes.postcard"); - store_node_addrs(&path, &node_map).await.unwrap(); - let loaded = load_node_addrs(&path).await.unwrap(); - assert_eq!(node_map, loaded); - } - - fn addr(port: u16) -> SocketAddr { - (std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), port).into() - } -} diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs deleted file mode 100644 index 959bc177be..0000000000 --- a/iroh/src/node/rpc.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::{collections::BTreeMap, fmt::Debug, sync::Arc}; - -use anyhow::Result; -use iroh_node_util::rpc::proto::node::CounterStats; -use iroh_router::Router; -use quic_rpc::server::{RpcChannel, RpcServerError}; -use tokio::task::JoinSet; -use tracing::{debug, warn}; - -use super::IrohServerEndpoint; -use crate::{ - node::NodeInner, - rpc_protocol::{Request, RpcService}, -}; - -#[derive(Debug, Clone)] -pub(crate) struct Handler { - pub(crate) inner: Arc, - pub(crate) _router: Router, -} - -impl Handler { - pub fn new(inner: Arc, router: Router) -> Self { - Self { - inner, - _router: router, - } - } -} - -impl iroh_node_util::rpc::server::AbstractNode for NodeInner { - fn endpoint(&self) -> &iroh_net::Endpoint { - &self.endpoint - } - - fn shutdown(&self) { - self.cancel_token.cancel(); - } - - fn rpc_addr(&self) -> Option { - self.rpc_addr - } - - fn stats(&self) -> anyhow::Result> { - #[cfg(feature = "metrics")] - return crate::metrics::get_metrics(); - - #[cfg(not(feature = "metrics"))] - anyhow::bail!("metrics are disabled") - } -} - -impl Handler { - pub(crate) fn spawn_rpc_request( - inner: Arc, - join_set: &mut JoinSet>, - accepting: quic_rpc::server::Accepting, - router: Router, - ) { - let handler = Self::new(inner, router); - join_set.spawn(async move { - let (msg, chan) = accepting.read_first().await?; - if let Err(err) = handler.handle_rpc_request(msg, chan).await { - warn!("rpc request handler error: {err:?}"); - } - Ok(()) - }); - } - - async fn handle_node_request( - self, - msg: iroh_node_util::rpc::proto::Request, - chan: RpcChannel, - ) -> Result<(), RpcServerError> { - debug!("handling node request: {msg:?}"); - iroh_node_util::rpc::server::handle_rpc_request(self.inner, msg, chan.map().boxed()) - .await - .map_err(|e| e.errors_into()) - } - - pub(crate) async fn handle_rpc_request( - self, - msg: Request, - chan: RpcChannel, - ) -> Result<(), RpcServerError> { - use Request::*; - debug!("handling rpc request: {msg}"); - match msg { - Node(msg) => self.handle_node_request(msg, chan).await, - } - } -} diff --git a/iroh/src/node/rpc_status.rs b/iroh/src/node/rpc_status.rs deleted file mode 100644 index 272c2b2f33..0000000000 --- a/iroh/src/node/rpc_status.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::{ - net::{Ipv4Addr, SocketAddr}, - path::Path, -}; - -use anyhow::{ensure, Context, Result}; -use tokio::{fs, io::AsyncReadExt}; -use tracing::trace; - -use crate::util::path::IrohPaths; - -/// The current status of the RPC endpoint. -#[derive(Debug, Clone)] -pub enum RpcStatus { - /// Stopped. - Stopped, - /// Running on this port. - Running { - /// The port we are connected on. - port: u16, - /// Actual connected RPC client. - client: crate::client::RpcClient, - }, -} - -impl RpcStatus { - /// Load the current RPC status from the given location. - pub async fn load(root: impl AsRef) -> Result { - let p = IrohPaths::RpcLock.with_root(root); - trace!("loading RPC lock: {}", p.display()); - - if p.exists() { - // Lock file exists, read the port and check if we can get a connection. - let mut file = fs::File::open(&p).await.context("open rpc lock file")?; - let file_len = file - .metadata() - .await - .context("reading rpc lock file metadata")? - .len(); - if file_len == 2 { - let mut buffer = [0u8; 2]; - file.read_exact(&mut buffer) - .await - .context("read rpc lock file")?; - let running_rpc_port = u16::from_le_bytes(buffer); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), running_rpc_port); - if let Ok(client) = crate::client::quic_connect_raw(addr).await { - return Ok(RpcStatus::Running { - port: running_rpc_port, - client, - }); - } - } - - // invalid or outdated rpc lock file, delete - drop(file); - fs::remove_file(&p) - .await - .context("deleting rpc lock file")?; - Ok(RpcStatus::Stopped) - } else { - // No lock file, stopped - Ok(RpcStatus::Stopped) - } - } - - /// Store the current rpc status. - pub async fn store(root: impl AsRef, rpc_port: u16) -> Result<()> { - let p = IrohPaths::RpcLock.with_root(root); - trace!("storing RPC lock: {}", p.display()); - - ensure!(!p.exists(), "iroh is already running"); - if let Some(parent) = p.parent() { - fs::create_dir_all(parent) - .await - .context("creating parent dir")?; - } - fs::write(&p, &rpc_port.to_le_bytes()) - .await - .context("writing rpc lock file")?; - Ok(()) - } - - /// Cleans up an existing rpc lock - pub async fn clear(root: impl AsRef) -> Result<()> { - let p = IrohPaths::RpcLock.with_root(root); - trace!("clearing RPC lock: {}", p.display()); - - // ignore errors - tokio::fs::remove_file(&p).await.ok(); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_rpc_lock_file() { - let dir = testdir::testdir!(); - - let rpc_port = 7778; - RpcStatus::store(&dir, rpc_port).await.unwrap(); - let status = RpcStatus::load(&dir).await.unwrap(); - assert!(matches!(status, RpcStatus::Stopped)); - let p = IrohPaths::RpcLock.with_root(&dir); - let exists = fs::try_exists(&p).await.unwrap(); - assert!(!exists, "should be deleted as not running"); - } -} diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs deleted file mode 100644 index b5958348ea..0000000000 --- a/iroh/src/rpc_protocol.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! This defines the RPC protocol used for communication between a CLI and an iroh node. -//! -//! RPC is done using the [`quic-rpc`](https://docs.rs/quic-rpc) crate. -//! -//! The RPC protocol is split into subsystems. In each subsystem, there is an -//! enum for the requests and an enum for the responses. The top level request -//! and response enums have a variant for each subsystem. -//! -//! Request and response enums for each subsystem derive conversions to the -//! top level enums using the -//! [`enum_conversions``](https://docs.rs/nested_enum_utils/0.1.0/nested_enum_utils/attr.enum_conversions.html) -//! macro. -//! -//! For each rpc request, the quic-rpc interaction pattern is defined using -//! attributes provided by the -//! [`rpc_requests`](https://docs.rs/quic-rpc-derive/latest/quic_rpc_derive/attr.rpc_requests.html) -//! macro. -use serde::{Deserialize, Serialize}; - -/// The RPC service for the iroh provider process. -#[derive(Debug, Clone)] -pub struct RpcService; - -/// The request enum, listing all possible requests. -#[allow(missing_docs, clippy::large_enum_variant)] -#[derive(strum::Display, Debug, Serialize, Deserialize)] -#[nested_enum_utils::enum_conversions()] -pub enum Request { - Node(iroh_node_util::rpc::proto::Request), -} - -/// The response enum, listing all possible responses. -#[allow(missing_docs, clippy::large_enum_variant)] -#[derive(Debug, Serialize, Deserialize)] -#[nested_enum_utils::enum_conversions()] -pub enum Response { - Node(iroh_node_util::rpc::proto::Response), -} - -impl quic_rpc::Service for RpcService { - type Req = Request; - type Res = Response; -} diff --git a/iroh/src/util.rs b/iroh/src/util.rs deleted file mode 100644 index 341cf83dfe..0000000000 --- a/iroh/src/util.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Utilities for working with iroh. - -pub mod fs; -pub mod path; diff --git a/iroh/src/util/path.rs b/iroh/src/util/path.rs deleted file mode 100644 index bfeffe913a..0000000000 --- a/iroh/src/util/path.rs +++ /dev/null @@ -1,54 +0,0 @@ -//! Configuration paths for iroh. - -use std::path::{Path, PathBuf}; - -/// Paths to files or directories used by Iroh. -#[derive(Debug, Clone, Copy, Eq, PartialEq, strum::AsRefStr, strum::EnumString, strum::Display)] -#[cfg_attr(test, derive(strum::EnumIter))] -pub enum IrohPaths { - /// Path to the node's secret key for the [`iroh_net::key::PublicKey`]. - #[strum(serialize = "keypair")] - SecretKey, - #[strum(serialize = "peers.postcard")] - /// Path to store known peer data. - PeerData, - #[strum(serialize = "rpc.lock")] - /// Path to RPC lock file, containing the RPC port if running. - RpcLock, -} - -impl AsRef for IrohPaths { - fn as_ref(&self) -> &Path { - let s: &str = self.as_ref(); - Path::new(s) - } -} - -impl IrohPaths { - /// Get the path for this [`IrohPaths`] by joining the name to a root directory. - pub fn with_root(self, root: impl AsRef) -> PathBuf { - let path = root.as_ref().join(self); - path - } -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use strum::IntoEnumIterator; - - use super::*; - - #[test] - fn test_iroh_paths_parse_roundtrip() { - for iroh_path in IrohPaths::iter() { - println!("{iroh_path}"); - let root = PathBuf::from("/tmp"); - let path = root.join(iroh_path); - let fname = path.file_name().unwrap().to_str().unwrap(); - let parsed = IrohPaths::from_str(fname).unwrap(); - assert_eq!(iroh_path, parsed); - } - } -}