Skip to content

Commit

Permalink
Reduce hole-punch-tests to clients
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Sep 14, 2023
1 parent 87bd67b commit 422cd46
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 139 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions hole-punching-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ publish = false
license = "MIT"

[dependencies]
clap = { version = "4.3.8", features = ["derive", "env"] }
anyhow = "1"
env_logger = "0.10.0"
futures = "0.3.28"
libp2p = { path = "../libp2p", features = ["tokio", "dcutr", "identify", "macros", "noise", "ping", "relay", "tcp", "yamux", "quic"] }
log = "0.4"
redis = { version = "0.23.0", default-features = false, features = ["tokio-comp"] }
tokio = { version = "1.29.1", features = ["full"] }
anyhow = "1"
9 changes: 0 additions & 9 deletions hole-punching-tests/router/Dockerfile

This file was deleted.

14 changes: 0 additions & 14 deletions hole-punching-tests/router/run.sh

This file was deleted.

173 changes: 60 additions & 113 deletions hole-punching-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
// DEALINGS IN THE SOFTWARE.

use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use futures::{future::Either, stream::StreamExt};
use libp2p::{
core::{
Expand All @@ -34,34 +33,10 @@ use libp2p::{
};
use redis::AsyncCommands;
use std::collections::HashMap;
use std::io;
use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;

#[derive(Debug, Parser)]
struct Opts {
#[command(subcommand)]
command: Command,
}

#[derive(Debug, Subcommand)]
enum Command {
Relay {
/// Which local address to listen on.
#[clap(env)]
listen_addr: IpAddr,
},
Dial {
/// The transport (tcp or quic).
#[clap(env)]
transport: TransportProtocol,
},
Listen {
/// The transport (tcp or quic).
#[clap(env)]
transport: TransportProtocol,
},
}

/// The redis key we push the relay's TCP listen address to.
const RELAY_TCP_ADDRESS: &str = "RELAY_TCP_ADDRESS";
/// The redis key we push the relay's QUIC listen address to.
Expand All @@ -76,51 +51,19 @@ async fn main() -> Result<()> {
.parse_default_env()
.init();

let opts = Opts::parse();
let mode = get_env::<Mode>("MODE")?;
let transport = get_env::<TransportProtocol>("TRANSPORT")?;

let mut redis = RedisClient::new("redis", 6379).await?;

match opts.command {
Command::Relay { listen_addr } => {
let mut swarm = make_relay_swarm()?;

let tcp_listener_id = swarm.listen_on(tcp_addr(listen_addr))?;
let quic_listener_id = swarm.listen_on(quic_addr(listen_addr))?;

loop {
if let SwarmEvent::NewListenAddr {
address,
listener_id,
} = swarm.next().await.expect("Infinite Stream.")
{
swarm.add_external_address(address.clone()); // We know that in our testing network setup, that we are listening on a "publicly-reachable" address.

log::info!("Listening on {address}");

let address = address
.with(Protocol::P2p(*swarm.local_peer_id()))
.to_string();

// Push each address twice because we need to connect two clients.

if listener_id == tcp_listener_id {
redis.push(RELAY_TCP_ADDRESS, &address).await?;
redis.push(RELAY_TCP_ADDRESS, &address).await?;
}
if listener_id == quic_listener_id {
redis.push(RELAY_QUIC_ADDRESS, &address).await?;
redis.push(RELAY_QUIC_ADDRESS, &address).await?;
}
}
}
}
Command::Dial { transport } => {
match mode {
Mode::Dial => {
let relay_addr = match transport {
TransportProtocol::Tcp => redis.pop::<Multiaddr>(RELAY_TCP_ADDRESS).await?,
TransportProtocol::Quic => redis.pop::<Multiaddr>(RELAY_TCP_ADDRESS).await?,
TransportProtocol::Quic => redis.pop::<Multiaddr>(RELAY_QUIC_ADDRESS).await?,
};

let mut swarm = make_client_swarm()?;
let mut swarm = make_swarm()?;
client_listen_on_transport(&mut swarm, transport).await?;
client_connect_to_relay(&mut swarm, relay_addr.clone()).await?;

Expand All @@ -134,13 +77,13 @@ async fn main() -> Result<()> {

loop {
match swarm.next().await.unwrap() {
SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr(
SwarmEvent::Behaviour(BehaviourEvent::Dcutr(
dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id },
)) => {
log::info!("Successfully hole-punched to {remote_peer_id}");
return Ok(());
}
SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr(
SwarmEvent::Behaviour(BehaviourEvent::Dcutr(
dcutr::Event::DirectConnectionUpgradeFailed {
remote_peer_id,
error,
Expand All @@ -156,21 +99,21 @@ async fn main() -> Result<()> {
}
}
}
Command::Listen { transport } => {
Mode::Listen => {
let relay_addr = match transport {
TransportProtocol::Tcp => redis.pop::<Multiaddr>(RELAY_TCP_ADDRESS).await?,
TransportProtocol::Quic => redis.pop::<Multiaddr>(RELAY_TCP_ADDRESS).await?,
TransportProtocol::Quic => redis.pop::<Multiaddr>(RELAY_QUIC_ADDRESS).await?,
};

let mut swarm = make_client_swarm()?;
let mut swarm = make_swarm()?;
client_listen_on_transport(&mut swarm, transport).await?;
client_connect_to_relay(&mut swarm, relay_addr.clone()).await?;

swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?;

loop {
match swarm.next().await.unwrap() {
SwarmEvent::Behaviour(ClientBehaviourEvent::RelayClient(
SwarmEvent::Behaviour(BehaviourEvent::RelayClient(
relay::client::Event::ReservationReqAccepted { .. },
)) => {
log::info!("Relay accepted our reservation request.");
Expand All @@ -179,13 +122,13 @@ async fn main() -> Result<()> {
.push(LISTEN_CLIENT_PEER_ID, swarm.local_peer_id())
.await?;
}
SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr(
SwarmEvent::Behaviour(BehaviourEvent::Dcutr(
dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id },
)) => {
log::info!("Successfully hole-punched to {remote_peer_id}");
return Ok(());
}
SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr(
SwarmEvent::Behaviour(BehaviourEvent::Dcutr(
dcutr::Event::DirectConnectionUpgradeFailed {
remote_peer_id,
error,
Expand All @@ -204,15 +147,28 @@ async fn main() -> Result<()> {
}
}

fn get_env<T>(key: &'static str) -> Result<T>
where
T: FromStr,
T::Err: std::error::Error + Send + Sync + 'static,
{
let val = std::env::var(key)
.with_context(|| format!("Missing env var `{key}`"))?
.parse()
.with_context(|| format!("Failed to parse `{key}`)"))?;

Ok(val)
}

async fn client_connect_to_relay(
swarm: &mut Swarm<ClientBehaviour>,
swarm: &mut Swarm<Behaviour>,
relay_addr: Multiaddr,
) -> Result<()> {
// Connect to the relay server.
swarm.dial(relay_addr.clone())?;

loop {
if let SwarmEvent::Behaviour(ClientBehaviourEvent::Identify(identify::Event::Received {
if let SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received {
info: identify::Info { observed_addr, .. },
..
})) = swarm.next().await.unwrap()
Expand All @@ -228,7 +184,7 @@ async fn client_connect_to_relay(
}

async fn client_listen_on_transport(
swarm: &mut Swarm<ClientBehaviour>,
swarm: &mut Swarm<Behaviour>,
transport: TransportProtocol,
) -> Result<()> {
let listen_addr = match transport {
Expand Down Expand Up @@ -269,7 +225,7 @@ fn quic_addr(addr: IpAddr) -> Multiaddr {
.with(Protocol::QuicV1)
}

fn make_client_swarm() -> Result<Swarm<ClientBehaviour>> {
fn make_swarm() -> Result<Swarm<Behaviour>> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
log::info!("Local peer id: {local_peer_id}");
Expand All @@ -294,7 +250,7 @@ fn make_client_swarm() -> Result<Swarm<ClientBehaviour>> {
.boxed()
};

let behaviour = ClientBehaviour {
let behaviour = Behaviour {
relay_client: client,
identify: identify::Behaviour::new(identify::Config::new(
"/hole-punch-tests/1".to_owned(),
Expand All @@ -306,32 +262,6 @@ fn make_client_swarm() -> Result<Swarm<ClientBehaviour>> {
Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build())
}

fn make_relay_swarm() -> Result<Swarm<RelayBehaviour>> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
log::info!("Local peer id: {local_peer_id}");

let transport = tcp::tokio::Transport::new(tcp::Config::default().port_reuse(true))
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&local_key)?)
.multiplex(yamux::Config::default())
.or_transport(quic::tokio::Transport::new(quic::Config::new(&local_key)))
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed();
let behaviour = RelayBehaviour {
relay: relay::Behaviour::new(local_peer_id, relay::Config::default()),
identify: identify::Behaviour::new(identify::Config::new(
"/hole-punch-tests/1".to_owned(),
local_key.public(),
)),
};

Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build())
}

struct RedisClient {
inner: redis::aio::Connection,
}
Expand Down Expand Up @@ -371,32 +301,49 @@ impl RedisClient {
}
}

#[derive(Clone, Copy, Debug, PartialEq, Parser)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum TransportProtocol {
Tcp,
Quic,
}

impl FromStr for TransportProtocol {
type Err = String;
type Err = io::Error;
fn from_str(mode: &str) -> Result<Self, Self::Err> {
match mode {
"tcp" => Ok(TransportProtocol::Tcp),
"quic" => Ok(TransportProtocol::Quic),
_ => Err("Expected either 'tcp' or 'quic'".to_string()),
_ => Err(io::Error::new(
io::ErrorKind::Other,
"Expected either 'tcp' or 'quic'",
)),
}
}
}

#[derive(NetworkBehaviour)]
struct ClientBehaviour {
relay_client: relay::client::Behaviour,
identify: identify::Behaviour,
dcutr: dcutr::Behaviour,
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
Dial,
Listen,
}

impl FromStr for Mode {
type Err = io::Error;
fn from_str(mode: &str) -> Result<Self, Self::Err> {
match mode {
"dial" => Ok(Mode::Dial),
"listen" => Ok(Mode::Listen),
_ => Err(io::Error::new(
io::ErrorKind::Other,
"Expected either 'dial' or 'listen'",
)),
}
}
}

#[derive(NetworkBehaviour)]
struct RelayBehaviour {
relay: relay::Behaviour,
struct Behaviour {
relay_client: relay::client::Behaviour,
identify: identify::Behaviour,
dcutr: dcutr::Behaviour,
}

0 comments on commit 422cd46

Please sign in to comment.