Skip to content

Commit

Permalink
Allow connections to be closed from the public API (#84)
Browse files Browse the repository at this point in the history
* Allow connections to be closed from the public API

* Remove debugging trace

* Review comments and clippy
  • Loading branch information
ThetaSinner authored Mar 26, 2024
1 parent af0ecea commit d36e4bb
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 20 deletions.
4 changes: 3 additions & 1 deletion crates/tx5-core/src/evt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::{Error, Result};
use std::sync::Arc;

/// Permit for sending on the channel.
pub struct EventPermit(Option<tokio::sync::OwnedSemaphorePermit>);
pub struct EventPermit(
#[allow(dead_code)] Option<tokio::sync::OwnedSemaphorePermit>,
);

impl std::fmt::Debug for EventPermit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
2 changes: 1 addition & 1 deletion crates/tx5-go-pion/src/evt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tx5_go_pion_sys::Event as SysEvent;
use tx5_go_pion_sys::API;

/// PeerConnectionState events.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum PeerConnectionState {
/// <https://pkg.go.dev/github.com/pion/webrtc/v3#PeerConnectionState>
New = 0x01,
Expand Down
4 changes: 2 additions & 2 deletions crates/tx5-go-pion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,9 @@ mod tests {
println!("close data 2");
data2.close(Error::id("").into());
println!("close peer 1");
peer1.close(Error::id("").into());
peer1.close(Error::id(""));
println!("close peer 2");
peer2.close(Error::id("").into());
peer2.close(Error::id(""));

println!("close turn");
turn.stop().await.unwrap();
Expand Down
25 changes: 24 additions & 1 deletion crates/tx5-go-pion/src/peer_con.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl From<&AnswerConfig> for GoBufRef<'static> {

pub(crate) struct PeerConCore {
peer_con_id: usize,
con_state: PeerConnectionState,
evt_send: tokio::sync::mpsc::UnboundedSender<PeerConnectionEvent>,
drop_err: Error,
}
Expand All @@ -118,6 +119,7 @@ impl PeerConCore {
) -> Self {
Self {
peer_con_id,
con_state: PeerConnectionState::New,
evt_send,
drop_err: Error::id("PeerConnectionDropped").into(),
}
Expand Down Expand Up @@ -200,8 +202,29 @@ impl PeerConnection {
.await?
}

/// Set the connection state. This should only be set based on connection state events
/// coming from the underlying webrtc library.
pub fn set_con_state(&self, con_state: PeerConnectionState) {
let mut lock = self.0.lock().unwrap();
if let Ok(core) = &mut *lock {
core.con_state = con_state;
} else {
tracing::warn!(
?con_state,
"Unable to set peer connection state: {:?}",
self.get_peer_con_id()
);
}
}

/// Get the connection state.
pub fn get_con_state(&self) -> Result<PeerConnectionState> {
peer_con_strong_core!(self.0, core, { Ok(core.con_state) })
}

/// Close this connection.
pub fn close(&self, err: Error) {
pub fn close<E: Into<Error>>(&self, err: E) {
let err = err.into();
let mut tmp = Err(err.clone());

{
Expand Down
4 changes: 2 additions & 2 deletions crates/tx5/src/back_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ impl std::io::Read for BackBuf {
/// Conversion type facilitating Into<&mut BackBuf>.
pub(crate) enum BackBufRef<'lt> {
/// An owned BackBuf.
Owned(Result<BackBuf>),
Owned(#[allow(dead_code)] Result<BackBuf>),

/// A borrowed BackBuf.
Borrowed(Result<&'lt mut BackBuf>),
Borrowed(#[allow(dead_code)] Result<&'lt mut BackBuf>),
}

impl From<BackBuf> for BackBufRef<'static> {
Expand Down
36 changes: 35 additions & 1 deletion crates/tx5/src/ep3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,47 @@ impl Ep3 {
if let Ok(sig) = fut.await {
// see if we are still banning this id.
if ep.ban_map.lock().unwrap().is_banned(rem_id) {
sig.ban(rem_id);
sig.close(rem_id);
}
}
});
}
}

/// Request that the peer connection identified by the given `peer_url` is closed.
pub fn close(&self, peer_url: PeerUrl) -> Result<()> {
if !peer_url.is_client() {
return Err(Error::str("Expected PeerUrl, got SigUrl"));
}

let peer_id = peer_url.id().unwrap();

let sig_url = peer_url.to_server();
match self._sig_map.lock().unwrap().get(&sig_url) {
Some((_, fut)) => {
let fut = fut.clone();
tokio::task::spawn(async move {
match fut.await {
Ok(sig) => sig.close(peer_id),
Err(e) => {
tracing::debug!(
?e,
"Unable to close peer connection",
);
}
}
});
}
None => {
return Err(Error::str(
"No connections held for this signal server",
));
}
}

Ok(())
}

/// Send data to a remote on this tx5 endpoint.
/// The future returned from this method will resolve when
/// the data is handed off to our networking backend.
Expand Down
11 changes: 9 additions & 2 deletions crates/tx5/src/ep3/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub(crate) struct Peer {
cmd_task: tokio::task::JoinHandle<()>,
recv_task: tokio::task::JoinHandle<()>,
data_task: tokio::task::JoinHandle<()>,
#[allow(dead_code)]
peer: Arc<tx5_go_pion::PeerConnection>,
data_chan: Arc<tx5_go_pion::DataChannel>,
send_limit: Arc<tokio::sync::Semaphore>,
Expand All @@ -70,6 +69,7 @@ pub(crate) struct Peer {

impl Drop for Peer {
fn drop(&mut self) {
self.peer.close("Close");
let evt_send = self.sig.evt_send.clone();
let msg = Ep3Event::Disconnected {
peer_url: self.peer_url.clone(),
Expand Down Expand Up @@ -355,6 +355,7 @@ impl Peer {
};

let recv_task = {
let weak_peer = Arc::downgrade(&peer);
let sig = sig.clone();
tokio::task::spawn(async move {
while let Some(evt) = peer_recv.recv().await {
Expand All @@ -364,7 +365,13 @@ impl Peer {
tracing::warn!(?err);
break;
}
Evt::State(_state) => (),
Evt::State(state) => {
if let Some(peer) = weak_peer.upgrade() {
peer.set_con_state(state);
} else {
break;
}
}
Evt::ICECandidate(mut ice) => {
let ice = match ice.as_json() {
Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/tx5/src/ep3/sig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl Sig {
}
}

pub fn ban(&self, id: Id) {
pub fn close(&self, id: Id) {
let r = self.peer_map.lock().unwrap().get(&id).cloned();
if let Some((uniq, _, _, _)) = r {
close_peer(&self.sig.weak_peer_map, id, uniq);
Expand Down
33 changes: 33 additions & 0 deletions crates/tx5/src/ep3/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,39 @@ async fn ep3_preflight_happy() {
assert_eq!(true, did_valid.load(std::sync::atomic::Ordering::SeqCst));
}

#[tokio::test(flavor = "multi_thread")]
async fn ep3_close_connection() {
let config = Arc::new(Config3::default());
let test = Test::new().await;

let (_cli_url1, ep1, _ep1_recv) = test.ep(config.clone()).await;
let (cli_url2, _ep2, mut ep2_recv) = test.ep(config).await;

ep1.send(cli_url2.clone(), b"hello").await.unwrap();

let res = ep2_recv.recv().await.unwrap();
match res {
Ep3Event::Connected { .. } => (),
_ => panic!(),
}

let res = ep2_recv.recv().await.unwrap();
match res {
Ep3Event::Message { message, .. } => {
assert_eq!(&b"hello"[..], &message);
}
_ => panic!(),
}

ep1.close(cli_url2).unwrap();

let res = ep2_recv.recv().await.unwrap();
match res {
Ep3Event::Disconnected { .. } => (),
e => panic!("Actually got event {:?}", e),
}
}

#[tokio::test(flavor = "multi_thread")]
async fn ep3_ban_after_connected_outgoing_side() {
let config = Arc::new(Config3::default());
Expand Down
18 changes: 9 additions & 9 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d36e4bb

Please sign in to comment.