Skip to content

Commit

Permalink
Prevent stream timeout by removing backpressure
Browse files Browse the repository at this point in the history
When UI thread is suspended (winit event loop) we
can't remove messages sent from the IRC stream channel.
This backpressure stalls the stream and prevents
it from dealing w/ new IRC messages / pings, causing
the connection to timeout.

Use an unbounded channel to remove all backpressure.
If the UI is suspended, memory will start increasing
instead, which is preferrable to timeout.
  • Loading branch information
tarkah committed Jul 10, 2024
1 parent 6b63c40 commit e90e72f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 57 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Unreleased

Fixed:

- Connection timeout when UI is suspended on an offscreen workspace due to channel backpressure

# 2024.8 (2024-07-05)

Added:
Expand Down
102 changes: 50 additions & 52 deletions data/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,21 @@ struct Stream {
receiver: mpsc::Receiver<proto::Message>,
}

pub async fn run(
pub fn run(
server: server::Entry,
proxy: Option<config::Proxy>,
mut sender: mpsc::Sender<Update>,
) -> impl futures::Stream<Item = Update> {
let (sender, receiver) = mpsc::unbounded();

let runner = stream::once(_run(server, proxy, sender)).map(|_| unreachable!());

stream::select(receiver, runner)
}

async fn _run(
server: server::Entry,
proxy: Option<config::Proxy>,
sender: mpsc::UnboundedSender<Update>,
) -> Never {
let server::Entry { server, config } = server;

Expand All @@ -82,14 +93,12 @@ pub async fn run(
let mut state = State::Disconnected { last_retry: None };

// Notify app of initial disconnected state
let _ = sender
.send(Update::Disconnected {
server: server.clone(),
is_initial,
error: None,
sent_time: Utc::now(),
})
.await;
let _ = sender.unbounded_send(Update::Disconnected {
server: server.clone(),
is_initial,
error: None,
sent_time: Utc::now(),
});

loop {
match &mut state {
Expand All @@ -106,14 +115,12 @@ pub async fn run(
Ok((stream, client)) => {
log::info!("[{server}] connected");

let _ = sender
.send(Update::Connected {
server: server.clone(),
client,
is_initial,
sent_time: Utc::now(),
})
.await;
let _ = sender.unbounded_send(Update::Connected {
server: server.clone(),
client,
is_initial,
sent_time: Utc::now(),
});

is_initial = false;

Expand All @@ -133,13 +140,11 @@ pub async fn run(

log::warn!("[{server}] connection failed: {error}");

let _ = sender
.send(Update::ConnectionFailed {
server: server.clone(),
error,
sent_time: Utc::now(),
})
.await;
let _ = sender.unbounded_send(Update::ConnectionFailed {
server: server.clone(),
error,
sent_time: Utc::now(),
});

*last_retry = Some(Instant::now());
}
Expand Down Expand Up @@ -185,14 +190,12 @@ pub async fn run(
}
proto::Command::ERROR(error) => {
log::warn!("[{server}] disconnected: {error}");
let _ = sender
.send(Update::Disconnected {
server: server.clone(),
is_initial,
error: Some(error),
sent_time: Utc::now(),
})
.await;
let _ = sender.unbounded_send(Update::Disconnected {
server: server.clone(),
is_initial,
error: Some(error),
sent_time: Utc::now(),
});
state = State::Disconnected {
last_retry: Some(Instant::now()),
};
Expand All @@ -206,29 +209,26 @@ pub async fn run(
}
Input::IrcMessage(Err(e)) => {
log::warn!("[{server}] disconnected: {e}");
let _ = sender
.send(Update::Disconnected {
server: server.clone(),
is_initial,
error: Some(e.to_string()),
sent_time: Utc::now(),
})
.await;
let _ = sender.unbounded_send(Update::Disconnected {
server: server.clone(),
is_initial,
error: Some(e.to_string()),
sent_time: Utc::now(),
});
state = State::Disconnected {
last_retry: Some(Instant::now()),
};
}
Input::Batch(messages) => {
let _ = sender
.send(Update::MessagesReceived(server.clone(), messages))
.await;
.unbounded_send(Update::MessagesReceived(server.clone(), messages));
}
Input::Send(message) => {
if let Command::QUIT(reason) = &message.command {
let reason = reason.clone();

let _ = stream.connection.send(message).await;
let _ = sender.send(Update::Quit(server.clone(), reason)).await;
let _ = sender.unbounded_send(Update::Quit(server.clone(), reason));

log::info!("[{server}] quit");

Expand All @@ -249,14 +249,12 @@ pub async fn run(
}
Input::PingTimeout => {
log::warn!("[{server}] ping timeout");
let _ = sender
.send(Update::Disconnected {
server: server.clone(),
is_initial,
error: Some("ping timeout".into()),
sent_time: Utc::now(),
})
.await;
let _ = sender.unbounded_send(Update::Disconnected {
server: server.clone(),
is_initial,
error: Some("ping timeout".into()),
sent_time: Utc::now(),
});
state = State::Disconnected {
last_retry: Some(Instant::now()),
};
Expand Down
6 changes: 1 addition & 5 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,5 @@ use data::{config, server};
use iced::{subscription, Subscription};

pub fn run(entry: server::Entry, proxy: Option<config::Proxy>) -> Subscription<stream::Update> {
// Channel messages are batched every 50ms so channel size 10 ~= 500ms which
// app thread should more than easily keep up with
subscription::channel(entry.server.clone(), 10, move |sender| {
stream::run(entry, proxy, sender)
})
subscription::run_with_id(entry.server.clone(), stream::run(entry, proxy))
}

0 comments on commit e90e72f

Please sign in to comment.