Skip to content

Commit

Permalink
fix turn_client_udp.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 16, 2024
1 parent a36415a commit 3ebba0b
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 99 deletions.
4 changes: 3 additions & 1 deletion rtc-turn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ thiserror = "1.0.57"
env_logger = "0.11.3"
chrono = "0.4.35"
hex = "0.4.3"
clap = "4.5.2"
clap = { version = "4.5.3", features = ["derive"] }
criterion = "0.5.1"
crossbeam-channel = "0.5"
ctrlc = "3.4"

[features]
metrics = []
Expand Down
280 changes: 187 additions & 93 deletions rtc-turn/examples/turn_client_udp.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
use bytes::BytesMut;
use clap::Parser;
use log::trace;
use rtc_turn::client::*;
use shared::error::Result;
use shared::error::{Error, Result};
use shared::{Protocol, Transmit, TransportContext};
use std::io::{ErrorKind, Write};
use std::net::UdpSocket;
use std::str::FromStr;
use std::time::{Duration, Instant};

// RUST_LOG=trace cargo run --color=always --package turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping
// RUST_LOG=trace cargo run --color=always --package rtc-turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping

#[derive(Parser)]
#[command(name = "ICE Ping Pong")]
#[command(name = "TURN Client UDP")]
#[command(author = "Rusty Rain <y@ngr.tc>")]
#[command(version = "0.1.0")]
#[command(about = "An example of ICE", long_about = None)]
#[command(about = "An example of TURN Client UDP", long_about = None)]
struct Cli {
#[arg(short, long)]
controlling: bool,
#[arg(long, default_value_t = format!("127.0.0.1"))]
host: String,
#[arg(long, default_value_t = 3478)]
port: u16,
#[arg(long)]
user: String,
#[arg(long, default_value_t = format!("webrtc.rs"))]
realm: String,
#[arg(long)]
ping: bool,

#[arg(short, long)]
debug: bool,
Expand All @@ -20,128 +35,207 @@ struct Cli {
}

fn main() -> Result<()> {
env_logger::init();

let mut app = App::new("TURN Client UDP")
.version("0.1.0")
.author("Rain Liu <yliu@webrtc.rs>")
.about("An example of TURN Client UDP")
.setting(AppSettings::DeriveDisplayOrder)
.setting(AppSettings::SubcommandsNegateReqs)
.arg(
Arg::with_name("FULLHELP")
.help("Prints more detailed help information")
.long("fullhelp"),
)
.arg(
Arg::with_name("host")
.required_unless("FULLHELP")
.takes_value(true)
.long("host")
.help("TURN Server name."),
)
.arg(
Arg::with_name("user")
.required_unless("FULLHELP")
.takes_value(true)
.long("user")
.help("A pair of username and password (e.g. \"user=pass\")"),
)
.arg(
Arg::with_name("realm")
.default_value("webrtc.rs")
.takes_value(true)
.long("realm")
.help("Realm (defaults to \"webrtc.rs\")"),
)
.arg(
Arg::with_name("port")
.takes_value(true)
.default_value("3478")
.long("port")
.help("Listening port."),
)
.arg(
Arg::with_name("ping")
.long("ping")
.takes_value(false)
.help("Run ping test"),
);

let matches = app.clone().get_matches();

if matches.is_present("FULLHELP") {
app.print_long_help().unwrap();
std::process::exit(0);
let cli = Cli::parse();
if cli.debug {
let log_level = log::LevelFilter::from_str(&cli.log_level).unwrap();
env_logger::Builder::new()
.format(|buf, record| {
writeln!(
buf,
"{}:{} [{}] {} - {}",
record.file().unwrap_or("unknown"),
record.line().unwrap_or(0),
record.level(),
chrono::Local::now().format("%H:%M:%S.%6f"),
record.args()
)
})
.filter(None, log_level)
.init();
}

let host = matches.value_of("host").unwrap();
let port = matches.value_of("port").unwrap();
let user = matches.value_of("user").unwrap();
let host = cli.host;
let port = cli.port;
let user = cli.user;
let cred: Vec<&str> = user.splitn(2, '=').collect();
let ping = matches.is_present("ping");
let realm = matches.value_of("realm").unwrap();
let _ping = cli.ping;
let realm = cli.realm;

// TURN client won't create a local listening socket by itself.
let conn = UdpSocket::bind("0.0.0.0:0").await?;
let socket = UdpSocket::bind("0.0.0.0:0")?;
let pinger = UdpSocket::bind("0.0.0.0:0")?;

let local_addr = socket.local_addr()?;
let peer_addr = pinger.local_addr()?;

let turn_server_addr = format!("{host}:{port}");

let cfg = ClientConfig {
stun_serv_addr: turn_server_addr.clone(),
turn_serv_addr: turn_server_addr,
local_addr,
protocol: Protocol::UDP,
username: cred[0].to_string(),
password: cred[1].to_string(),
realm: realm.to_string(),
software: String::new(),
rto_in_ms: 0,
conn: Arc::new(conn),
vnet: None,
};

let client = Client::new(cfg).await?;
let mut client = Client::new(cfg)?;

// Allocate a relay socket on the TURN server.
let allocate_tid = client.allocate()?;
let mut relayed_addr = None;
let mut create_permission_tid = None;
// Send BindingRequest to learn our external IP
//let binding_tid = client.send_binding_request()?;

let (stop_tx, stop_rx) = crossbeam_channel::bounded::<()>(1);
println!("Press Ctrl-C to stop");
std::thread::spawn(move || {
let mut stop_tx = Some(stop_tx);
ctrlc::set_handler(move || {
if let Some(stop_tx) = stop_tx.take() {
let _ = stop_tx.send(());
}
})
.expect("Error setting Ctrl-C handler");
});

let mut buf = vec![0u8; 2048];
loop {
match stop_rx.try_recv() {
Ok(_) => break,
Err(err) => {
if err.is_disconnected() {
break;
}
}
};

while let Some(transmit) = client.poll_transmit() {
socket.send_to(&transmit.message, transmit.transport.peer_addr)?;
trace!(
"socket.sent {} to {}",
transmit.message.len(),
transmit.transport.peer_addr
);
}

while let Some(event) = client.poll_event() {
match event {
Event::TransactionTimeout(_) => return Err(Error::ErrTimeout),
Event::BindingResponse(_, reflexive_addr) => {
println!("reflexive address {}", reflexive_addr);
}
Event::BindingError(_, err) => return Err(err),
Event::AllocateResponse(tid, addr) => {
println!("relayed address {}", addr);
if relayed_addr.is_none() {
assert_eq!(tid, allocate_tid);
relayed_addr = Some(addr);
if let Some(id) = client.relay(addr)?.create_permission(peer_addr)? {
create_permission_tid = Some(id);
} else {
assert!(false, "create_permission failed");
}
} else {
assert!(false, "relayed address is not none");
}
}
Event::AllocateError(_, err) => return Err(err),
Event::CreatePermissionResponse(tid, peer_addr) => {
println!("CreatePermission for peer addr {} is granted", peer_addr);
if let Some(id) = create_permission_tid {
assert_eq!(tid, id);
} else {
assert!(false, "create_permission_tid is none");
}
}
Event::CreatePermissionError(_, err) => return Err(err),
Event::DataIndicationOrChannelData(_, _, _) => {}
}
}

// Start listening on the conn provided.
client.listen().await?;
let mut eto = Instant::now() + Duration::from_millis(100);
if let Some(to) = client.poll_timout() {
if to < eto {
eto = to;
}
}

// Allocate a relay socket on the TURN server. On success, it
// will return a net.PacketConn which represents the remote
// socket.
let relay_conn = client.allocate().await?;
let delay_from_now = eto
.checked_duration_since(Instant::now())
.unwrap_or(Duration::from_secs(0));
if delay_from_now.is_zero() {
client.handle_timeout(Instant::now());
continue;
}

// The relayConn's local address is actually the transport
// address assigned on the TURN server.
println!("relayed-address={}", relay_conn.local_addr()?);
socket
.set_read_timeout(Some(delay_from_now))
.expect("setting socket read timeout");

if let Some(transmit) = read_socket_input(&socket, &mut buf) {
trace!(
"read_socket_input {} from {}",
transmit.message.len(),
transmit.transport.peer_addr
);
client.handle_transmit(transmit)?;
}

// If you provided `-ping`, perform a ping test agaist the
// relayConn we have just allocated.
if ping {
do_ping_test(&client, relay_conn).await?;
// Drive time forward in all clients.
client.handle_timeout(Instant::now());
}

client.close().await?;
client.close();

Ok(())
}

async fn do_ping_test(
client: &Client,
relay_conn: impl Conn + std::marker::Send + std::marker::Sync + 'static,
) -> Result<(), Error> {
// Send BindingRequest to learn our external IP
let mapped_addr = client.send_binding_request().await?;
fn read_socket_input(socket: &UdpSocket, buf: &mut [u8]) -> Option<Transmit<BytesMut>> {
match socket.recv_from(buf) {
Ok((n, peer_addr)) => {
return Some(Transmit {
now: Instant::now(),
transport: TransportContext {
local_addr: socket.local_addr().unwrap(),
peer_addr,
protocol: Protocol::UDP,
ecn: None,
},
message: BytesMut::from(&buf[..n]),
});
}

Err(e) => match e.kind() {
// Expected error for set_read_timeout(). One for windows, one for the rest.
ErrorKind::WouldBlock | ErrorKind::TimedOut => None,
_ => panic!("UdpSocket read failed: {e:?}"),
},
}
}

/*
fn do_ping_test(
client: &mut Client,
relayed_addr: RelayedAddr,
//reflexive_addr: ReflexiveAddr,
) -> Result<()> {
// Set up pinger socket (pingerConn)
//println!("bind...");
let pinger_conn_tx = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
// Punch a UDP hole for the relay_conn by sending a data to the mapped_addr.
// This will trigger a TURN client to generate a permission request to the
// TURN server. After this, packets from the IP address will be accepted by
// the TURN server.
//println!("relay_conn send hello to mapped_addr {}", mapped_addr);
relay_conn.send_to("Hello".as_bytes(), mapped_addr).await?;
let relay_addr = relay_conn.local_addr()?;
/*client
.relay(relayed_addr)
.send_to("Hello".as_bytes(), reflexive_addr)?;
*/
let pinger_conn_rx = Arc::clone(&pinger_conn_tx);
Expand Down Expand Up @@ -195,12 +289,12 @@ async fn do_ping_test(
for _ in 0..2 {
let msg = "12345678910".to_owned(); //format!("{:?}", tokio::time::Instant::now());
println!("sending msg={} with size={}", msg, msg.as_bytes().len());
pinger_conn_tx.send_to(msg.as_bytes(), relay_addr).await?;
pinger_conn_tx.send_to(msg.as_bytes(), relayed_addr).await?;
// For simplicity, this example does not wait for the pong (reply).
// Instead, sleep 1 second.
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok(())
}
}*/
Loading

0 comments on commit 3ebba0b

Please sign in to comment.