From e01c943919578fc4449f39e56928d295633e0892 Mon Sep 17 00:00:00 2001
From: Asmir Avdicevic
Date: Fri, 22 Nov 2024 12:34:30 +0100
Subject: [PATCH 1/9] feat: netsim uses examples

---
 .github/workflows/ci.yml             |   2 +-
 .github/workflows/netsim_runner.yaml |   3 +-
 iroh-net/bench/src/lib.rs            |   2 +-
 iroh/examples/new.rs                 | 292 +++++++++++++++++++++++++++
 4 files changed, 296 insertions(+), 3 deletions(-)
 create mode 100644 iroh/examples/new.rs

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c4df22395c..7d48e2b589 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -295,7 +295,7 @@ jobs:
       branch: ${{ github.ref }}
       max_workers: 4
       netsim_branch: "main"
-      sim_paths: "sims/iroh/iroh.json,sims/integration"
+      sim_paths: "sims/iroh/iroh_v2.json,sims/integration_v2"
       pr_number: ${{ github.event.pull_request.number || '' }}

   codespell:
diff --git a/.github/workflows/netsim_runner.yaml b/.github/workflows/netsim_runner.yaml
index ae7b20d08c..7157a3c2c1 100644
--- a/.github/workflows/netsim_runner.yaml
+++ b/.github/workflows/netsim_runner.yaml
@@ -133,7 +133,8 @@ jobs:
       - name: Copy binaries to right location
         run: |
           cp target/${{inputs.build_profile}}/examples/* ../chuck/netsim/bins/
-          cp target/${{inputs.build_profile}}/iroh ../chuck/netsim/bins/iroh
+          cp target/${{inputs.build_profile}}/examples/new ../chuck/netsim/bins/iroh-transfer
+          # cp target/${{inputs.build_profile}}/iroh ../chuck/netsim/bins/iroh
           cp target/${{inputs.build_profile}}/iroh-relay ../chuck/netsim/bins/iroh-relay
           cp ../chuck/target/release/chuck ../chuck/netsim/bins/chuck

diff --git a/iroh-net/bench/src/lib.rs b/iroh-net/bench/src/lib.rs
index a591581d26..93e0c91e51 100644
--- a/iroh-net/bench/src/lib.rs
+++ b/iroh-net/bench/src/lib.rs
@@ -21,7 +21,7 @@ pub mod s2n;
 pub mod stats;

 #[derive(Parser, Debug, Clone, Copy)]
-#[clap(name = "bulk")]
+#[clap(name = "iroh-net-bench")]
 pub enum Commands {
     Iroh(Opt),
     #[cfg(not(any(target_os = "freebsd", target_os = "openbsd", target_os = "netbsd")))]
diff --git a/iroh/examples/new.rs b/iroh/examples/new.rs
new file mode 100644
index 0000000000..8ed220a54e
--- /dev/null
+++ b/iroh/examples/new.rs
@@ -0,0 +1,292 @@
+use anyhow::{Context,Result};
+use bytes::Bytes;
+use clap::{Parser, Subcommand};
+use futures_lite::StreamExt;
+use iroh_net::{key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMode};
+use std::time::{Duration, Instant};
+use tracing::info;
+use std::str::FromStr;
+
+// Transfer ALPN that we are using to communicate over the `Endpoint`
+const TRANSFER_ALPN: &[u8] = b"n0/iroh/transfer/example/0";
+
+#[derive(Parser, Debug)]
+#[command(name = "provide_fetch")]
+struct Cli {
+    #[command(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand, Debug)]
+enum Commands {
+    Provide {
+        #[clap(long, default_value = "1G", value_parser = parse_byte_size)]
+        size: u64,
+    },
+    Fetch {
+        #[arg(long)]
+        ticket: String,
+    },
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    tracing_subscriber::fmt::init();
+    let cli = Cli::parse();
+
+    match &cli.command {
+        Commands::Provide { size } => provide(size.clone()).await?,
+        Commands::Fetch { ticket } => fetch(&ticket).await?,
+    }
+
+    Ok(())
+}
+
+async fn provide(size: u64) -> anyhow::Result<()> {
+    let secret_key = SecretKey::generate();
+    let endpoint = Endpoint::builder()
+        .secret_key(secret_key)
+        .alpns(vec![TRANSFER_ALPN.to_vec()])
+        .relay_mode(RelayMode::Default)
+        .bind()
+        .await?;
+
+    let node_id = endpoint.node_id();
+
+    for local_endpoint in endpoint
+        .direct_addresses()
+        .next()
+        .await
+        .context("no endpoints")?
+    {
+        println!("\t{}", local_endpoint.addr)
+    }
+
+
+    let relay_url = endpoint
+        .home_relay()
+        .expect("should be connected to a relay server");
+    let local_addrs = endpoint
+        .direct_addresses()
+        .next()
+        .await
+        .context("no endpoints")?
+        .into_iter()
+        .map(|endpoint| {
+            let addr = endpoint.addr;
+            addr
+        })
+        .collect::<Vec<_>>();
+
+    let node_addr = NodeAddr::from_parts(node_id, Some(relay_url), local_addrs);
+    let ticket = NodeTicket::new(node_addr);
+
+    println!("NodeTicket: {}", ticket);
+
+    // accept incoming connections, returns a normal QUIC connection
+    while let Some(incoming) = endpoint.accept().await {
+        let mut connecting = match incoming.accept() {
+            Ok(connecting) => connecting,
+            Err(err) => {
+                tracing::warn!("incoming connection failed: {err:#}");
+                // we can carry on in these cases:
+                // this can be caused by retransmitted datagrams
+                continue;
+            }
+        };
+        let alpn = connecting.alpn().await?;
+        let conn = connecting.await?;
+        let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?;
+        info!(
+            "new connection from {node_id} with ALPN {} (coming from {})",
+            String::from_utf8_lossy(&alpn),
+            conn.remote_address()
+        );
+
+        // spawn a task to handle reading and writing off of the connection
+        tokio::spawn(async move {
+            // accept a bi-directional QUIC connection
+            // use the `quinn` APIs to send and recv content
+            let (mut send, mut recv) = conn.accept_bi().await?;
+            tracing::debug!("accepted bi stream, waiting for data...");
+            let message = recv.read_to_end(100).await?;
+            let message = String::from_utf8(message)?;
+            println!("received: {message}");
+
+            send_data_on_stream(&mut send, size).await?;
+
+            // We sent the last message, so wait for the client to close the connection once
+            // it received this message.
+            let res = tokio::time::timeout(Duration::from_secs(3), async move {
+                let closed = conn.closed().await;
+                if !matches!(closed, quinn::ConnectionError::ApplicationClosed(_)) {
+                    println!("node {node_id} disconnected with an error: {closed:#}");
+                }
+            })
+            .await;
+            if res.is_err() {
+                println!("node {node_id} did not disconnect within 3 seconds");
+            }
+            Ok::<_, anyhow::Error>(())
+        });
+    }
+
+    // stop with SIGINT (ctrl-c)
+    Ok(())
+}
+
+async fn fetch(ticket: &str) -> anyhow::Result<()> {
+    let ticket: NodeTicket = ticket.parse()?;
+    let secret_key = SecretKey::generate();
+    let endpoint = Endpoint::builder()
+        .secret_key(secret_key)
+        .alpns(vec![TRANSFER_ALPN.to_vec()])
+        .relay_mode(RelayMode::Default)
+        .bind()
+        .await?;
+
+    let start = Instant::now();
+
+    let me = endpoint.node_id();
+    println!("node id: {me}");
+    println!("node listening addresses:");
+    for local_endpoint in endpoint
+        .direct_addresses()
+        .next()
+        .await
+        .context("no endpoints")?
+    {
+        println!("\t{}", local_endpoint.addr)
+    }
+
+    let relay_url = endpoint
+        .home_relay()
+        .expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
+    println!("node relay server url: {relay_url}\n");
+
+    // Attempt to connect, over the given ALPN.
+    // Returns a Quinn connection.
+    let conn = endpoint.connect(ticket.node_addr().clone(), TRANSFER_ALPN).await?;
+    info!("connected");
+
+    // Use the Quinn API to send and recv content.
+    let (mut send, mut recv) = conn.open_bi().await?;
+
+    let message = format!("{me} is saying 'hello!'");
+    send.write_all(message.as_bytes()).await?;
+
+    // Call `finish` to close the send side of the connection gracefully.
+    send.finish()?;
+
+    let (len, dur, chnk) = drain_stream(&mut recv, false).await?;
+
+    // We received the last message: close all connections and allow for the close
+    // message to be sent.
+    endpoint.close(0u8.into(), b"bye").await?;
+
+    let duration = start.elapsed();
+    println!("Received {} B in {:?}/{:?} in {} chunks", len, dur, duration, chnk);
+    println!("Transferred {} B in {} seconds, {} B/s", len, duration.as_secs_f64(), len as f64 / duration.as_secs_f64());
+
+    Ok(())
+}
+
+async fn drain_stream(
+    stream: &mut iroh_net::endpoint::RecvStream,
+    read_unordered: bool,
+) -> Result<(usize, Duration, u64)> {
+    let mut read = 0;
+
+    let download_start = Instant::now();
+    let mut first_byte = true;
+    let mut ttfb = download_start.elapsed();
+
+    let mut num_chunks: u64 = 0;
+
+    if read_unordered {
+        while let Some(chunk) = stream.read_chunk(usize::MAX, false).await? {
+            if first_byte {
+                ttfb = download_start.elapsed();
+                first_byte = false;
+            }
+            read += chunk.bytes.len();
+            num_chunks += 1;
+        }
+    } else {
+        // These are 32 buffers, for reading approximately 32kB at once
+        #[rustfmt::skip]
+        let mut bufs = [
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+            Bytes::new(), Bytes::new(), Bytes::new(), Bytes::new(),
+        ];
+
+        while let Some(n) = stream.read_chunks(&mut bufs[..]).await? {
+            if first_byte {
+                ttfb = download_start.elapsed();
+                first_byte = false;
+            }
+            read += bufs.iter().take(n).map(|buf| buf.len()).sum::<usize>();
+            num_chunks += 1;
+        }
+    }
+
+    Ok((read, ttfb, num_chunks))
+}
+
+async fn send_data_on_stream(stream: &mut iroh_net::endpoint::SendStream, stream_size: u64) -> Result<()> {
+    const DATA: &[u8] = &[0xAB; 7*1024 * 1024];
+    let bytes_data = Bytes::from_static(DATA);
+
+    let full_chunks = stream_size / (DATA.len() as u64);
+    let remaining = (stream_size % (DATA.len() as u64)) as usize;
+
+    for _ in 0..full_chunks {
+        stream
+            .write_chunk(bytes_data.clone())
+            .await
+            .context("failed sending data")?;
+    }
+
+    if remaining != 0 {
+        stream
+            .write_chunk(bytes_data.slice(0..remaining))
+            .await
+            .context("failed sending data")?;
+    }
+
+    stream.finish().context("failed finishing stream")?;
+    stream
+        .stopped()
+        .await
+        .context("failed to wait for stream to be stopped")?;
+
+    Ok(())
+}
+
+fn parse_byte_size(s: &str) -> Result<u64> {
+    let s = s.trim();
+
+    let multiplier = match s.chars().last() {
+        Some('T') => 1024 * 1024 * 1024 * 1024,
+        Some('G') => 1024 * 1024 * 1024,
+        Some('M') => 1024 * 1024,
+        Some('k') => 1024,
+        _ => 1,
+    };
+
+    let s = if multiplier != 1 {
+        &s[..s.len() - 1]
+    } else {
+        s
+    };
+
+    let base: u64 = u64::from_str(s)?;
+
+    Ok(base * multiplier)
+}
\ No newline at end of file

From b8e30e432e55961a7f27015a4ee586b263ddc690 Mon Sep 17 00:00:00 2001
From: Asmir Avdicevic
Date: Fri, 22 Nov 2024 12:36:15 +0100
Subject: [PATCH 2/9] bump

---
 .github/workflows/netsim_runner.yaml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.github/workflows/netsim_runner.yaml b/.github/workflows/netsim_runner.yaml
index 7157a3c2c1..37eb07a1e2 100644
--- a/.github/workflows/netsim_runner.yaml
+++ b/.github/workflows/netsim_runner.yaml
@@ -134,7 +134,6 @@ jobs:
         run: |
           cp target/${{inputs.build_profile}}/examples/* ../chuck/netsim/bins/
           cp target/${{inputs.build_profile}}/examples/new ../chuck/netsim/bins/iroh-transfer
-          # cp target/${{inputs.build_profile}}/iroh ../chuck/netsim/bins/iroh
           cp target/${{inputs.build_profile}}/iroh-relay ../chuck/netsim/bins/iroh-relay
           cp ../chuck/target/release/chuck ../chuck/netsim/bins/chuck

From 859d958540bdf779bfc80558edf609d8b04056fc Mon Sep 17 00:00:00 2001
From: Asmir Avdicevic
Date: Fri, 22 Nov 2024 14:09:57 +0100
Subject: [PATCH 3/9] custom relay config option and adjust sim paths

---
 .github/workflows/ci.yml     |  2 +-
 .github/workflows/netsim.yml |  4 ++--
 iroh/examples/new.rs         | 34 +++++++++++++++++++++++++++-------
 3 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7d48e2b589..53888fa4a7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -295,7 +295,7 @@ jobs:
       branch: ${{ github.ref }}
       max_workers: 4
       netsim_branch: "main"
-      sim_paths: "sims/iroh/iroh_v2.json,sims/integration_v2"
+      sim_paths: "sims/iroh_v2/iroh.json,sims/integration_v2"
       pr_number: ${{ github.event.pull_request.number || '' }}

   codespell:
diff --git a/.github/workflows/netsim.yml b/.github/workflows/netsim.yml
index b7a79cd1d1..cbd08149e4 100644
--- a/.github/workflows/netsim.yml
+++ b/.github/workflows/netsim.yml
@@ -39,7 +39,7 @@ jobs:
       branch: "main"
       max_workers: 1
       netsim_branch: "main"
-      sim_paths: "sims/iroh,sims/integration"
+      sim_paths: "sims/iroh_v2,sims/integration_v2"
       pr_number: ""
       publish_metrics: true
       build_profile: "optimized-release"
@@ -53,7 +53,7 @@
       branch: ${{inputs.branch}}
       max_workers: 1
       netsim_branch: ${{inputs.netsim_branch}}
-      sim_paths: "sims/iroh"
"sims/iroh" + sim_paths: "sims/iroh_v2" pr_number: ${{inputs.pr_number}} publish_metrics: false build_profile: "optimized-release" diff --git a/iroh/examples/new.rs b/iroh/examples/new.rs index 8ed220a54e..ce60f944ae 100644 --- a/iroh/examples/new.rs +++ b/iroh/examples/new.rs @@ -2,7 +2,7 @@ use anyhow::{Context,Result}; use bytes::Bytes; use clap::{Parser, Subcommand}; use futures_lite::StreamExt; -use iroh_net::{key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMode}; +use iroh_net::{key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl}; use std::time::{Duration, Instant}; use tracing::info; use std::str::FromStr; @@ -22,10 +22,14 @@ enum Commands { Provide { #[clap(long, default_value = "1G", value_parser = parse_byte_size)] size: u64, + #[clap(long)] + relay_url: Option, }, Fetch { #[arg(long)] ticket: String, + #[clap(long)] + relay_url: Option, }, } @@ -35,19 +39,27 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); match &cli.command { - Commands::Provide { size } => provide(size.clone()).await?, - Commands::Fetch { ticket } => fetch(&ticket).await?, + Commands::Provide { size, relay_url } => provide(size.clone(), relay_url.clone()).await?, + Commands::Fetch { ticket , relay_url} => fetch(&ticket, relay_url.clone()).await?, } Ok(()) } -async fn provide(size: u64) -> anyhow::Result<()> { +async fn provide(size: u64, relay_url: Option) -> anyhow::Result<()> { let secret_key = SecretKey::generate(); + let relay_mode = match relay_url { + Some(relay_url) => { + let relay_url = RelayUrl::from_str(&relay_url)?; + let relay_map = RelayMap::from_url(relay_url); + RelayMode::Custom(relay_map) + } + None => RelayMode::Default, + }; let endpoint = Endpoint::builder() .secret_key(secret_key) .alpns(vec![TRANSFER_ALPN.to_vec()]) - .relay_mode(RelayMode::Default) + .relay_mode(relay_mode) .bind() .await?; @@ -135,13 +147,21 @@ async fn provide(size: u64) -> anyhow::Result<()> { Ok(()) } -async fn fetch(ticket: &str) -> anyhow::Result<()> { +async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { let ticket: NodeTicket = ticket.parse()?; let secret_key = SecretKey::generate(); + let relay_mode = match relay_url { + Some(relay_url) => { + let relay_url = RelayUrl::from_str(&relay_url)?; + let relay_map = RelayMap::from_url(relay_url); + RelayMode::Custom(relay_map) + } + None => RelayMode::Default, + }; let endpoint = Endpoint::builder() .secret_key(secret_key) .alpns(vec![TRANSFER_ALPN.to_vec()]) - .relay_mode(RelayMode::Default) + .relay_mode(relay_mode) .bind() .await?; From 42c8cea4e6fd317dc8c33f5d40e54c177dc1bad9 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 25 Nov 2024 11:32:43 +0100 Subject: [PATCH 4/9] some cleanup --- .github/workflows/netsim_runner.yaml | 2 +- iroh/examples/{new.rs => transfer.rs} | 54 +++++++++++++++++---------- 2 files changed, 35 insertions(+), 21 deletions(-) rename iroh/examples/{new.rs => transfer.rs} (90%) diff --git a/.github/workflows/netsim_runner.yaml b/.github/workflows/netsim_runner.yaml index 37eb07a1e2..df168f2803 100644 --- a/.github/workflows/netsim_runner.yaml +++ b/.github/workflows/netsim_runner.yaml @@ -133,7 +133,7 @@ jobs: - name: Copy binaries to right location run: | cp target/${{inputs.build_profile}}/examples/* ../chuck/netsim/bins/ - cp target/${{inputs.build_profile}}/examples/new ../chuck/netsim/bins/iroh-transfer + cp target/${{inputs.build_profile}}/examples/transfer ../chuck/netsim/bins/iroh-transfer cp target/${{inputs.build_profile}}/iroh-relay 
../chuck/netsim/bins/iroh-relay cp ../chuck/target/release/chuck ../chuck/netsim/bins/chuck diff --git a/iroh/examples/new.rs b/iroh/examples/transfer.rs similarity index 90% rename from iroh/examples/new.rs rename to iroh/examples/transfer.rs index ce60f944ae..1e0c35d89c 100644 --- a/iroh/examples/new.rs +++ b/iroh/examples/transfer.rs @@ -1,17 +1,22 @@ -use anyhow::{Context,Result}; +use std::{ + str::FromStr, + time::{Duration, Instant}, +}; + +use anyhow::{Context, Result}; use bytes::Bytes; use clap::{Parser, Subcommand}; use futures_lite::StreamExt; -use iroh_net::{key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl}; -use std::time::{Duration, Instant}; +use iroh_net::{ + key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, +}; use tracing::info; -use std::str::FromStr; // Transfer ALPN that we are using to communicate over the `Endpoint` const TRANSFER_ALPN: &[u8] = b"n0/iroh/transfer/example/0"; #[derive(Parser, Debug)] -#[command(name = "provide_fetch")] +#[command(name = "transfer")] struct Cli { #[command(subcommand)] command: Commands, @@ -26,7 +31,7 @@ enum Commands { relay_url: Option, }, Fetch { - #[arg(long)] + #[arg(index = 1)] ticket: String, #[clap(long)] relay_url: Option, @@ -39,8 +44,8 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); match &cli.command { - Commands::Provide { size, relay_url } => provide(size.clone(), relay_url.clone()).await?, - Commands::Fetch { ticket , relay_url} => fetch(&ticket, relay_url.clone()).await?, + Commands::Provide { size, relay_url } => provide(*size, relay_url.clone()).await?, + Commands::Fetch { ticket, relay_url } => fetch(ticket, relay_url.clone()).await?, } Ok(()) @@ -74,7 +79,6 @@ async fn provide(size: u64, relay_url: Option) -> anyhow::Result<()> { println!("\t{}", local_endpoint.addr) } - let relay_url = endpoint .home_relay() .expect("should be connected to a relay server"); @@ -84,10 +88,7 @@ async fn provide(size: u64, relay_url: Option) -> anyhow::Result<()> { .await .context("no endpoints")? .into_iter() - .map(|endpoint| { - let addr = endpoint.addr; - addr - }) + .map(|endpoint| endpoint.addr) .collect::>(); let node_addr = NodeAddr::from_parts(node_id, Some(relay_url), local_addrs); @@ -183,10 +184,12 @@ async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { .home_relay() .expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server"); println!("node relay server url: {relay_url}\n"); - + // Attempt to connect, over the given ALPN. // Returns a Quinn connection. - let conn = endpoint.connect(ticket.node_addr().clone(), TRANSFER_ALPN).await?; + let conn = endpoint + .connect(ticket.node_addr().clone(), TRANSFER_ALPN) + .await?; info!("connected"); // Use the Quinn API to send and recv content. 
@@ -205,8 +208,16 @@ async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { endpoint.close(0u8.into(), b"bye").await?; let duration = start.elapsed(); - println!("Received {} B in {:?}/{:?} in {} chunks", len, dur, duration, chnk); - println!("Transferred {} B in {} seconds, {} B/s", len, duration.as_secs_f64(), len as f64 / duration.as_secs_f64()); + println!( + "Received {} B in {:?}/{:?} in {} chunks", + len, dur, duration, chnk + ); + println!( + "Transferred {} B in {} seconds, {} B/s", + len, + duration.as_secs_f64(), + len as f64 / duration.as_secs_f64() + ); Ok(()) } @@ -259,8 +270,11 @@ async fn drain_stream( Ok((read, ttfb, num_chunks)) } -async fn send_data_on_stream(stream: &mut iroh_net::endpoint::SendStream, stream_size: u64) -> Result<()> { - const DATA: &[u8] = &[0xAB; 7*1024 * 1024]; +async fn send_data_on_stream( + stream: &mut iroh_net::endpoint::SendStream, + stream_size: u64, +) -> Result<()> { + const DATA: &[u8] = &[0xAB; 7 * 1024 * 1024]; let bytes_data = Bytes::from_static(DATA); let full_chunks = stream_size / (DATA.len() as u64); @@ -309,4 +323,4 @@ fn parse_byte_size(s: &str) -> Result { let base: u64 = u64::from_str(s)?; Ok(base * multiplier) -} \ No newline at end of file +} From a73f43a3697844d875455c8e1def0679b1fe5e90 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 25 Nov 2024 11:51:02 +0100 Subject: [PATCH 5/9] code cleanup --- .github/workflows/ci.yml | 6 +++--- Cargo.lock | 7 +++++++ iroh/Cargo.toml | 7 ++++++- iroh/examples/transfer.rs | 32 ++++++++------------------------ 4 files changed, 24 insertions(+), 28 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 53888fa4a7..16daebb147 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -246,13 +246,13 @@ jobs: # TODO: We have a bunch of platform-dependent code so should # probably run this job on the full platform matrix - name: clippy check (all features) - run: cargo clippy --workspace --all-features --all-targets --bins --tests --benches + run: cargo clippy --workspace --all-features --all-targets --bins --tests --benches --examples - name: clippy check (no features) - run: cargo clippy --workspace --no-default-features --lib --bins --tests + run: cargo clippy --workspace --no-default-features --lib --bins --tests --examples - name: clippy check (default features) - run: cargo clippy --workspace --all-targets + run: cargo clippy --workspace --all-targets --examples msrv: if: "github.event_name != 'pull_request' || ! 
contains(github.event.pull_request.labels.*.name, 'flaky-test')" diff --git a/Cargo.lock b/Cargo.lock index 409947558a..6a7e13a716 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2499,6 +2499,7 @@ dependencies = [ "nested_enum_utils", "num_cpus", "parking_lot", + "parse-size", "postcard", "proptest", "quic-rpc", @@ -3655,6 +3656,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parse-size" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487f2ccd1e17ce8c1bfab3a65c89525af41cfad4c8659021a1e9a2aacd73b89b" + [[package]] name = "paste" version = "1.0.15" diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 91b2350171..14a790323b 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -62,6 +62,7 @@ ref-cast = "1.0.23" # Examples clap = { version = "4", features = ["derive"], optional = true } indicatif = { version = "0.17", features = ["tokio"], optional = true } +parse-size = { version = "1.1.0", optional = true } # Documentation tests url = { version = "2.5.0", features = ["serde"] } @@ -74,7 +75,7 @@ test = [] discovery-pkarr-dht = ["iroh-net/discovery-pkarr-dht"] test-utils = ["iroh-net/test-utils"] -examples = ["dep:clap", "dep:indicatif"] +examples = ["dep:clap", "dep:indicatif", "dep:parse-size"] [dev-dependencies] anyhow = { version = "1" } @@ -101,3 +102,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"] [[example]] name = "rpc" required-features = ["examples"] + +[[example]] +name = "transfer" +required-features = ["examples"] diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 1e0c35d89c..957b584612 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -7,6 +7,7 @@ use anyhow::{Context, Result}; use bytes::Bytes; use clap::{Parser, Subcommand}; use futures_lite::StreamExt; +use indicatif::HumanBytes; use iroh_net::{ key::SecretKey, ticket::NodeTicket, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, }; @@ -213,10 +214,10 @@ async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { len, dur, duration, chnk ); println!( - "Transferred {} B in {} seconds, {} B/s", - len, + "Transferred {} in {:.4}, {}/s", + HumanBytes(len as u64), duration.as_secs_f64(), - len as f64 / duration.as_secs_f64() + HumanBytes((len as f64 / duration.as_secs_f64()) as u64) ); Ok(()) @@ -274,7 +275,7 @@ async fn send_data_on_stream( stream: &mut iroh_net::endpoint::SendStream, stream_size: u64, ) -> Result<()> { - const DATA: &[u8] = &[0xAB; 7 * 1024 * 1024]; + const DATA: &[u8] = &[0xAB; 1024 * 1024]; let bytes_data = Bytes::from_static(DATA); let full_chunks = stream_size / (DATA.len() as u64); @@ -303,24 +304,7 @@ async fn send_data_on_stream( Ok(()) } -fn parse_byte_size(s: &str) -> Result { - let s = s.trim(); - - let multiplier = match s.chars().last() { - Some('T') => 1024 * 1024 * 1024 * 1024, - Some('G') => 1024 * 1024 * 1024, - Some('M') => 1024 * 1024, - Some('k') => 1024, - _ => 1, - }; - - let s = if multiplier != 1 { - &s[..s.len() - 1] - } else { - s - }; - - let base: u64 = u64::from_str(s)?; - - Ok(base * multiplier) +fn parse_byte_size(s: &str) -> Result { + let cfg = parse_size::Config::new().with_binary(); + cfg.parse_size(s).map_err(Into::into) } From 4bee16505fc614fcf700642e569cccdd671d0056 Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 25 Nov 2024 12:11:22 +0100 Subject: [PATCH 6/9] nits --- iroh/examples/transfer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 
957b584612..afb587b103 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -202,7 +202,7 @@ async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { // Call `finish` to close the send side of the connection gracefully. send.finish()?; - let (len, dur, chnk) = drain_stream(&mut recv, false).await?; + let (len, ttfb, chnk) = drain_stream(&mut recv, false).await?; // We received the last message: close all connections and allow for the close // message to be sent. @@ -210,8 +210,8 @@ async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { let duration = start.elapsed(); println!( - "Received {} B in {:?}/{:?} in {} chunks", - len, dur, duration, chnk + "Received {} B in {:.4}s with ttfb {}s in {} chunks", + HumanBytes(len as u64), duration.as_secs_f64(), ttfb.as_secs_f64(), chnk ); println!( "Transferred {} in {:.4}, {}/s", From c1a1270037852884e352dab769d5de8420bc00cf Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 25 Nov 2024 12:23:09 +0100 Subject: [PATCH 7/9] more cleanup --- Cargo.lock | 4 ++-- iroh/Cargo.toml | 2 +- iroh/examples/transfer.rs | 7 +++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a7e13a716..99275d046a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3658,9 +3658,9 @@ dependencies = [ [[package]] name = "parse-size" -version = "1.1.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487f2ccd1e17ce8c1bfab3a65c89525af41cfad4c8659021a1e9a2aacd73b89b" +checksum = "944553dd59c802559559161f9816429058b869003836120e262e8caec061b7ae" [[package]] name = "paste" diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 14a790323b..392256c730 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -62,7 +62,7 @@ ref-cast = "1.0.23" # Examples clap = { version = "4", features = ["derive"], optional = true } indicatif = { version = "0.17", features = ["tokio"], optional = true } -parse-size = { version = "1.1.0", optional = true } +parse-size = { version = "=1.0.0", optional = true } # pinned version to avoid bumping msrv to 1.81 # Documentation tests url = { version = "2.5.0", features = ["serde"] } diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index afb587b103..877eacd561 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -211,7 +211,10 @@ async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { let duration = start.elapsed(); println!( "Received {} B in {:.4}s with ttfb {}s in {} chunks", - HumanBytes(len as u64), duration.as_secs_f64(), ttfb.as_secs_f64(), chnk + HumanBytes(len as u64), + duration.as_secs_f64(), + ttfb.as_secs_f64(), + chnk ); println!( "Transferred {} in {:.4}, {}/s", @@ -306,5 +309,5 @@ async fn send_data_on_stream( fn parse_byte_size(s: &str) -> Result { let cfg = parse_size::Config::new().with_binary(); - cfg.parse_size(s).map_err(Into::into) + cfg.parse_size(s).map_err(|e| anyhow::anyhow!(e)) } From f7f2c788b906758f410f1686b85558749c224d8c Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 25 Nov 2024 12:55:20 +0100 Subject: [PATCH 8/9] cover all targets --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 16daebb147..e96f386686 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -246,13 +246,13 @@ jobs: # TODO: We have a bunch of platform-dependent code so should # probably run this job on the full platform matrix - name: 
clippy check (all features) - run: cargo clippy --workspace --all-features --all-targets --bins --tests --benches --examples + run: cargo clippy --workspace --all-features --all-targets --lib --bins --tests --benches --examples - name: clippy check (no features) - run: cargo clippy --workspace --no-default-features --lib --bins --tests --examples + run: cargo clippy --workspace --no-default-features --all-targets --lib --bins --tests --benches --examples - name: clippy check (default features) - run: cargo clippy --workspace --all-targets --examples + run: cargo clippy --workspace --all-targets --lib --bins --tests --benches --examples msrv: if: "github.event_name != 'pull_request' || ! contains(github.event.pull_request.labels.*.name, 'flaky-test')" From 3a26d4606f756bbcec3819eac39d0f006c2cd6bf Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 25 Nov 2024 15:02:30 +0100 Subject: [PATCH 9/9] CRs --- iroh/examples/transfer.rs | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 877eacd561..9225155b21 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -99,7 +99,7 @@ async fn provide(size: u64, relay_url: Option) -> anyhow::Result<()> { // accept incoming connections, returns a normal QUIC connection while let Some(incoming) = endpoint.accept().await { - let mut connecting = match incoming.accept() { + let connecting = match incoming.accept() { Ok(connecting) => connecting, Err(err) => { tracing::warn!("incoming connection failed: {err:#}"); @@ -108,12 +108,11 @@ async fn provide(size: u64, relay_url: Option) -> anyhow::Result<()> { continue; } }; - let alpn = connecting.alpn().await?; let conn = connecting.await?; let node_id = iroh_net::endpoint::get_remote_node_id(&conn)?; info!( "new connection from {node_id} with ALPN {} (coming from {})", - String::from_utf8_lossy(&alpn), + String::from_utf8_lossy(&TRANSFER_ALPN), conn.remote_address() ); @@ -199,21 +198,33 @@ async fn fetch(ticket: &str, relay_url: Option) -> anyhow::Result<()> { let message = format!("{me} is saying 'hello!'"); send.write_all(message.as_bytes()).await?; - // Call `finish` to close the send side of the connection gracefully. + // Call `finish` to signal no more data will be sent on this stream. send.finish()?; - let (len, ttfb, chnk) = drain_stream(&mut recv, false).await?; + let (len, time_to_first_byte, chnk) = drain_stream(&mut recv, false).await?; // We received the last message: close all connections and allow for the close // message to be sent. 
endpoint.close(0u8.into(), b"bye").await?; + // Ensure the client has closed the connection + let res = tokio::time::timeout(Duration::from_secs(3), async move { + let closed = conn.closed().await; + if !matches!(closed, quinn::ConnectionError::LocallyClosed) { + println!("node disconnected with an error: {closed:#}"); + } + }) + .await; + if res.is_err() { + println!("node did not disconnect within 3 seconds"); + } + let duration = start.elapsed(); println!( - "Received {} B in {:.4}s with ttfb {}s in {} chunks", + "Received {} in {:.4}s with time to first byte {}s in {} chunks", HumanBytes(len as u64), duration.as_secs_f64(), - ttfb.as_secs_f64(), + time_to_first_byte.as_secs_f64(), chnk ); println!( @@ -234,14 +245,14 @@ async fn drain_stream( let download_start = Instant::now(); let mut first_byte = true; - let mut ttfb = download_start.elapsed(); + let mut time_to_first_byte = download_start.elapsed(); let mut num_chunks: u64 = 0; if read_unordered { while let Some(chunk) = stream.read_chunk(usize::MAX, false).await? { if first_byte { - ttfb = download_start.elapsed(); + time_to_first_byte = download_start.elapsed(); first_byte = false; } read += chunk.bytes.len(); @@ -263,7 +274,7 @@ async fn drain_stream( while let Some(n) = stream.read_chunks(&mut bufs[..]).await? { if first_byte { - ttfb = download_start.elapsed(); + time_to_first_byte = download_start.elapsed(); first_byte = false; } read += bufs.iter().take(n).map(|buf| buf.len()).sum::(); @@ -271,7 +282,7 @@ async fn drain_stream( } } - Ok((read, ttfb, num_chunks)) + Ok((read, time_to_first_byte, num_chunks)) } async fn send_data_on_stream(