diff --git a/rtc-ice/Cargo.toml b/rtc-ice/Cargo.toml index 2d9e152..ef6c676 100644 --- a/rtc-ice/Cargo.toml +++ b/rtc-ice/Cargo.toml @@ -25,14 +25,17 @@ regex = "1.10.3" env_logger = "0.11.3" chrono = "0.4.35" ipnet = "2.9.0" -clap = "4.5.2" +clap = { version = "4.5.2", features = ["derive"] } lazy_static = "1.4.0" hyper = { version = "0.14.28", features = ["full"] } sha1 = "0.10.6" waitgroup = "0.1.2" serde_json = "1.0.114" +tokio = { version = "1.36", features = ["full"] } +futures = "0.3.30" +ctrlc = "3.4" -#[[example]] -#name = "ping_pong" -#path = "examples/ping_pong.rs" -#bench = false +[[example]] +name = "ping_pong" +path = "examples/ping_pong.rs" +bench = false diff --git a/rtc-ice/examples/ping_pong.rs b/rtc-ice/examples/ping_pong.rs index 9e3692c..72842f1 100644 --- a/rtc-ice/examples/ping_pong.rs +++ b/rtc-ice/examples/ping_pong.rs @@ -1,22 +1,23 @@ -/*TODO: use std::io; -use std::sync::Arc; -use std::time::Duration; - -use clap::{App, AppSettings, Arg}; +use bytes::BytesMut; +use clap::Parser; +use futures::StreamExt; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Client, Method, Request, Response, Server, StatusCode}; -use ice::agent::agent_config::AgentConfig; -use ice::agent::Agent; -use ice::candidate::candidate_base::*; -use ice::candidate::*; -use ice::network_type::*; -use ice::state::*; -use ice::udp_network::UDPNetwork; -use ice::Error; -use rand::{thread_rng, Rng}; +use rtc_ice::agent::agent_config::AgentConfig; +use rtc_ice::agent::Agent; +use rtc_ice::candidate::candidate_host::CandidateHostConfig; +use rtc_ice::candidate::*; +use rtc_ice::state::*; +use rtc_ice::{Credentials, Event}; +use shared::error::Error; +use shared::{Protocol, Transmit, TransportContext}; +use std::io; +use std::io::Write; +use std::str::FromStr; +use std::sync::Arc; +use std::time::{Duration, Instant}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, watch, Mutex}; -use util::Conn; #[macro_use] extern crate lazy_static; @@ -85,341 +86,276 @@ async fn remote_handler(req: Request) -> Result, hyper::Err // Controlling Agent: // cargo run --color=always --package webrtc-ice --example ping_pong -- --controlling +#[derive(Parser)] +#[command(name = "ICE Ping Pong")] +#[command(author = "Rusty Rain ")] +#[command(version = "0.1.0")] +#[command(about = "An example of ICE", long_about = None)] +struct Cli { + #[arg(short, long)] + controlling: bool, + + #[arg(short, long)] + debug: bool, + #[arg(long, default_value_t = format!("INFO"))] + log_level: String, +} + #[tokio::main] async fn main() -> Result<(), Error> { - env_logger::init(); - // .format(|buf, record| { - // writeln!( - // buf, - // "{}:{} [{}] {} - {}", - // record.file().unwrap_or("unknown"), - // record.line().unwrap_or(0), - // record.level(), - // chrono::Local::now().format("%H:%M:%S.%6f"), - // record.args() - // ) - // }) - // .filter(None, log::LevelFilter::Trace) - // .init(); - - let mut app = App::new("ICE Demo") - .version("0.1.0") - .author("Rain Liu ") - .about("An example of ICE") - .setting(AppSettings::DeriveDisplayOrder) - .setting(AppSettings::SubcommandsNegateReqs) - .arg( - Arg::with_name("use-mux") - .takes_value(false) - .long("use-mux") - .short('m') - .help("Use a muxed UDP connection over a single listening port"), - ) - .arg( - Arg::with_name("FULLHELP") - .help("Prints more detailed help information") - .long("fullhelp"), - ) - .arg( - Arg::with_name("controlling") - .takes_value(false) - .long("controlling") - .help("is ICE Agent controlling"), - ); - - let matches = app.clone().get_matches(); - - if matches.is_present("FULLHELP") { - app.print_long_help().unwrap(); - std::process::exit(0); + let cli = Cli::parse(); + if cli.debug { + let log_level = log::LevelFilter::from_str(&cli.log_level).unwrap(); + env_logger::Builder::new() + .format(|buf, record| { + writeln!( + buf, + "{}:{} [{}] {} - {}", + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.level(), + chrono::Local::now().format("%H:%M:%S.%6f"), + record.args() + ) + }) + .filter(None, log_level) + .init(); } - let is_controlling = matches.is_present("controlling"); - let use_mux = matches.is_present("use-mux"); - - let (local_http_port, remote_http_port) = if is_controlling { + let (local_http_port, remote_http_port) = if cli.controlling { (9000, 9001) } else { (9001, 9000) }; - let (weak_conn, weak_agent) = { - let (done_tx, done_rx) = watch::channel(()); - - println!("Listening on http://localhost:{local_http_port}"); - let mut done_http_server = done_rx.clone(); - tokio::spawn(async move { - let addr = ([0, 0, 0, 0], local_http_port).into(); - let service = - make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(remote_handler)) }); - let server = Server::bind(&addr).serve(service); - tokio::select! { - _ = done_http_server.changed() => { - println!("receive cancel http server!"); + let (done_tx, mut done_rx) = watch::channel(()); + + println!("Listening on http://localhost:{local_http_port}"); + let mut done_http_server = done_rx.clone(); + tokio::spawn(async move { + let addr = ([0, 0, 0, 0], local_http_port).into(); + let service = + make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(remote_handler)) }); + let server = Server::bind(&addr).serve(service); + tokio::select! { + _ = done_http_server.changed() => { + println!("receive cancel http server!"); + } + result = server => { + // Run this server for... forever! + if let Err(e) = result { + eprintln!("server error: {e}"); + } + println!("exit http server!"); + } + }; + }); + + if cli.controlling { + println!("Local Agent is controlling"); + } else { + println!("Local Agent is controlled"); + }; + println!("Press 'Enter' when both processes have started"); + let mut input = String::new(); + let _ = io::stdin().read_line(&mut input)?; + + let port = if cli.controlling { 4000 } else { 4001 }; + let udp_socket = UdpSocket::bind(("0.0.0.0", port)).await?; + let mut ice_agent = Agent::new(AgentConfig::default())?; + + let client = Arc::new(Client::new()); + + // When we have gathered a new ICE Candidate send it to the remote peer + let client2 = Arc::clone(&client); + let on_candidate = |c: Candidate| { + let client3 = Arc::clone(&client2); + Box::pin(async move { + println!("posting remoteCandidate with {}", c.marshal()); + + let req = match Request::builder() + .method(Method::POST) + .uri(format!( + "http://localhost:{remote_http_port}/remoteCandidate" + )) + .body(Body::from(c.marshal())) + { + Ok(req) => req, + Err(err) => { + println!("{err}"); + return; } - result = server => { - // Run this server for... forever! - if let Err(e) = result { - eprintln!("server error: {e}"); - } - println!("exit http server!"); + }; + let resp = match client3.request(req).await { + Ok(resp) => resp, + Err(err) => { + println!("{err}"); + return; } }; - }); + println!("Response from remoteCandidate: {}", resp.status()); + }) + }; - if is_controlling { - println!("Local Agent is controlling"); + // Get the local auth details and send to remote peer + let Credentials { + ufrag: local_ufrag, + pwd: local_pwd, + } = ice_agent.get_local_credentials(); + + println!("posting remoteAuth with {local_ufrag}:{local_pwd}"); + let req = match Request::builder() + .method(Method::POST) + .uri(format!("http://localhost:{remote_http_port}/remoteAuth")) + .body(Body::from(format!("{local_ufrag}:{local_pwd}"))) + { + Ok(req) => req, + Err(err) => return Err(Error::Other(format!("{err}"))), + }; + let resp = match client.request(req).await { + Ok(resp) => resp, + Err(err) => return Err(Error::Other(format!("{err}"))), + }; + println!("Response from remoteAuth: {}", resp.status()); + + let (remote_ufrag, remote_pwd) = { + let mut rx = REMOTE_AUTH_CHANNEL.1.lock().await; + if let Some(s) = rx.recv().await { + println!("received: {s}"); + let fields: Vec = s.split(':').map(|s| s.to_string()).collect(); + (fields[0].clone(), fields[1].clone()) } else { - println!("Local Agent is controlled"); - }; - println!("Press 'Enter' when both processes have started"); - let mut input = String::new(); - let _ = io::stdin().read_line(&mut input)?; - - let udp_network = if use_mux { - use ice::udp_mux::*; - let port = if is_controlling { 4000 } else { 4001 }; - - let udp_socket = UdpSocket::bind(("0.0.0.0", port)).await?; - let udp_mux = UDPMuxDefault::new(UDPMuxParams::new(udp_socket)); - - UDPNetwork::Muxed(udp_mux) + panic!("rx.recv() empty"); + } + }; + println!("remote_ufrag: {remote_ufrag}, remote_pwd: {remote_pwd}"); + + // gather_candidates + println!("gathering candidates..."); + let local_candidate = CandidateHostConfig { + base_config: CandidateConfig { + network: "udp".to_owned(), + address: udp_socket.local_addr()?.ip().to_string(), + port: udp_socket.local_addr()?.port(), + component: 1, + ..Default::default() + }, + ..Default::default() + } + .new_candidate_host()?; + on_candidate(local_candidate.clone()).await; + ice_agent.add_local_candidate(local_candidate)?; + + let remote_candidate = { + let mut rx = REMOTE_CAND_CHANNEL.1.lock().await; + if let Some(s) = rx.recv().await { + println!("received remote_candidate: {s}"); + unmarshal_candidate(&s)? } else { - UDPNetwork::Ephemeral(Default::default()) - }; - - let ice_agent = Arc::new( - Agent::new(AgentConfig { - network_types: vec![NetworkType::Udp4], - udp_network, - ..Default::default() - }) - .await?, - ); - - let client = Arc::new(Client::new()); - - // When we have gathered a new ICE Candidate send it to the remote peer - let client2 = Arc::clone(&client); - ice_agent.on_candidate(Box::new( - move |c: Option>| { - let client3 = Arc::clone(&client2); - Box::pin(async move { - if let Some(c) = c { - println!("posting remoteCandidate with {}", c.marshal()); - - let req = match Request::builder() - .method(Method::POST) - .uri(format!( - "http://localhost:{remote_http_port}/remoteCandidate" - )) - .body(Body::from(c.marshal())) - { - Ok(req) => req, - Err(err) => { - println!("{err}"); - return; - } - }; - let resp = match client3.request(req).await { - Ok(resp) => resp, - Err(err) => { - println!("{err}"); - return; - } - }; - println!("Response from remoteCandidate: {}", resp.status()); + panic!("rx.recv() empty"); + } + }; + let peer_addr = remote_candidate.addr(); + ice_agent.add_remote_candidate(remote_candidate)?; + + ice_agent.start_connectivity_checks(cli.controlling, remote_ufrag, remote_pwd)?; + + println!("Enter bye to stop"); + let (mut tx, mut rx) = futures::channel::mpsc::channel(8); + std::thread::spawn(move || { + let mut buffer = String::new(); + while io::stdin().read_line(&mut buffer).is_ok() { + match buffer.trim_end() { + "" => break, + line => { + if line == "bye" { + let _ = done_tx.send(()); + break; } - }) - }, - )); - - let (ice_done_tx, mut ice_done_rx) = mpsc::channel::<()>(1); - // When ICE Connection state has change print to stdout - ice_agent.on_connection_state_change(Box::new(move |c: ConnectionState| { - println!("ICE Connection State has changed: {c}"); - if c == ConnectionState::Failed { - let _ = ice_done_tx.try_send(()); - } - Box::pin(async move {}) - })); - - // Get the local auth details and send to remote peer - let (local_ufrag, local_pwd) = ice_agent.get_local_user_credentials().await; - - println!("posting remoteAuth with {local_ufrag}:{local_pwd}"); - let req = match Request::builder() - .method(Method::POST) - .uri(format!("http://localhost:{remote_http_port}/remoteAuth")) - .body(Body::from(format!("{local_ufrag}:{local_pwd}"))) - { - Ok(req) => req, - Err(err) => return Err(Error::Other(format!("{err}"))), - }; - let resp = match client.request(req).await { - Ok(resp) => resp, - Err(err) => return Err(Error::Other(format!("{err}"))), - }; - println!("Response from remoteAuth: {}", resp.status()); - - let (remote_ufrag, remote_pwd) = { - let mut rx = REMOTE_AUTH_CHANNEL.1.lock().await; - if let Some(s) = rx.recv().await { - println!("received: {s}"); - let fields: Vec = s.split(':').map(|s| s.to_string()).collect(); - (fields[0].clone(), fields[1].clone()) - } else { - panic!("rx.recv() empty"); - } - }; - println!("remote_ufrag: {remote_ufrag}, remote_pwd: {remote_pwd}"); - - let ice_agent2 = Arc::clone(&ice_agent); - let mut done_cand = done_rx.clone(); - tokio::spawn(async move { - let mut rx = REMOTE_CAND_CHANNEL.1.lock().await; - loop { - tokio::select! { - _ = done_cand.changed() => { - println!("receive cancel remote cand!"); + if tx.try_send(line.to_string()).is_err() { break; } - result = rx.recv() => { - if let Some(s) = result { - if let Ok(c) = unmarshal_candidate(&s) { - println!("add_remote_candidate: {c}"); - let c: Arc = Arc::new(c); - let _ = ice_agent2.add_remote_candidate(&c); - }else{ - println!("unmarshal_candidate error!"); - break; - } - }else{ - println!("REMOTE_CAND_CHANNEL done!"); + } + }; + buffer.clear(); + } + }); + + // Start the ICE Agent. One side must be controlled, and the other must be controlling + let mut buf = vec![0u8; 2048]; + loop { + while let Some(transmit) = ice_agent.poll_transmit() { + udp_socket + .send_to(&transmit.message[..], transmit.transport.peer_addr) + .await?; + } + let mut is_failed = false; + while let Some(event) = ice_agent.poll_event() { + match event { + Event::ConnectionStateChange(cs) => { + println!("ConnectionStateChange with {}", cs); + match cs { + ConnectionState::Failed => { + is_failed = true; break; } + _ => {} } - }; + } + _ => {} } - }); - - ice_agent.gather_candidates()?; - println!("Connecting..."); + } + if is_failed { + break; + } - let (_cancel_tx, cancel_rx) = mpsc::channel(1); - // Start the ICE Agent. One side must be controlled, and the other must be controlling - let conn: Arc = if is_controlling { - ice_agent.dial(cancel_rx, remote_ufrag, remote_pwd).await? + let d = if let Some(eto) = ice_agent.poll_timeout() { + eto.duration_since(Instant::now()) } else { - ice_agent - .accept(cancel_rx, remote_ufrag, remote_pwd) - .await? + Duration::from_millis(100) }; + let timeout = tokio::time::sleep(d); + tokio::pin!(timeout); - let weak_conn = Arc::downgrade(&conn); - - // Send messages in a loop to the remote peer - let conn_tx = Arc::clone(&conn); - let mut done_send = done_rx.clone(); - tokio::spawn(async move { - const RANDOM_STRING: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; - loop { - tokio::time::sleep(Duration::from_secs(3)).await; - - let val: String = (0..15) - .map(|_| { - let idx = thread_rng().gen_range(0..RANDOM_STRING.len()); - RANDOM_STRING[idx] as char - }) - .collect(); - - tokio::select! { - _ = done_send.changed() => { - println!("receive cancel ice send!"); - break; - } - result = conn_tx.send(val.as_bytes()) => { - if let Err(err) = result { - eprintln!("conn_tx send error: {err}"); - break; - }else{ - println!("Sent: '{val}'"); - } - } - }; + tokio::select! { + _ = done_rx.changed() => { + println!("exit ICE loop"); + break; + } + _ = timeout.as_mut() => { + ice_agent.handle_timeout(Instant::now()); } - }); - - let mut done_recv = done_rx.clone(); - tokio::spawn(async move { - // Receive messages in a loop from the remote peer - let mut buf = vec![0u8; 1500]; - loop { - tokio::select! { - _ = done_recv.changed() => { - println!("receive cancel ice recv!"); + res = udp_socket.recv_from(&mut buf) => { + if let Ok((n, remote_addr)) = res { + if n == 0 { break; } - result = conn.recv(&mut buf) => { - match result { - Ok(n) => { - println!("Received: '{}'", std::str::from_utf8(&buf[..n]).unwrap()); - } - Err(err) => { - eprintln!("conn_tx send error: {err}"); - break; - } - }; - } - }; - } - }); - - println!("Press ctrl-c to stop"); - /*let d = if is_controlling { - Duration::from_secs(500) - } else { - Duration::from_secs(5) - }; - let timeout = tokio::time::sleep(d); - tokio::pin!(timeout);*/ - tokio::select! { - /*_ = timeout.as_mut() => { - println!("received timeout signal!"); - let _ = done_tx.send(()); - }*/ - _ = ice_done_rx.recv() => { - println!("ice_done_rx"); - let _ = done_tx.send(()); + if stun::message::is_message(&buf[0..n]) { + ice_agent.handle_read(Transmit::{ + now: Instant::now(), + transport: TransportContext{ + local_addr: udp_socket.local_addr()?, + peer_addr: remote_addr, + ecn: None, + protocol: Protocol::UDP, + }, + message: BytesMut::from(&buf[0..n]), + })?; + } else { + println!("{}", String::from_utf8((&buf[0..n]).to_vec())?); + } + } } - _ = tokio::signal::ctrl_c() => { - println!(); - let _ = done_tx.send(()); + res = rx.next() => { + if let Some(line) = res { + udp_socket.send_to(line.as_bytes(), peer_addr).await?; + } } }; - - let _ = ice_agent.close().await; - - (weak_conn, Arc::downgrade(&ice_agent)) - }; - - let mut int = tokio::time::interval(Duration::from_secs(1)); - loop { - int.tick().await; - println!( - "weak_conn: weak count = {}, strong count = {}, weak_agent: weak count = {}, strong count = {}", - weak_conn.weak_count(), - weak_conn.strong_count(), - weak_agent.weak_count(), - weak_agent.strong_count(), - ); - if weak_conn.strong_count() == 0 && weak_agent.strong_count() == 0 { - break; - } } + ice_agent.close()?; + Ok(()) } -*/ -fn main() {} diff --git a/rtc-ice/src/agent/agent_config.rs b/rtc-ice/src/agent/agent_config.rs index f26ac3e..be21085 100644 --- a/rtc-ice/src/agent/agent_config.rs +++ b/rtc-ice/src/agent/agent_config.rs @@ -92,6 +92,15 @@ pub struct AgentConfig { /// Specify a minimum wait time before selecting host candidates. pub host_acceptance_min_wait: Option, + /// Specify a minimum wait time before selecting srfl candidates. + pub srflx_acceptance_min_wait: Option, + + /// Specify a minimum wait time before selecting prfl candidates. + pub prflx_acceptance_min_wait: Option, + + /// Specify a minimum wait time before selecting relay candidates. + pub relay_acceptance_min_wait: Option, + /// Controls if self-signed certificates are accepted when connecting to TURN servers via TLS or /// DTLS. pub insecure_skip_verify: bool, diff --git a/rtc-ice/src/agent/agent_selector.rs b/rtc-ice/src/agent/agent_selector.rs index 59412cf..e1e4e2b 100644 --- a/rtc-ice/src/agent/agent_selector.rs +++ b/rtc-ice/src/agent/agent_selector.rs @@ -55,8 +55,29 @@ impl Agent { .as_nanos() > self.host_acceptance_min_wait.as_nanos() } - _ => { - error!( + CandidateType::ServerReflexive => { + Instant::now() + .checked_duration_since(start_time) + .unwrap_or_else(|| Duration::from_secs(0)) + .as_nanos() + > self.srflx_acceptance_min_wait.as_nanos() + } + CandidateType::PeerReflexive => { + Instant::now() + .checked_duration_since(start_time) + .unwrap_or_else(|| Duration::from_secs(0)) + .as_nanos() + > self.prflx_acceptance_min_wait.as_nanos() + } + CandidateType::Relay => { + Instant::now() + .checked_duration_since(start_time) + .unwrap_or_else(|| Duration::from_secs(0)) + .as_nanos() + > self.relay_acceptance_min_wait.as_nanos() + } + CandidateType::Unspecified => { + log::error!( "is_nominatable invalid candidate type {}", c.candidate_type() ); diff --git a/rtc-ice/src/agent/mod.rs b/rtc-ice/src/agent/mod.rs index 96a9c7f..83adf13 100644 --- a/rtc-ice/src/agent/mod.rs +++ b/rtc-ice/src/agent/mod.rs @@ -113,6 +113,9 @@ pub struct Agent { pub(crate) insecure_skip_verify: bool, pub(crate) max_binding_requests: u16, pub(crate) host_acceptance_min_wait: Duration, + pub(crate) srflx_acceptance_min_wait: Duration, + pub(crate) prflx_acceptance_min_wait: Duration, + pub(crate) relay_acceptance_min_wait: Duration, // How long connectivity checks can fail before the ICE Agent // goes to disconnected pub(crate) disconnected_timeout: Duration, @@ -184,6 +187,27 @@ impl Agent { } else { DEFAULT_HOST_ACCEPTANCE_MIN_WAIT }, + srflx_acceptance_min_wait: if let Some(srflx_acceptance_min_wait) = + config.srflx_acceptance_min_wait + { + srflx_acceptance_min_wait + } else { + DEFAULT_SRFLX_ACCEPTANCE_MIN_WAIT + }, + prflx_acceptance_min_wait: if let Some(prflx_acceptance_min_wait) = + config.prflx_acceptance_min_wait + { + prflx_acceptance_min_wait + } else { + DEFAULT_PRFLX_ACCEPTANCE_MIN_WAIT + }, + relay_acceptance_min_wait: if let Some(relay_acceptance_min_wait) = + config.relay_acceptance_min_wait + { + relay_acceptance_min_wait + } else { + DEFAULT_RELAY_ACCEPTANCE_MIN_WAIT + }, // How long connectivity checks can fail before the ICE Agent // goes to disconnected @@ -416,6 +440,27 @@ impl Agent { } } + /// start connectivity checks + pub fn start_connectivity_checks( + &mut self, + is_controlling: bool, + remote_ufrag: String, + remote_pwd: String, + ) -> Result<()> { + debug!( + "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}", + is_controlling, remote_ufrag, remote_pwd + ); + self.set_remote_credentials(remote_ufrag, remote_pwd)?; + self.is_controlling = is_controlling; + self.start(); + + self.update_connection_state(ConnectionState::Checking); + self.request_connectivity_check(); + + Ok(()) + } + /// Restarts the ICE Agent with the provided ufrag/pwd /// If no ufrag/pwd is provided the Agent will generate one itself. pub fn restart( @@ -465,26 +510,6 @@ impl Agent { &self.local_candidates } - pub(crate) fn start_connectivity_checks( - &mut self, - is_controlling: bool, - remote_ufrag: String, - remote_pwd: String, - ) -> Result<()> { - debug!( - "Started agent: isControlling? {}, remoteUfrag: {}, remotePwd: {}", - is_controlling, remote_ufrag, remote_pwd - ); - self.set_remote_credentials(remote_ufrag, remote_pwd)?; - self.is_controlling = is_controlling; - self.start(); - - self.update_connection_state(ConnectionState::Checking); - self.request_connectivity_check(); - - Ok(()) - } - fn contact(&mut self, now: Instant) { if self.connection_state == ConnectionState::Failed { // The connection is currently failed so don't send any checks