From 1fffa892520d67d551671fd9a640beccee6c2996 Mon Sep 17 00:00:00 2001 From: neonphog Date: Tue, 8 Aug 2023 15:09:57 -0600 Subject: [PATCH] better send logic and timeout --- crates/tx5/src/state.rs | 110 +++++++++++++++++++++-------------- crates/tx5/src/state/conn.rs | 34 ++++++++--- 2 files changed, 91 insertions(+), 53 deletions(-) diff --git a/crates/tx5/src/state.rs b/crates/tx5/src/state.rs index e12b4406..3530a558 100644 --- a/crates/tx5/src/state.rs +++ b/crates/tx5/src/state.rs @@ -239,6 +239,7 @@ impl StateData { msg_uniq, rem_id, data, + timestamp, send_permit, resp, cli_url, @@ -247,6 +248,7 @@ impl StateData { msg_uniq, rem_id, data, + timestamp, send_permit, resp, cli_url, @@ -499,11 +501,13 @@ impl StateData { Ok(()) } + #[allow(clippy::too_many_arguments)] async fn send_data( &mut self, msg_uniq: Uniq, rem_id: Id, data: BackBuf, + timestamp: std::time::Instant, send_permit: tokio::sync::OwnedSemaphorePermit, data_sent: tokio::sync::oneshot::Sender>, cli_url: Tx5Url, @@ -513,8 +517,8 @@ impl StateData { ?rem_id, "Ignoring request to send data to banned remote" ); + let _ = data_sent.send(Err(Error::id("Ban"))); return Ok(()); - //return Err(Error::id("Ban")); } self.send_map @@ -523,7 +527,7 @@ impl StateData { .push_back(SendData { msg_uniq: msg_uniq.clone(), data, - timestamp: std::time::Instant::now(), + timestamp, resp: Some(data_sent), send_permit, }); @@ -532,7 +536,7 @@ impl StateData { if let Some((e, _)) = self.conn_map.get(&rem_id) { if let Some(conn) = e.upgrade() { - conn.notify_send_waiting().await; + conn.check_send_waiting(None).await; return Ok(()); } else { self.conn_map.remove(&rem_id); @@ -793,6 +797,7 @@ enum StateCmd { msg_uniq: Uniq, rem_id: Id, data: BackBuf, + timestamp: std::time::Instant, send_permit: tokio::sync::OwnedSemaphorePermit, resp: tokio::sync::oneshot::Sender>, cli_url: Tx5Url, @@ -1047,6 +1052,8 @@ impl State { cli_url: Tx5Url, data: B, ) -> impl Future> + 'static + Send { + let timestamp = std::time::Instant::now(); + let buf_list = if !cli_url.is_client() { Err(Error::err( "Invalid tx5 signal server url, expect client url", @@ -1059,63 +1066,76 @@ impl State { let this = self.clone(); async move { - let cli_url = &cli_url; - let msg_uniq = meta.state_uniq.sub(); + tokio::time::timeout(meta.config.max_conn_init(), async move { + let cli_url = &cli_url; + let msg_uniq = meta.state_uniq.sub(); + let msg_uniq = &msg_uniq; - let buf_list = buf_list?; + let buf_list = buf_list?; - for (idx, mut buf) in buf_list.into_iter().enumerate() { - let len = buf.len()?; + let mut resp_list = Vec::with_capacity(buf_list.len()); - tracing::trace!(%msg_uniq, %len, "snd_data"); + for (idx, mut buf) in buf_list.into_iter().enumerate() { + let len = buf.len()?; - if meta.snd_limit.available_permits() < len { - tracing::warn!(%msg_uniq, %len, "send queue full, waiting for permits"); - } + tracing::trace!(%msg_uniq, %len, "snd_data"); - let send_permit = meta - .snd_limit - .clone() - .acquire_many_owned(len as u32) - .await - .map_err(Error::err)?; + if meta.snd_limit.available_permits() < len { + tracing::warn!(%msg_uniq, %len, "send queue full, waiting for permits"); + } - tracing::trace!(%msg_uniq, %idx, %len, "snd_data:got permit"); + let send_permit = meta + .snd_limit + .clone() + .acquire_many_owned(len as u32) + .await + .map_err(Error::err)?; - let rem_id = cli_url.id().unwrap(); + tracing::trace!(%msg_uniq, %idx, %len, "snd_data:got permit"); - let (s_sent, r_sent) = tokio::sync::oneshot::channel(); + let rem_id = cli_url.id().unwrap(); - if let Err(err) = this.0.send(Ok(StateCmd::SendData { - msg_uniq: msg_uniq.clone(), - rem_id, - data: buf, - send_permit, - resp: s_sent, - cli_url: cli_url.clone(), - })) { - tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err"); - return Err(err); - } + let (s_sent, r_sent) = tokio::sync::oneshot::channel(); - match r_sent.await.map_err(|_| Error::id("Closed")) { - Ok(r) => match r { - Ok(_) => { - tracing::trace!(%msg_uniq, %idx, "snd_data:complete ok"); - } - Err(err) => { - tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err"); - return Err(err); - } - }, - Err(err) => { + if let Err(err) = this.0.send(Ok(StateCmd::SendData { + msg_uniq: msg_uniq.clone(), + rem_id, + data: buf, + timestamp, + send_permit, + resp: s_sent, + cli_url: cli_url.clone(), + })) { tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err"); return Err(err); } + + resp_list.push(async move { + match r_sent.await.map_err(|_| Error::id("Closed")) { + Ok(r) => match r { + Ok(_) => { + tracing::trace!(%msg_uniq, %idx, "snd_data:complete ok"); + } + Err(err) => { + tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err"); + return Err(err); + } + }, + Err(err) => { + tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err"); + return Err(err); + } + } + Ok(()) + }); } - } - Ok(()) + for resp in resp_list { + resp.await?; + } + + Ok(()) + }).await.map_err(|_| Error::id("Timeout"))? } } diff --git a/crates/tx5/src/state/conn.rs b/crates/tx5/src/state/conn.rs index f1e82c52..e06d7580 100644 --- a/crates/tx5/src/state/conn.rs +++ b/crates/tx5/src/state/conn.rs @@ -256,6 +256,7 @@ struct ConnStateData { answer: (u64, u64, u64, u64), ice: (u64, u64, u64, u64), buf_state: BufState, + send_wait: bool, } impl Drop for ConnStateData { @@ -311,9 +312,10 @@ impl ConnStateData { ConnCmd::InAnswer { answer } => self.in_answer(answer).await, ConnCmd::InIce { ice, cache } => self.in_ice(ice, cache).await, ConnCmd::Ready => self.ready().await, - ConnCmd::MaybeFetchForSend { buf_state } => { - self.maybe_fetch_for_send(buf_state).await - } + ConnCmd::MaybeFetchForSend { + send_complete, + buf_state, + } => self.maybe_fetch_for_send(send_complete, buf_state).await, ConnCmd::Send { to_send } => self.send(to_send).await, ConnCmd::Recv { ident, @@ -503,19 +505,25 @@ impl ConnStateData { .on_conn_preflight(self.meta.cli_url.clone()) .await? .unwrap_or_else(bytes::Bytes::new); + for buf in divide_send(&*self.meta.config, &self.meta.snd_ident, data)? { self.conn_evt.snd_data(self.this.clone(), buf, None, None); } self.meta.connected.store(true, atomic::Ordering::SeqCst); - self.maybe_fetch_for_send(None).await + self.maybe_fetch_for_send(false, None).await } async fn maybe_fetch_for_send( &mut self, + send_complete: bool, buf_state: Option, ) -> Result<()> { + if send_complete { + self.send_wait = false; + } + if let Some(buf_state) = buf_state { self.buf_state = buf_state; } @@ -539,6 +547,11 @@ impl ConnStateData { return Ok(()); } + if self.send_wait { + // we already have an outgoing send, don't request another + return Ok(()); + } + if let Some(state) = self.state.upgrade() { state.fetch_for_send(self.this.clone(), self.rem_id)?; Ok(()) @@ -561,6 +574,7 @@ impl ConnStateData { self.meta.last_active_at = std::time::Instant::now(); self.meta.metric_bytes_snd.add(data.len()? as u64); + self.send_wait = true; self.conn_evt.snd_data( self.this.clone(), data, @@ -686,6 +700,7 @@ enum ConnCmd { }, Ready, MaybeFetchForSend { + send_complete: bool, buf_state: Option, }, Send { @@ -734,6 +749,7 @@ async fn conn_state_task( answer: (0, 0, 0, 0), ice: (0, 0, 0, 0), buf_state: BufState::Low, + send_wait: false, }; let mut permit = None; @@ -1167,10 +1183,11 @@ impl ConnState { let _ = self.0.send(Ok(ConnCmd::InIce { ice, cache })); } - pub(crate) async fn notify_send_waiting(&self) { - let _ = self - .0 - .send(Ok(ConnCmd::MaybeFetchForSend { buf_state: None })); + pub(crate) async fn check_send_waiting(&self, buf_state: Option) { + let _ = self.0.send(Ok(ConnCmd::MaybeFetchForSend { + send_complete: false, + buf_state, + })); } pub(crate) fn send(&self, to_send: SendData) { @@ -1179,6 +1196,7 @@ impl ConnState { pub(crate) fn notify_send_complete(&self, buf_state: BufState) { let _ = self.0.send(Ok(ConnCmd::MaybeFetchForSend { + send_complete: true, buf_state: Some(buf_state), })); }