Skip to content

Commit

Permalink
better send logic and timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Aug 8, 2023
1 parent 11e9410 commit 1fffa89
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 53 deletions.
110 changes: 65 additions & 45 deletions crates/tx5/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl StateData {
msg_uniq,
rem_id,
data,
timestamp,
send_permit,
resp,
cli_url,
Expand All @@ -247,6 +248,7 @@ impl StateData {
msg_uniq,
rem_id,
data,
timestamp,
send_permit,
resp,
cli_url,
Expand Down Expand Up @@ -499,11 +501,13 @@ impl StateData {
Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn send_data(
&mut self,
msg_uniq: Uniq,
rem_id: Id,
data: BackBuf,
timestamp: std::time::Instant,
send_permit: tokio::sync::OwnedSemaphorePermit,
data_sent: tokio::sync::oneshot::Sender<Result<()>>,
cli_url: Tx5Url,
Expand All @@ -513,8 +517,8 @@ impl StateData {
?rem_id,
"Ignoring request to send data to banned remote"
);
let _ = data_sent.send(Err(Error::id("Ban")));
return Ok(());
//return Err(Error::id("Ban"));
}

self.send_map
Expand All @@ -523,7 +527,7 @@ impl StateData {
.push_back(SendData {
msg_uniq: msg_uniq.clone(),
data,
timestamp: std::time::Instant::now(),
timestamp,
resp: Some(data_sent),
send_permit,
});
Expand All @@ -532,7 +536,7 @@ impl StateData {

if let Some((e, _)) = self.conn_map.get(&rem_id) {
if let Some(conn) = e.upgrade() {
conn.notify_send_waiting().await;
conn.check_send_waiting(None).await;
return Ok(());
} else {
self.conn_map.remove(&rem_id);
Expand Down Expand Up @@ -793,6 +797,7 @@ enum StateCmd {
msg_uniq: Uniq,
rem_id: Id,
data: BackBuf,
timestamp: std::time::Instant,
send_permit: tokio::sync::OwnedSemaphorePermit,
resp: tokio::sync::oneshot::Sender<Result<()>>,
cli_url: Tx5Url,
Expand Down Expand Up @@ -1047,6 +1052,8 @@ impl State {
cli_url: Tx5Url,
data: B,
) -> impl Future<Output = Result<()>> + 'static + Send {
let timestamp = std::time::Instant::now();

let buf_list = if !cli_url.is_client() {
Err(Error::err(
"Invalid tx5 signal server url, expect client url",
Expand All @@ -1059,63 +1066,76 @@ impl State {

let this = self.clone();
async move {
let cli_url = &cli_url;
let msg_uniq = meta.state_uniq.sub();
tokio::time::timeout(meta.config.max_conn_init(), async move {
let cli_url = &cli_url;
let msg_uniq = meta.state_uniq.sub();
let msg_uniq = &msg_uniq;

let buf_list = buf_list?;
let buf_list = buf_list?;

for (idx, mut buf) in buf_list.into_iter().enumerate() {
let len = buf.len()?;
let mut resp_list = Vec::with_capacity(buf_list.len());

tracing::trace!(%msg_uniq, %len, "snd_data");
for (idx, mut buf) in buf_list.into_iter().enumerate() {
let len = buf.len()?;

if meta.snd_limit.available_permits() < len {
tracing::warn!(%msg_uniq, %len, "send queue full, waiting for permits");
}
tracing::trace!(%msg_uniq, %len, "snd_data");

let send_permit = meta
.snd_limit
.clone()
.acquire_many_owned(len as u32)
.await
.map_err(Error::err)?;
if meta.snd_limit.available_permits() < len {
tracing::warn!(%msg_uniq, %len, "send queue full, waiting for permits");
}

tracing::trace!(%msg_uniq, %idx, %len, "snd_data:got permit");
let send_permit = meta
.snd_limit
.clone()
.acquire_many_owned(len as u32)
.await
.map_err(Error::err)?;

let rem_id = cli_url.id().unwrap();
tracing::trace!(%msg_uniq, %idx, %len, "snd_data:got permit");

let (s_sent, r_sent) = tokio::sync::oneshot::channel();
let rem_id = cli_url.id().unwrap();

if let Err(err) = this.0.send(Ok(StateCmd::SendData {
msg_uniq: msg_uniq.clone(),
rem_id,
data: buf,
send_permit,
resp: s_sent,
cli_url: cli_url.clone(),
})) {
tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err");
return Err(err);
}
let (s_sent, r_sent) = tokio::sync::oneshot::channel();

match r_sent.await.map_err(|_| Error::id("Closed")) {
Ok(r) => match r {
Ok(_) => {
tracing::trace!(%msg_uniq, %idx, "snd_data:complete ok");
}
Err(err) => {
tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err");
return Err(err);
}
},
Err(err) => {
if let Err(err) = this.0.send(Ok(StateCmd::SendData {
msg_uniq: msg_uniq.clone(),
rem_id,
data: buf,
timestamp,
send_permit,
resp: s_sent,
cli_url: cli_url.clone(),
})) {
tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err");
return Err(err);
}

resp_list.push(async move {
match r_sent.await.map_err(|_| Error::id("Closed")) {
Ok(r) => match r {
Ok(_) => {
tracing::trace!(%msg_uniq, %idx, "snd_data:complete ok");
}
Err(err) => {
tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err");
return Err(err);
}
},
Err(err) => {
tracing::trace!(%msg_uniq, %idx, ?err, "snd_data:complete err");
return Err(err);
}
}
Ok(())
});
}
}

Ok(())
for resp in resp_list {
resp.await?;
}

Ok(())
}).await.map_err(|_| Error::id("Timeout"))?
}
}

Expand Down
34 changes: 26 additions & 8 deletions crates/tx5/src/state/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ struct ConnStateData {
answer: (u64, u64, u64, u64),
ice: (u64, u64, u64, u64),
buf_state: BufState,
send_wait: bool,
}

impl Drop for ConnStateData {
Expand Down Expand Up @@ -311,9 +312,10 @@ impl ConnStateData {
ConnCmd::InAnswer { answer } => self.in_answer(answer).await,
ConnCmd::InIce { ice, cache } => self.in_ice(ice, cache).await,
ConnCmd::Ready => self.ready().await,
ConnCmd::MaybeFetchForSend { buf_state } => {
self.maybe_fetch_for_send(buf_state).await
}
ConnCmd::MaybeFetchForSend {
send_complete,
buf_state,
} => self.maybe_fetch_for_send(send_complete, buf_state).await,
ConnCmd::Send { to_send } => self.send(to_send).await,
ConnCmd::Recv {
ident,
Expand Down Expand Up @@ -503,19 +505,25 @@ impl ConnStateData {
.on_conn_preflight(self.meta.cli_url.clone())
.await?
.unwrap_or_else(bytes::Bytes::new);

for buf in divide_send(&*self.meta.config, &self.meta.snd_ident, data)?
{
self.conn_evt.snd_data(self.this.clone(), buf, None, None);
}

self.meta.connected.store(true, atomic::Ordering::SeqCst);
self.maybe_fetch_for_send(None).await
self.maybe_fetch_for_send(false, None).await
}

async fn maybe_fetch_for_send(
&mut self,
send_complete: bool,
buf_state: Option<BufState>,
) -> Result<()> {
if send_complete {
self.send_wait = false;
}

if let Some(buf_state) = buf_state {
self.buf_state = buf_state;
}
Expand All @@ -539,6 +547,11 @@ impl ConnStateData {
return Ok(());
}

if self.send_wait {
// we already have an outgoing send, don't request another
return Ok(());
}

if let Some(state) = self.state.upgrade() {
state.fetch_for_send(self.this.clone(), self.rem_id)?;
Ok(())
Expand All @@ -561,6 +574,7 @@ impl ConnStateData {
self.meta.last_active_at = std::time::Instant::now();
self.meta.metric_bytes_snd.add(data.len()? as u64);

self.send_wait = true;
self.conn_evt.snd_data(
self.this.clone(),
data,
Expand Down Expand Up @@ -686,6 +700,7 @@ enum ConnCmd {
},
Ready,
MaybeFetchForSend {
send_complete: bool,
buf_state: Option<BufState>,
},
Send {
Expand Down Expand Up @@ -734,6 +749,7 @@ async fn conn_state_task(
answer: (0, 0, 0, 0),
ice: (0, 0, 0, 0),
buf_state: BufState::Low,
send_wait: false,
};

let mut permit = None;
Expand Down Expand Up @@ -1167,10 +1183,11 @@ impl ConnState {
let _ = self.0.send(Ok(ConnCmd::InIce { ice, cache }));
}

pub(crate) async fn notify_send_waiting(&self) {
let _ = self
.0
.send(Ok(ConnCmd::MaybeFetchForSend { buf_state: None }));
pub(crate) async fn check_send_waiting(&self, buf_state: Option<BufState>) {
let _ = self.0.send(Ok(ConnCmd::MaybeFetchForSend {
send_complete: false,
buf_state,
}));
}

pub(crate) fn send(&self, to_send: SendData) {
Expand All @@ -1179,6 +1196,7 @@ impl ConnState {

pub(crate) fn notify_send_complete(&self, buf_state: BufState) {
let _ = self.0.send(Ok(ConnCmd::MaybeFetchForSend {
send_complete: true,
buf_state: Some(buf_state),
}));
}
Expand Down

0 comments on commit 1fffa89

Please sign in to comment.