Skip to content

Commit

Permalink
Merge pull request #291 from freedomlayer/real/fix/services-deadlock
Browse files Browse the repository at this point in the history
Add task names to timer clients (debugging)
  • Loading branch information
realcr authored Apr 25, 2020
2 parents 8750248 + 0d5b749 commit 46f53e4
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 52 deletions.
11 changes: 9 additions & 2 deletions components/channeler/src/connect_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,11 @@ where
fn transform(&mut self, friend_public_key: Self::Input) -> BoxFuture<'_, Self::Output> {
Box::pin(async move {
// TODO: Should we keep the unwrap()-s here?
let timer_stream = self.timer_client.request_timer_stream().await.unwrap();
let timer_stream = self
.timer_client
.request_timer_stream("PoolConnector::transform".to_owned())
.await
.unwrap();
create_connect_pool(
timer_stream,
self.encrypt_transform.clone(),
Expand Down Expand Up @@ -672,7 +676,10 @@ mod tests {
Box::pin(future::ready(Some(conn_pair)))
});

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("task_pool_connector_backoff_ticks".to_owned())
.await
.unwrap();
let mut tick_sender = tick_sender_receiver.next().await.unwrap();

// Used for debugging the loop:
Expand Down
19 changes: 15 additions & 4 deletions components/channeler/src/listen_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ where
}

let loop_fut = async move {
let res_timer_stream = c_timer_client.request_timer_stream().await;
let res_timer_stream = c_timer_client
.request_timer_stream("PoolListener::listen".to_owned())
.await;
let timer_stream = match res_timer_stream {
Ok(timer_stream) => timer_stream,
Err(_) => {
Expand Down Expand Up @@ -444,7 +446,10 @@ mod tests {
dummy_timer_multi_sender(spawner.clone());
let backoff_ticks = 2;

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("task_listen_pool_loop_set_local_addresses".to_owned())
.await
.unwrap();
let _tick_sender = tick_sender_receiver.next().await.unwrap();

let (mut config_sender, incoming_config) = mpsc::channel(0);
Expand Down Expand Up @@ -543,7 +548,10 @@ mod tests {
dummy_timer_multi_sender(spawner.clone());
let backoff_ticks = 2;

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("task_listen_pool_loop_backoff_ticks".to_owned())
.await
.unwrap();
let mut tick_sender = tick_sender_receiver.next().await.unwrap();

let (mut config_sender, incoming_config) = mpsc::channel(0);
Expand Down Expand Up @@ -612,7 +620,10 @@ mod tests {
dummy_timer_multi_sender(spawner.clone());
let backoff_ticks = 2;

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("task_listen_pool_loop_update_remove_friend".to_owned())
.await
.unwrap();
let _tick_sender = tick_sender_receiver.next().await.unwrap();

let (mut config_sender, incoming_config) = mpsc::channel(1);
Expand Down
6 changes: 5 additions & 1 deletion components/connection/src/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ where

fn transform(&mut self, input: Self::Input) -> BoxFuture<'_, Self::Output> {
Box::pin(async move {
let timer_stream = match self.timer_client.request_timer_stream().await {
let timer_stream = match self
.timer_client
.request_timer_stream("TimeoutFutTransform::transform".to_owned())
.await
{
Ok(timer_stream) => timer_stream,
Err(e) => {
error!("TimeoutTransform: request_timer_stream() error: {:?}", e);
Expand Down
7 changes: 1 addition & 6 deletions components/index_client/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ use proto::index_client::messages::{
};

use proto::index_server::messages::{IndexClientToServer, IndexServerAddress, IndexServerToClient};
/*
use proto::index_server::serialize::{
deserialize_index_server_to_client, serialize_index_client_to_server,
};
*/

use crate::client_session::IndexClientSession;
use crate::index_client::{
Expand Down Expand Up @@ -138,7 +133,7 @@ where
S: Spawn + Clone + Send + 'static,
{
let timer_stream = timer_client
.request_timer_stream()
.request_timer_stream("spawn_index_client".to_owned())
.await
.map_err(|_| SpawnIndexClientError::RequestTimerStreamError)?;

Expand Down
2 changes: 1 addition & 1 deletion components/index_server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where
.map_err(|_| IndexServerError::CreateGraphServiceError)?;

let timer_stream = timer_client
.request_timer_stream()
.request_timer_stream("index_server".to_owned())
.await
.map_err(|_| IndexServerError::RequestTimerStreamError)?;

Expand Down
21 changes: 17 additions & 4 deletions components/keepalive/src/keepalive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ where
let (user_sender, from_user) = mpsc::channel::<Vec<u8>>(1);

Box::pin(async move {
if let Ok(timer_stream) = self.timer_client.request_timer_stream().await {
if let Ok(timer_stream) = self
.timer_client
.request_timer_stream("transform_keepalive".to_owned())
.await
{
let keepalive_fut = inner_keepalive_loop(
to_remote,
from_remote,
Expand Down Expand Up @@ -274,7 +278,10 @@ mod tests {
let (to_user, mut user_receiver) = mpsc::channel::<Vec<u8>>(0);
let (mut user_sender, from_user) = mpsc::channel::<Vec<u8>>(0);

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("task_keepalive_loop_basic".to_owned())
.await
.unwrap();
let keepalive_ticks = 16;
let fut_keepalive_loop = inner_keepalive_loop(
to_remote,
Expand Down Expand Up @@ -357,7 +364,10 @@ mod tests {
let (a_sender, b_receiver) = mpsc::channel(1);
let (b_sender, a_receiver) = mpsc::channel(1);

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("task_keepalive_channel_basic_0".to_owned())
.await
.unwrap();
let (mut a_sender, mut a_receiver) = keepalive_channel(
a_sender,
a_receiver,
Expand All @@ -366,7 +376,10 @@ mod tests {
test_executor.clone(),
);

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("task_keepalive_channel_basic_1".to_owned())
.await
.unwrap();
let (mut b_sender, mut b_receiver) = keepalive_channel(
b_sender,
b_receiver,
Expand Down
2 changes: 1 addition & 1 deletion components/relay/src/client/client_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where
CS: Sink<(PublicKey, ConnPairVec), Error = CSE> + Unpin + 'static,
{
let timer_stream = timer_client
.request_timer_stream()
.request_timer_stream("accept_connection".to_owned())
.await
.map_err(|_| AcceptConnectionError::RequestTimerStreamError)?;
let opt_conn_pair = connect_with_timeout(connector, conn_timeout_ticks, timer_stream).await;
Expand Down
5 changes: 4 additions & 1 deletion components/relay/src/server/conn_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ async fn process_conn(
}
});

let timer_stream = timer_client.request_timer_stream().await.unwrap();
let timer_stream = timer_client
.request_timer_stream("process_conn".to_owned())
.await
.unwrap();
let res = future_timeout(fut_receiver, timer_stream, conn_timeout_ticks).await?;
if res.is_none() {
warn!("process_conn(): timeout occurred");
Expand Down
2 changes: 1 addition & 1 deletion components/relay/src/server/server_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
S: Stream<Item = IncomingConn> + Unpin + Send,
{
let timer_stream = timer_client
.request_timer_stream()
.request_timer_stream("relay_server_loop".to_owned())
.await
.map_err(|_| RelayServerError::RequestTimerStreamError)?;
let timer_stream = timer_stream
Expand Down
2 changes: 1 addition & 1 deletion components/secure_channel/src/secure_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ where
// Is there a way to do it?

let timer_stream = timer_client
.request_timer_stream()
.request_timer_stream("secure_channel_loop".to_owned())
.await
.map_err(|_| SecureChannelError::RequestTimerStreamError)?;
let timer_stream = timer_stream
Expand Down
2 changes: 1 addition & 1 deletion components/stcompact/src/server_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ where
C: FutTransform<Input = NetAddress, Output = Option<ConnPairVec>> + Clone + Send + 'static,
{
let timer_stream = timer_client
.request_timer_stream()
.request_timer_stream("inner_server_loop".to_owned())
.await
.map_err(|_| ServerError::CreateTimerError)?
.map(|_| ServerEvent::TimerTick)
Expand Down
Loading

0 comments on commit 46f53e4

Please sign in to comment.