diff --git a/rtc-turn/Cargo.toml b/rtc-turn/Cargo.toml index 19c18c7..00e10ee 100644 --- a/rtc-turn/Cargo.toml +++ b/rtc-turn/Cargo.toml @@ -24,8 +24,10 @@ thiserror = "1.0.57" env_logger = "0.11.3" chrono = "0.4.35" hex = "0.4.3" -clap = "4.5.2" +clap = { version = "4.5.3", features = ["derive"] } criterion = "0.5.1" +crossbeam-channel = "0.5" +ctrlc = "3.4" [features] metrics = [] diff --git a/rtc-turn/examples/turn_client_udp.rs b/rtc-turn/examples/turn_client_udp.rs index 6023259..320dc0e 100644 --- a/rtc-turn/examples/turn_client_udp.rs +++ b/rtc-turn/examples/turn_client_udp.rs @@ -1,17 +1,33 @@ +use bytes::BytesMut; use clap::Parser; +use log::trace; use rtc_turn::client::*; -use shared::error::Result; +use shared::error::{Error, Result}; +use shared::{Protocol, Transmit, TransportContext}; +use std::io::{ErrorKind, Write}; +use std::net::UdpSocket; +use std::str::FromStr; +use std::thread; +use std::time::{Duration, Instant}; -// RUST_LOG=trace cargo run --color=always --package turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping +// RUST_LOG=trace cargo run --color=always --package rtc-turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping #[derive(Parser)] -#[command(name = "ICE Ping Pong")] +#[command(name = "TURN Client UDP")] #[command(author = "Rusty Rain ")] #[command(version = "0.1.0")] -#[command(about = "An example of ICE", long_about = None)] +#[command(about = "An example of TURN Client UDP", long_about = None)] struct Cli { - #[arg(short, long)] - controlling: bool, + #[arg(long, default_value_t = format!("127.0.0.1"))] + host: String, + #[arg(long, default_value_t = 3478)] + port: u16, + #[arg(long)] + user: String, + #[arg(long, default_value_t = format!("webrtc.rs"))] + realm: String, + #[arg(long)] + ping: bool, #[arg(short, long)] debug: bool, @@ -20,187 +36,245 @@ struct Cli { } fn main() -> Result<()> { - env_logger::init(); - - let mut app = App::new("TURN Client UDP") - .version("0.1.0") - .author("Rain Liu ") - .about("An example of TURN Client UDP") - .setting(AppSettings::DeriveDisplayOrder) - .setting(AppSettings::SubcommandsNegateReqs) - .arg( - Arg::with_name("FULLHELP") - .help("Prints more detailed help information") - .long("fullhelp"), - ) - .arg( - Arg::with_name("host") - .required_unless("FULLHELP") - .takes_value(true) - .long("host") - .help("TURN Server name."), - ) - .arg( - Arg::with_name("user") - .required_unless("FULLHELP") - .takes_value(true) - .long("user") - .help("A pair of username and password (e.g. \"user=pass\")"), - ) - .arg( - Arg::with_name("realm") - .default_value("webrtc.rs") - .takes_value(true) - .long("realm") - .help("Realm (defaults to \"webrtc.rs\")"), - ) - .arg( - Arg::with_name("port") - .takes_value(true) - .default_value("3478") - .long("port") - .help("Listening port."), - ) - .arg( - Arg::with_name("ping") - .long("ping") - .takes_value(false) - .help("Run ping test"), - ); - - let matches = app.clone().get_matches(); - - if matches.is_present("FULLHELP") { - app.print_long_help().unwrap(); - std::process::exit(0); + let cli = Cli::parse(); + if cli.debug { + let log_level = log::LevelFilter::from_str(&cli.log_level).unwrap(); + env_logger::Builder::new() + .format(|buf, record| { + writeln!( + buf, + "{}:{} [{}] {} - {}", + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.level(), + chrono::Local::now().format("%H:%M:%S.%6f"), + record.args() + ) + }) + .filter(None, log_level) + .init(); } - let host = matches.value_of("host").unwrap(); - let port = matches.value_of("port").unwrap(); - let user = matches.value_of("user").unwrap(); + let host = cli.host; + let port = cli.port; + let user = cli.user; let cred: Vec<&str> = user.splitn(2, '=').collect(); - let ping = matches.is_present("ping"); - let realm = matches.value_of("realm").unwrap(); + let _ping = cli.ping; + let realm = cli.realm; // TURN client won't create a local listening socket by itself. - let conn = UdpSocket::bind("0.0.0.0:0").await?; + let socket = UdpSocket::bind("127.0.0.1:0")?; + let pinger = UdpSocket::bind("127.0.0.1:0")?; + + let local_addr = socket.local_addr()?; + let peer_addr = pinger.local_addr()?; + let mut pinger = Some(pinger); let turn_server_addr = format!("{host}:{port}"); let cfg = ClientConfig { stun_serv_addr: turn_server_addr.clone(), turn_serv_addr: turn_server_addr, + local_addr, + protocol: Protocol::UDP, username: cred[0].to_string(), password: cred[1].to_string(), realm: realm.to_string(), software: String::new(), rto_in_ms: 0, - conn: Arc::new(conn), - vnet: None, }; - let client = Client::new(cfg).await?; + let mut client = Client::new(cfg)?; + + // Allocate a relay socket on the TURN server. + let allocate_tid = client.allocate()?; + let mut relayed_addr = None; + let mut create_permission_tid = None; + // Send BindingRequest to learn our external IP + //let binding_tid = client.send_binding_request()?; + + let (stop_tx, stop_rx) = crossbeam_channel::bounded::<()>(1); + println!("Press Ctrl-C to stop"); + std::thread::spawn(move || { + let mut stop_tx = Some(stop_tx); + ctrlc::set_handler(move || { + if let Some(stop_tx) = stop_tx.take() { + let _ = stop_tx.send(()); + } + }) + .expect("Error setting Ctrl-C handler"); + }); + + let mut buf = vec![0u8; 2048]; + loop { + match stop_rx.try_recv() { + Ok(_) => break, + Err(err) => { + if err.is_disconnected() { + break; + } + } + }; + + while let Some(transmit) = client.poll_transmit() { + socket.send_to(&transmit.message, transmit.transport.peer_addr)?; + trace!( + "socket.sent {} to {}", + transmit.message.len(), + transmit.transport.peer_addr + ); + } + + while let Some(event) = client.poll_event() { + match event { + Event::TransactionTimeout(_) => return Err(Error::ErrTimeout), + Event::BindingResponse(_, reflexive_addr) => { + println!("reflexive address {}", reflexive_addr); + } + Event::BindingError(_, err) => return Err(err), + Event::AllocateResponse(tid, addr) => { + println!("relayed address {}", addr); + if relayed_addr.is_none() { + assert_eq!(tid, allocate_tid); + relayed_addr = Some(addr); + if let Some(id) = client.relay(addr)?.create_permission(peer_addr)? { + create_permission_tid = Some(id); + } else { + assert!(false, "create_permission failed"); + } + } else { + assert!(false, "relayed address is not none"); + } + } + Event::AllocateError(_, err) => return Err(err), + Event::CreatePermissionResponse(tid, peer_addr) => { + println!("CreatePermission for peer addr {} is granted", peer_addr); + if let Some(id) = create_permission_tid { + assert_eq!(tid, id); + + do_ping_test(pinger.take(), relayed_addr.clone()) + } else { + assert!(false, "create_permission_tid is none"); + } + } + Event::CreatePermissionError(_, err) => return Err(err), + Event::DataIndicationOrChannelData(_, from, data) => { + println!("relay read: {:?} from {}", &data[..], from); + + // Echo back + if let Some(&relay_addr) = relayed_addr.as_ref() { + client.relay(relay_addr)?.send_to(&data[..], from)?; + } + } + } + } - // Start listening on the conn provided. - client.listen().await?; + let mut eto = Instant::now() + Duration::from_millis(100); + if let Some(to) = client.poll_timout() { + if to < eto { + eto = to; + } + } - // Allocate a relay socket on the TURN server. On success, it - // will return a net.PacketConn which represents the remote - // socket. - let relay_conn = client.allocate().await?; + let delay_from_now = eto + .checked_duration_since(Instant::now()) + .unwrap_or(Duration::from_secs(0)); + if delay_from_now.is_zero() { + client.handle_timeout(Instant::now()); + continue; + } - // The relayConn's local address is actually the transport - // address assigned on the TURN server. - println!("relayed-address={}", relay_conn.local_addr()?); + socket + .set_read_timeout(Some(delay_from_now)) + .expect("setting socket read timeout"); + + if let Some(transmit) = read_socket_input(&socket, &mut buf) { + trace!( + "read_socket_input {} from {}", + transmit.message.len(), + transmit.transport.peer_addr + ); + client.handle_transmit(transmit)?; + } - // If you provided `-ping`, perform a ping test agaist the - // relayConn we have just allocated. - if ping { - do_ping_test(&client, relay_conn).await?; + // Drive time forward in all clients. + client.handle_timeout(Instant::now()); } - client.close().await?; + client.close(); Ok(()) } -async fn do_ping_test( - client: &Client, - relay_conn: impl Conn + std::marker::Send + std::marker::Sync + 'static, -) -> Result<(), Error> { - // Send BindingRequest to learn our external IP - let mapped_addr = client.send_binding_request().await?; +fn read_socket_input(socket: &UdpSocket, buf: &mut [u8]) -> Option> { + match socket.recv_from(buf) { + Ok((n, peer_addr)) => { + return Some(Transmit { + now: Instant::now(), + transport: TransportContext { + local_addr: socket.local_addr().unwrap(), + peer_addr, + protocol: Protocol::UDP, + ecn: None, + }, + message: BytesMut::from(&buf[..n]), + }); + } + + Err(e) => match e.kind() { + // Expected error for set_read_timeout(). One for windows, one for the rest. + ErrorKind::WouldBlock | ErrorKind::TimedOut => None, + _ => panic!("UdpSocket read failed: {e:?}"), + }, + } +} +fn do_ping_test(pinger: Option, relayed_addr: Option) { // Set up pinger socket (pingerConn) //println!("bind..."); - let pinger_conn_tx = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); // Punch a UDP hole for the relay_conn by sending a data to the mapped_addr. // This will trigger a TURN client to generate a permission request to the // TURN server. After this, packets from the IP address will be accepted by // the TURN server. //println!("relay_conn send hello to mapped_addr {}", mapped_addr); - relay_conn.send_to("Hello".as_bytes(), mapped_addr).await?; - let relay_addr = relay_conn.local_addr()?; - - let pinger_conn_rx = Arc::clone(&pinger_conn_tx); - - // Start read-loop on pingerConn - tokio::spawn(async move { - let mut buf = vec![0u8; 1500]; - loop { - let (n, from) = match pinger_conn_rx.recv_from(&mut buf).await { - Ok((n, from)) => (n, from), - Err(_) => break, - }; - - let msg = match String::from_utf8(buf[..n].to_vec()) { - Ok(msg) => msg, - Err(_) => break, - }; - - println!("pingerConn read-loop: {msg} from {from}"); - /*if sentAt, pingerErr := time.Parse(time.RFC3339Nano, msg); pingerErr == nil { - rtt := time.Since(sentAt) - log.Printf("%d bytes from from %s time=%d ms\n", n, from.String(), int(rtt.Seconds()*1000)) - }*/ - } - }); - - // Start read-loop on relay_conn - tokio::spawn(async move { - let mut buf = vec![0u8; 1500]; - loop { - let (n, from) = match relay_conn.recv_from(&mut buf).await { - Err(_) => break, - Ok((n, from)) => (n, from), - }; - - println!("relay_conn read-loop: {:?} from {}", &buf[..n], from); - - // Echo back - if relay_conn.send_to(&buf[..n], from).await.is_err() { - break; + /*client + .relay(relayed_addr) + .send_to("Hello".as_bytes(), reflexive_addr)?; + */ + if let (Some(pinger), Some(relayed_addr)) = (pinger, relayed_addr) { + // Start read-loop on pingerConn + thread::spawn(move || { + let mut buf = vec![0u8; 1500]; + + for i in 0..10 { + let msg = "12345678910".to_owned(); //format!("{:?}", tokio::time::Instant::now()); + println!( + "sending {}th msg={} with size={} to {}", + i, + msg, + msg.as_bytes().len(), + relayed_addr + ); + pinger.send_to(msg.as_bytes(), relayed_addr).unwrap(); + + let (n, from) = match pinger.recv_from(&mut buf) { + Ok((n, from)) => (n, from), + Err(_) => break, + }; + + let msg = match String::from_utf8(buf[..n].to_vec()) { + Ok(msg) => msg, + Err(_) => break, + }; + + println!("pinger read-loop: {msg} from {from}"); + + // For simplicity, this example does not wait for the pong (reply). + // Instead, sleep 1 second. + thread::sleep(Duration::from_secs(1)); } - } - }); - - tokio::time::sleep(Duration::from_millis(500)).await; - - /*println!( - "pinger_conn_tx send 10 packets to relay addr {}...", - relay_addr - );*/ - // Send 10 packets from relay_conn to the echo server - for _ in 0..2 { - let msg = "12345678910".to_owned(); //format!("{:?}", tokio::time::Instant::now()); - println!("sending msg={} with size={}", msg, msg.as_bytes().len()); - pinger_conn_tx.send_to(msg.as_bytes(), relay_addr).await?; - - // For simplicity, this example does not wait for the pong (reply). - // Instead, sleep 1 second. - tokio::time::sleep(Duration::from_secs(1)).await; + println!("ping completed"); + }); } - - Ok(()) } diff --git a/rtc-turn/src/client/mod.rs b/rtc-turn/src/client/mod.rs index bc576a5..53c1eeb 100644 --- a/rtc-turn/src/client/mod.rs +++ b/rtc-turn/src/client/mod.rs @@ -44,6 +44,7 @@ pub type RelayedAddr = SocketAddr; pub type ReflexiveAddr = SocketAddr; pub type PeerAddr = SocketAddr; +#[derive(Debug)] pub enum Event { TransactionTimeout(TransactionId), @@ -53,7 +54,7 @@ pub enum Event { AllocateResponse(TransactionId, RelayedAddr), AllocateError(TransactionId, Error), - CreatePermissionResponse(TransactionId), + CreatePermissionResponse(TransactionId, PeerAddr), CreatePermissionError(TransactionId, Error), DataIndicationOrChannelData(Option, PeerAddr, BytesMut), @@ -507,11 +508,12 @@ impl Client { ])?; debug!("client.Allocate call PerformTransaction 1"); - let tid = self.perform_transaction( + let mut tid = self.perform_transaction( &msg, self.turn_server_addr()?, TransactionType::AllocateAttempt, ); + tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1); Ok(tid) } @@ -547,9 +549,15 @@ impl Client { ); let mut msg = Message::new(); + + // make it same as allocate() return value so that client can retrieve it + // from Event::AllocateResponse + let mut tid = response.transaction_id; + tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1); + // Trying to authorize. msg.build(&[ - Box::new(TransactionId::new()), + Box::new(tid), Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)), Box::new(RequestedTransport { protocol: if self.protocol == Protocol::UDP { diff --git a/rtc-turn/src/client/relay.rs b/rtc-turn/src/client/relay.rs index 33a2b39..eed3468 100644 --- a/rtc-turn/src/client/relay.rs +++ b/rtc-turn/src/client/relay.rs @@ -20,6 +20,9 @@ use crate::client::{Client, Event, RelayedAddr}; use shared::error::{Error, Result}; const PERM_REFRESH_INTERVAL: Duration = Duration::from_secs(120); +// https://datatracker.ietf.org/doc/html/rfc8656#name-permissions-2 +// The Permission Lifetime MUST be 300 seconds (= 5 minutes). +const PERM_LIFETIME: Duration = Duration::from_secs(300); const MAX_RETRY_ATTEMPTS: u16 = 3; // RelayState is a set of params use by Relay @@ -214,7 +217,7 @@ impl<'a> Relay<'a> { && Instant::now() .checked_duration_since(bind_at) .unwrap_or_else(|| Duration::from_secs(0)) - > Duration::from_secs(5 * 60) + > PERM_LIFETIME { if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) { b.set_state(BindingState::Refresh); @@ -304,7 +307,10 @@ impl<'a> Relay<'a> { perm.set_state(PermState::Permitted); self.client .events - .push_back(Event::CreatePermissionResponse(res.transaction_id)); + .push_back(Event::CreatePermissionResponse( + res.transaction_id, + peer_addr, + )); } }