Skip to content

Commit

Permalink
Merge pull request #285 from freedomlayer/real/fix/timer-deadlock
Browse files Browse the repository at this point in the history
Fix possible timer deadlock
  • Loading branch information
realcr authored Apr 12, 2020
2 parents f4c995a + c52a037 commit 96eed90
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 26 deletions.
16 changes: 10 additions & 6 deletions components/keepalive/src/keepalive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ mod tests {
use futures::executor::{LocalPool, ThreadPool};
use futures::task::{Spawn, SpawnExt};
use futures::FutureExt;

use common::test_executor::TestExecutor;
use timer::create_timer_incoming;

/// Util function for tests
Expand Down Expand Up @@ -339,10 +341,10 @@ mod tests {
LocalPool::new().run_until(task_keepalive_loop_basic(thread_pool.clone()));
}

async fn task_keepalive_channel_basic(spawner: impl Spawn + Clone) {
async fn task_keepalive_channel_basic(test_executor: TestExecutor) {
// Create a mock time service:
let (mut tick_sender, tick_receiver) = mpsc::channel::<()>(0);
let mut timer_client = create_timer_incoming(tick_receiver, spawner.clone()).unwrap();
let mut timer_client = create_timer_incoming(tick_receiver, test_executor.clone()).unwrap();

let keepalive_ticks = 16;

Expand All @@ -361,7 +363,7 @@ mod tests {
a_receiver,
timer_stream,
keepalive_ticks,
spawner.clone(),
test_executor.clone(),
);

let timer_stream = timer_client.request_timer_stream().await.unwrap();
Expand All @@ -370,7 +372,7 @@ mod tests {
b_receiver,
timer_stream,
keepalive_ticks,
spawner.clone(),
test_executor.clone(),
);

a_sender.send(vec![1, 2, 3]).await.unwrap();
Expand All @@ -382,6 +384,7 @@ mod tests {
// Move some time forward
for _ in 0..(keepalive_ticks / 2) + 1 {
tick_sender.send(()).await.unwrap();
test_executor.wait().await;
}

a_sender.send(vec![1, 2, 3]).await.unwrap();
Expand All @@ -393,7 +396,8 @@ mod tests {

#[test]
fn test_keepalive_channel_basic() {
let thread_pool = ThreadPool::new().unwrap();
LocalPool::new().run_until(task_keepalive_channel_basic(thread_pool.clone()));
let test_executor = TestExecutor::new();
let res = test_executor.run(task_keepalive_channel_basic(test_executor.clone()));
assert!(res.is_output());
}
}
1 change: 1 addition & 0 deletions components/net/src/tcp_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ where

fn transform(&mut self, net_address: Self::Input) -> BoxFuture<'_, Self::Output> {
Box::pin(async move {
info!("TcpConnector: Connecting to {:?}", net_address.as_str());
let tcp_stream = TcpStream::connect(net_address.as_str()).await.ok()?;

Some(tcp_stream_to_conn_pair(
Expand Down
4 changes: 4 additions & 0 deletions components/net/src/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ where
let mut incoming_conns = listener.incoming();

while let Some(Ok(tcp_stream)) = incoming_conns.next().await {
info!(
"TcpListener: Incoming connection from: {:?}",
tcp_stream.peer_addr(),
);
let conn_pair =
tcp_stream_to_conn_pair(tcp_stream, c_max_frame_length, &mut c_spawner);
if let Err(e) = conn_receiver_sender.send(conn_pair).await {
Expand Down
31 changes: 20 additions & 11 deletions components/secure_channel/src/secure_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ mod tests {
use super::*;

use futures::channel::oneshot;
use futures::executor::{LocalPool, ThreadPool};
// use futures::executor::LocalPool;
use futures::task::SpawnExt;
use futures::Future;

Expand All @@ -350,10 +350,13 @@ mod tests {

use proto::crypto::PrivateKey;

use common::test_executor::TestExecutor;

async fn secure_channel1(
fut_sc: impl Future<Output = Result<(PublicKey, ConnPairVec), SecureChannelError>> + 'static,
mut tick_sender: mpsc::Sender<()>,
output_sender: oneshot::Sender<bool>,
test_executor: TestExecutor,
) {
let (_public_key, conn_pair_vec) = fut_sc.await.unwrap();
let (mut sender, mut receiver) = conn_pair_vec.split();
Expand All @@ -364,6 +367,7 @@ mod tests {
// Move time forward, to cause rekeying:
for _ in 0_usize..20 {
tick_sender.send(()).await.unwrap();
test_executor.wait().await;
}
sender.send(vec![0, 1, 2]).await.unwrap();

Expand All @@ -390,11 +394,11 @@ mod tests {
#[test]
fn test_secure_channel_basic() {
// Start the Identity service:
let thread_pool = ThreadPool::new().unwrap();
let test_executor = TestExecutor::new();

// Create a mock time service:
let (tick_sender, tick_receiver) = mpsc::channel::<()>(0);
let timer_client = create_timer_incoming(tick_receiver, thread_pool.clone()).unwrap();
let timer_client = create_timer_incoming(tick_receiver, test_executor.clone()).unwrap();

let rng1 = DummyRandom::new(&[1u8]);
let pkcs8 = PrivateKey::rand_gen(&rng1);
Expand All @@ -410,10 +414,10 @@ mod tests {
let (requests_sender2, identity_server2) = create_identity(identity2);
let identity_client2 = IdentityClient::new(requests_sender2);

thread_pool
test_executor
.spawn(identity_server1.then(|_| future::ready(())))
.unwrap();
thread_pool
test_executor
.spawn(identity_server2.then(|_| future::ready(())))
.unwrap();

Expand All @@ -430,7 +434,7 @@ mod tests {
rng1.clone(),
timer_client.clone(),
ticks_to_rekey,
thread_pool.clone(),
test_executor.clone(),
);

let fut_sc2 = create_secure_channel(
Expand All @@ -441,28 +445,33 @@ mod tests {
rng2.clone(),
timer_client.clone(),
ticks_to_rekey,
thread_pool.clone(),
test_executor.clone(),
);

let (output_sender1, output_receiver1) = oneshot::channel::<bool>();
let (output_sender2, output_receiver2) = oneshot::channel::<bool>();

thread_pool
test_executor
.spawn(secure_channel1(
fut_sc1,
tick_sender.clone(),
output_sender1,
test_executor.clone(),
))
.unwrap();
thread_pool

test_executor
.spawn(secure_channel2(
fut_sc2,
tick_sender.clone(),
output_sender2,
))
.unwrap();

assert_eq!(true, LocalPool::new().run_until(output_receiver1).unwrap());
assert_eq!(true, LocalPool::new().run_until(output_receiver2).unwrap());
assert_eq!(test_executor.run(output_receiver1).output(), Some(Ok(true)));
assert_eq!(test_executor.run(output_receiver2).output(), Some(Ok(true)));

// assert_eq!(true, LocalPool::new().run_until(output_receiver1).unwrap());
// assert_eq!(true, LocalPool::new().run_until(output_receiver2).unwrap());
}
}
26 changes: 22 additions & 4 deletions components/timer/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
// #![deny(warnings)]
#![allow(intra_doc_link_resolution_failure)]
#![allow(clippy::too_many_arguments, clippy::implicit_hasher, clippy::module_inception)]
#![allow(
clippy::too_many_arguments,
clippy::implicit_hasher,
clippy::module_inception
)]
// TODO: disallow clippy::too_many_arguments

// use common::futures_compat::create_interval;
Expand Down Expand Up @@ -99,6 +103,12 @@ enum TimerEvent {
RequestsDone,
}

// TODO: Possibly give to timer_loop as an argument?
/// Size of queue for pending ticks for a single client.
/// After `TICK_QUEUE_LEN` ticks, the timer will not be able to queue more ticks to a nonresponsive
/// client.
const TICK_QUEUE_LEN: usize = 8;

async fn timer_loop<M>(
incoming: M,
from_client: mpsc::Receiver<TimerRequest>,
Expand All @@ -124,13 +134,21 @@ where
let mut temp_tick_senders = Vec::new();
temp_tick_senders.append(&mut tick_senders);
for mut tick_sender in temp_tick_senders {
if let Ok(()) = tick_sender.send(TimerTick).await {
tick_senders.push(tick_sender);
match tick_sender.try_send(TimerTick) {
Ok(()) => tick_senders.push(tick_sender),
Err(e) => {
// In case of error, we disconnect client
if !e.is_disconnected() {
// Error trying to send a tick to client.
// Client might be too busy?
error!("timer_loop(): try_send() error: {:?}", e);
}
}
}
}
}
TimerEvent::Request(timer_request) => {
let (tick_sender, tick_receiver) = mpsc::channel(0);
let (tick_sender, tick_receiver) = mpsc::channel(TICK_QUEUE_LEN);
tick_senders.push(tick_sender);
let _ = timer_request.response_sender.send(tick_receiver);
}
Expand Down
14 changes: 9 additions & 5 deletions components/timer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,23 @@ mod tests {
use futures::task::{Spawn, SpawnExt};
use futures::SinkExt;

async fn task_future_timeout_on_time(spawner: impl Spawn + Clone + Send + 'static) {
use common::test_executor::TestExecutor;

async fn task_future_timeout_on_time(test_executor: TestExecutor) {
// Create a mock time service:
let (mut tick_sender, tick_receiver) = mpsc::channel::<()>(0);
let mut timer_client = create_timer_incoming(tick_receiver, spawner.clone()).unwrap();
let mut timer_client = create_timer_incoming(tick_receiver, test_executor.clone()).unwrap();

let (sender, receiver) = oneshot::channel::<()>();
let timer_stream = timer_client.request_timer_stream().await.unwrap();
let receiver = receiver.map(|res| res.unwrap());
let timeout_fut = spawner
let timeout_fut = test_executor
.spawn_with_handle(future_timeout(receiver, timer_stream, 8))
.unwrap();

for _ in 0..7usize {
tick_sender.send(()).await.unwrap();
test_executor.wait().await;
}

sender.send(()).unwrap();
Expand All @@ -75,8 +78,9 @@ mod tests {

#[test]
fn test_future_timeout_on_time() {
let thread_pool = ThreadPool::new().unwrap();
LocalPool::new().run_until(task_future_timeout_on_time(thread_pool.clone()));
let test_executor = TestExecutor::new();
let res = test_executor.run(task_future_timeout_on_time(test_executor.clone()));
assert!(res.is_output());
}

async fn task_future_timeout_late(spawner: impl Spawn + Clone + Send + 'static) {
Expand Down

0 comments on commit 96eed90

Please sign in to comment.