From 14c361891a8f109f4ad5cd7d9675fd9ec77c7034 Mon Sep 17 00:00:00 2001 From: neonphog Date: Tue, 2 Apr 2024 10:10:27 -0600 Subject: [PATCH] fix for polite offer-er --- crates/tx5/src/ep3/peer.rs | 2 +- crates/tx5/src/ep3/sig.rs | 22 ++++++++++++++--- crates/tx5/src/ep3/test.rs | 49 ++++++++++++++++++++++++++++++++++---- 3 files changed, 65 insertions(+), 8 deletions(-) diff --git a/crates/tx5/src/ep3/peer.rs b/crates/tx5/src/ep3/peer.rs index cd401071..5e24e22c 100644 --- a/crates/tx5/src/ep3/peer.rs +++ b/crates/tx5/src/ep3/peer.rs @@ -12,7 +12,7 @@ impl From for PeerCmd { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub(crate) enum PeerDir { ActiveOrOutgoing, OutgoingRestart, diff --git a/crates/tx5/src/ep3/sig.rs b/crates/tx5/src/ep3/sig.rs index e77bb876..5c7e58dc 100644 --- a/crates/tx5/src/ep3/sig.rs +++ b/crates/tx5/src/ep3/sig.rs @@ -256,7 +256,6 @@ impl Sig { unreachable!() } let is_polite = peer_id > self.sig.this_id; - let is_incoming = peer_dir.is_incoming(); match self .assert_peer_inner( peer_url.clone(), @@ -268,7 +267,7 @@ impl Sig { { Ok(peer) => Ok(peer), Err(err) => { - if !is_polite && !is_incoming { + if !is_polite { tracing::debug!( ?err, "Attempting Restart Offer Due To Error" @@ -281,6 +280,19 @@ impl Sig { ) .await } else { + // wait a short time to see if a restart comes from the + // remote side + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // if a new peer got added in the mean time, return that instead + let r = + self.peer_map.lock().unwrap().get(&peer_id).cloned(); + + if let Some((_new_peer_uniq, _cmd, _ans, new_peer_fut)) = r + { + return new_peer_fut.await; + } + Err(err) } } @@ -304,12 +316,16 @@ impl Sig { let mut tmp = None; + let is_polite = peer_id > self.sig.this_id; + + tracing::trace!(%is_polite, ?peer_id, ?self.sig.this_id, ?peer_dir); + let (peer_uniq, _peer_cmd_send, _answer_send, fut) = { let mut lock = self.peer_map.lock().unwrap(); if peer_dir.is_incoming() && lock.contains_key(&peer_id) { // we need to check negotiation - if peer_id > self.sig.this_id { + if is_polite { // we are the polite node, drop our existing connection tmp = lock.remove(&peer_id); } diff --git a/crates/tx5/src/ep3/test.rs b/crates/tx5/src/ep3/test.rs index df54f775..abd40f48 100644 --- a/crates/tx5/src/ep3/test.rs +++ b/crates/tx5/src/ep3/test.rs @@ -46,7 +46,6 @@ impl Test { if relay { ice.ice_transport_policy = tx5_go_pion::ICETransportPolicy::Relay; } - //println!("iceServers: {ice:#?}"); let mut this = Test { sig_srv_hnd: None, @@ -109,15 +108,57 @@ impl Test { } } +async fn test_eps_by_politeness( + test: &Test, + config: &Arc, + polite_first: bool, +) -> ( + (PeerUrl, Ep3, EventRecv), + (PeerUrl, Ep3, EventRecv), +) { + let (u1, e1, r1) = test.ep(config.clone()).await; + let (u2, e2, r2) = test.ep(config.clone()).await; + let first_is_polite = u1.id().unwrap() < u2.id().unwrap(); + match (first_is_polite, polite_first) { + (true, true) | (false, false) => ((u1, e1, r1), (u2, e2, r2)), + _ => ((u2, e2, r2), (u1, e1, r1)), + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn ep3_turn_fallback_works_polite_first() { + inner_ep3_turn_fallback_works(true).await; +} + #[tokio::test(flavor = "multi_thread")] -async fn ep3_turn_fallback_works() { +async fn ep3_turn_fallback_works_impolite_first() { + inner_ep3_turn_fallback_works(false).await; +} + +async fn inner_ep3_turn_fallback_works(polite_first: bool) { + /* + * This test works by setting iceTransportPolicy to RELAY, + * meaning the connections will only accept RELAY type ice messages. + * Holochain, however, always starts by attempting STUN only, + * that is, it filters out the TURN servers from the initial + * iceServers list. This, therefore, should always fail to connect, + * and thus, we'll execute the turn fallback code path. + * + * We also need to make sure this works whether the initial outgoing + * connection was the polite node, or if it was the impolite node. + * + * The impolite one is always the node to send the restart offer, + * so there is a slightly different code path if that was also the initial + * offer-er or if it was initially the answer, and now needs to offer. + */ + let mut config = Config3::default(); config.timeout = std::time::Duration::from_secs(5); let config = Arc::new(config); let test = Test::with_config(true).await; - let (_cli_url1, ep1, _ep1_recv) = test.ep(config.clone()).await; - let (cli_url2, _ep2, mut ep2_recv) = test.ep(config).await; + let ((_cli_url1, ep1, _ep1_recv), (cli_url2, _ep2, mut ep2_recv)) = + test_eps_by_politeness(&test, &config, polite_first).await; ep1.send(cli_url2, b"hello").await.unwrap();