Skip to content

Commit

Permalink
Merge pull request #251 from async-rs/blocking-unstable
Browse files Browse the repository at this point in the history
add an unstable `task::blocking` function
  • Loading branch information
yoshuawuyts authored Oct 9, 2019
2 parents 460b8af + c27623c commit 9ab7b1a
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/fs/read_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::fs::DirEntry;
use crate::future::Future;
use crate::io;
use crate::stream::Stream;
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Returns a stream of entries in a directory.
///
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct ReadDir(State);
#[derive(Debug)]
enum State {
Idle(Option<std::fs::ReadDir>),
Busy(blocking::JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>),
Busy(JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>),
}

impl ReadDir {
Expand Down
4 changes: 2 additions & 2 deletions src/io/stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cfg_if::cfg_if;

use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Constructs a new handle to the standard error of the current process.
///
Expand Down Expand Up @@ -56,7 +56,7 @@ enum State {
/// The stderr is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stderr.
Busy(blocking::JoinHandle<State>),
Busy(JoinHandle<State>),
}

/// Inner representation of the asynchronous stderr.
Expand Down
4 changes: 2 additions & 2 deletions src/io/stdin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cfg_if::cfg_if;

use crate::future::{self, Future};
use crate::io::{self, Read};
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Constructs a new handle to the standard input of the current process.
///
Expand Down Expand Up @@ -57,7 +57,7 @@ enum State {
/// The stdin is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stdin.
Busy(blocking::JoinHandle<State>),
Busy(JoinHandle<State>),
}

/// Inner representation of the asynchronous stdin.
Expand Down
4 changes: 2 additions & 2 deletions src/io/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cfg_if::cfg_if;

use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Constructs a new handle to the standard output of the current process.
///
Expand Down Expand Up @@ -56,7 +56,7 @@ enum State {
/// The stdout is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stdout.
Busy(blocking::JoinHandle<State>),
Busy(JoinHandle<State>),
}

/// Inner representation of the asynchronous stdout.
Expand Down
5 changes: 2 additions & 3 deletions src/net/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use cfg_if::cfg_if;

use crate::future::Future;
use crate::io;
use crate::task::blocking;
use crate::task::{Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

cfg_if! {
if #[cfg(feature = "docs")] {
Expand Down Expand Up @@ -48,7 +47,7 @@ pub trait ToSocketAddrs {
#[allow(missing_debug_implementations)]
pub enum ToSocketAddrsFuture<'a, I> {
Phantom(PhantomData<&'a ()>),
Join(blocking::JoinHandle<io::Result<I>>),
Join(JoinHandle<io::Result<I>>),
Ready(Option<io::Result<I>>),
}

Expand Down
38 changes: 8 additions & 30 deletions src/task/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! A thread pool for running blocking functions asynchronously.
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;
Expand All @@ -10,16 +8,16 @@ use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;

use crate::future::Future;
use crate::task::{Context, Poll};
use crate::task::task::{JoinHandle, Tag};
use crate::utils::abort_on_panic;

const MAX_THREADS: u64 = 10_000;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);

struct Pool {
sender: Sender<async_task::Task<()>>,
receiver: Receiver<async_task::Task<()>>,
sender: Sender<async_task::Task<Tag>>,
receiver: Receiver<async_task::Task<Tag>>,
}

lazy_static! {
Expand Down Expand Up @@ -85,7 +83,7 @@ fn maybe_create_another_blocking_thread() {
// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
fn schedule(t: async_task::Task<()>) {
fn schedule(t: async_task::Task<Tag>) {
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
Expand All @@ -98,35 +96,15 @@ fn schedule(t: async_task::Task<()>) {
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub fn spawn<F, R>(future: F) -> JoinHandle<R>
pub(crate) fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(future, schedule, ());
let tag = Tag::new(None);
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule();
JoinHandle(handle)
}

/// A handle to a blocking task.
pub struct JoinHandle<R>(async_task::JoinHandle<R, ()>);

impl<R> Unpin for JoinHandle<R> {}

impl<R> Future for JoinHandle<R> {
type Output = R;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx).map(|out| out.unwrap())
}
}

impl<R> fmt::Debug for JoinHandle<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JoinHandle")
.field("handle", &self.0)
.finish()
}
JoinHandle::new(handle)
}

/// Generates a random number in `0..n`.
Expand Down
16 changes: 16 additions & 0 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,19 @@ mod task_local;
mod worker;

pub(crate) mod blocking;

/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
// Once this function stabilizes we should merge `blocking::spawn` into this so
// all code in our crate uses `task::blocking` too.
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn blocking<F, R>(future: F) -> task::JoinHandle<R>
where
F: crate::future::Future<Output = R> + Send + 'static,
R: Send + 'static,
{
blocking::spawn(future)
}

0 comments on commit 9ab7b1a

Please sign in to comment.