From 422cd4644e1c2bb812d5a2ba7d5711583e612392 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 14 Sep 2023 10:51:32 +1000 Subject: [PATCH] Reduce hole-punch-tests to clients --- Cargo.lock | 1 - hole-punching-tests/Cargo.toml | 3 +- hole-punching-tests/router/Dockerfile | 9 -- hole-punching-tests/router/run.sh | 14 --- hole-punching-tests/src/main.rs | 173 +++++++++----------------- 5 files changed, 61 insertions(+), 139 deletions(-) delete mode 100644 hole-punching-tests/router/Dockerfile delete mode 100644 hole-punching-tests/router/run.sh diff --git a/Cargo.lock b/Cargo.lock index e81f94b7b8c..77d0ba286a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2122,7 +2122,6 @@ name = "hole-punching-tests" version = "0.1.0" dependencies = [ "anyhow", - "clap", "env_logger 0.10.0", "futures", "libp2p", diff --git a/hole-punching-tests/Cargo.toml b/hole-punching-tests/Cargo.toml index 5d68b744c40..1917b004497 100644 --- a/hole-punching-tests/Cargo.toml +++ b/hole-punching-tests/Cargo.toml @@ -6,11 +6,10 @@ publish = false license = "MIT" [dependencies] -clap = { version = "4.3.8", features = ["derive", "env"] } +anyhow = "1" env_logger = "0.10.0" futures = "0.3.28" libp2p = { path = "../libp2p", features = ["tokio", "dcutr", "identify", "macros", "noise", "ping", "relay", "tcp", "yamux", "quic"] } log = "0.4" redis = { version = "0.23.0", default-features = false, features = ["tokio-comp"] } tokio = { version = "1.29.1", features = ["full"] } -anyhow = "1" diff --git a/hole-punching-tests/router/Dockerfile b/hole-punching-tests/router/Dockerfile deleted file mode 100644 index aaf61fdab7f..00000000000 --- a/hole-punching-tests/router/Dockerfile +++ /dev/null @@ -1,9 +0,0 @@ -FROM debian:12-slim - -ARG DEBIAN_FRONTEND=noninteractive -RUN --mount=type=cache,target=/var/cache/apt apt-get update && apt-get -y install iproute2 nftables jq tcpdump - -COPY *.sh /scripts/ -RUN chmod +x /scripts/*.sh - -ENTRYPOINT ["./scripts/run.sh"] diff --git a/hole-punching-tests/router/run.sh b/hole-punching-tests/router/run.sh deleted file mode 100644 index d25c6610803..00000000000 --- a/hole-punching-tests/router/run.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/sh - -set -e - -ADDR_EXTERNAL=$(ip -json addr show eth1 | jq '.[0].addr_info[0].local' -r) -SUBNET_INTERNAL=$(ip -json addr show eth0 | jq '.[0].addr_info[0].local + "/" + (.[0].addr_info[0].prefixlen | tostring)' -r) - -nft add table ip nat -nft add chain ip nat postrouting { type nat hook postrouting priority 100 \; } -nft add rule ip nat postrouting ip saddr $SUBNET_INTERNAL oifname "eth1" snat $ADDR_EXTERNAL - -tc qdisc add dev eth1 root netem delay 50ms - -tcpdump -i eth1 diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs index cc685d563f4..2af5505e5ba 100644 --- a/hole-punching-tests/src/main.rs +++ b/hole-punching-tests/src/main.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. use anyhow::{Context, Result}; -use clap::{Parser, Subcommand}; use futures::{future::Either, stream::StreamExt}; use libp2p::{ core::{ @@ -34,34 +33,10 @@ use libp2p::{ }; use redis::AsyncCommands; use std::collections::HashMap; +use std::io; use std::net::{IpAddr, Ipv4Addr}; use std::str::FromStr; -#[derive(Debug, Parser)] -struct Opts { - #[command(subcommand)] - command: Command, -} - -#[derive(Debug, Subcommand)] -enum Command { - Relay { - /// Which local address to listen on. - #[clap(env)] - listen_addr: IpAddr, - }, - Dial { - /// The transport (tcp or quic). - #[clap(env)] - transport: TransportProtocol, - }, - Listen { - /// The transport (tcp or quic). - #[clap(env)] - transport: TransportProtocol, - }, -} - /// The redis key we push the relay's TCP listen address to. const RELAY_TCP_ADDRESS: &str = "RELAY_TCP_ADDRESS"; /// The redis key we push the relay's QUIC listen address to. @@ -76,51 +51,19 @@ async fn main() -> Result<()> { .parse_default_env() .init(); - let opts = Opts::parse(); + let mode = get_env::("MODE")?; + let transport = get_env::("TRANSPORT")?; let mut redis = RedisClient::new("redis", 6379).await?; - match opts.command { - Command::Relay { listen_addr } => { - let mut swarm = make_relay_swarm()?; - - let tcp_listener_id = swarm.listen_on(tcp_addr(listen_addr))?; - let quic_listener_id = swarm.listen_on(quic_addr(listen_addr))?; - - loop { - if let SwarmEvent::NewListenAddr { - address, - listener_id, - } = swarm.next().await.expect("Infinite Stream.") - { - swarm.add_external_address(address.clone()); // We know that in our testing network setup, that we are listening on a "publicly-reachable" address. - - log::info!("Listening on {address}"); - - let address = address - .with(Protocol::P2p(*swarm.local_peer_id())) - .to_string(); - - // Push each address twice because we need to connect two clients. - - if listener_id == tcp_listener_id { - redis.push(RELAY_TCP_ADDRESS, &address).await?; - redis.push(RELAY_TCP_ADDRESS, &address).await?; - } - if listener_id == quic_listener_id { - redis.push(RELAY_QUIC_ADDRESS, &address).await?; - redis.push(RELAY_QUIC_ADDRESS, &address).await?; - } - } - } - } - Command::Dial { transport } => { + match mode { + Mode::Dial => { let relay_addr = match transport { TransportProtocol::Tcp => redis.pop::(RELAY_TCP_ADDRESS).await?, - TransportProtocol::Quic => redis.pop::(RELAY_TCP_ADDRESS).await?, + TransportProtocol::Quic => redis.pop::(RELAY_QUIC_ADDRESS).await?, }; - let mut swarm = make_client_swarm()?; + let mut swarm = make_swarm()?; client_listen_on_transport(&mut swarm, transport).await?; client_connect_to_relay(&mut swarm, relay_addr.clone()).await?; @@ -134,13 +77,13 @@ async fn main() -> Result<()> { loop { match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + SwarmEvent::Behaviour(BehaviourEvent::Dcutr( dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id }, )) => { log::info!("Successfully hole-punched to {remote_peer_id}"); return Ok(()); } - SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + SwarmEvent::Behaviour(BehaviourEvent::Dcutr( dcutr::Event::DirectConnectionUpgradeFailed { remote_peer_id, error, @@ -156,13 +99,13 @@ async fn main() -> Result<()> { } } } - Command::Listen { transport } => { + Mode::Listen => { let relay_addr = match transport { TransportProtocol::Tcp => redis.pop::(RELAY_TCP_ADDRESS).await?, - TransportProtocol::Quic => redis.pop::(RELAY_TCP_ADDRESS).await?, + TransportProtocol::Quic => redis.pop::(RELAY_QUIC_ADDRESS).await?, }; - let mut swarm = make_client_swarm()?; + let mut swarm = make_swarm()?; client_listen_on_transport(&mut swarm, transport).await?; client_connect_to_relay(&mut swarm, relay_addr.clone()).await?; @@ -170,7 +113,7 @@ async fn main() -> Result<()> { loop { match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(ClientBehaviourEvent::RelayClient( + SwarmEvent::Behaviour(BehaviourEvent::RelayClient( relay::client::Event::ReservationReqAccepted { .. }, )) => { log::info!("Relay accepted our reservation request."); @@ -179,13 +122,13 @@ async fn main() -> Result<()> { .push(LISTEN_CLIENT_PEER_ID, swarm.local_peer_id()) .await?; } - SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + SwarmEvent::Behaviour(BehaviourEvent::Dcutr( dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id }, )) => { log::info!("Successfully hole-punched to {remote_peer_id}"); return Ok(()); } - SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + SwarmEvent::Behaviour(BehaviourEvent::Dcutr( dcutr::Event::DirectConnectionUpgradeFailed { remote_peer_id, error, @@ -204,15 +147,28 @@ async fn main() -> Result<()> { } } +fn get_env(key: &'static str) -> Result +where + T: FromStr, + T::Err: std::error::Error + Send + Sync + 'static, +{ + let val = std::env::var(key) + .with_context(|| format!("Missing env var `{key}`"))? + .parse() + .with_context(|| format!("Failed to parse `{key}`)"))?; + + Ok(val) +} + async fn client_connect_to_relay( - swarm: &mut Swarm, + swarm: &mut Swarm, relay_addr: Multiaddr, ) -> Result<()> { // Connect to the relay server. swarm.dial(relay_addr.clone())?; loop { - if let SwarmEvent::Behaviour(ClientBehaviourEvent::Identify(identify::Event::Received { + if let SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { info: identify::Info { observed_addr, .. }, .. })) = swarm.next().await.unwrap() @@ -228,7 +184,7 @@ async fn client_connect_to_relay( } async fn client_listen_on_transport( - swarm: &mut Swarm, + swarm: &mut Swarm, transport: TransportProtocol, ) -> Result<()> { let listen_addr = match transport { @@ -269,7 +225,7 @@ fn quic_addr(addr: IpAddr) -> Multiaddr { .with(Protocol::QuicV1) } -fn make_client_swarm() -> Result> { +fn make_swarm() -> Result> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); log::info!("Local peer id: {local_peer_id}"); @@ -294,7 +250,7 @@ fn make_client_swarm() -> Result> { .boxed() }; - let behaviour = ClientBehaviour { + let behaviour = Behaviour { relay_client: client, identify: identify::Behaviour::new(identify::Config::new( "/hole-punch-tests/1".to_owned(), @@ -306,32 +262,6 @@ fn make_client_swarm() -> Result> { Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build()) } -fn make_relay_swarm() -> Result> { - let local_key = identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - log::info!("Local peer id: {local_peer_id}"); - - let transport = tcp::tokio::Transport::new(tcp::Config::default().port_reuse(true)) - .upgrade(upgrade::Version::V1) - .authenticate(noise::Config::new(&local_key)?) - .multiplex(yamux::Config::default()) - .or_transport(quic::tokio::Transport::new(quic::Config::new(&local_key))) - .map(|either_output, _| match either_output { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - }) - .boxed(); - let behaviour = RelayBehaviour { - relay: relay::Behaviour::new(local_peer_id, relay::Config::default()), - identify: identify::Behaviour::new(identify::Config::new( - "/hole-punch-tests/1".to_owned(), - local_key.public(), - )), - }; - - Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build()) -} - struct RedisClient { inner: redis::aio::Connection, } @@ -371,32 +301,49 @@ impl RedisClient { } } -#[derive(Clone, Copy, Debug, PartialEq, Parser)] +#[derive(Clone, Copy, Debug, PartialEq)] enum TransportProtocol { Tcp, Quic, } impl FromStr for TransportProtocol { - type Err = String; + type Err = io::Error; fn from_str(mode: &str) -> Result { match mode { "tcp" => Ok(TransportProtocol::Tcp), "quic" => Ok(TransportProtocol::Quic), - _ => Err("Expected either 'tcp' or 'quic'".to_string()), + _ => Err(io::Error::new( + io::ErrorKind::Other, + "Expected either 'tcp' or 'quic'", + )), } } } -#[derive(NetworkBehaviour)] -struct ClientBehaviour { - relay_client: relay::client::Behaviour, - identify: identify::Behaviour, - dcutr: dcutr::Behaviour, +#[derive(Clone, Copy, Debug, PartialEq)] +enum Mode { + Dial, + Listen, +} + +impl FromStr for Mode { + type Err = io::Error; + fn from_str(mode: &str) -> Result { + match mode { + "dial" => Ok(Mode::Dial), + "listen" => Ok(Mode::Listen), + _ => Err(io::Error::new( + io::ErrorKind::Other, + "Expected either 'dial' or 'listen'", + )), + } + } } #[derive(NetworkBehaviour)] -struct RelayBehaviour { - relay: relay::Behaviour, +struct Behaviour { + relay_client: relay::client::Behaviour, identify: identify::Behaviour, + dcutr: dcutr::Behaviour, }