Skip to content

Commit

Permalink
Merge pull request #4 from triarius/fix
Browse files Browse the repository at this point in the history
Fix blocking on stdin
  • Loading branch information
triarius authored Sep 1, 2024
2 parents 7c437d0 + 87c094c commit 4224bd4
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 20 deletions.
57 changes: 57 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"

[dependencies]
clap = { version = "4.5.16", features = ["derive", "env"] }
crossbeam = "0.8.4"
eyre = "0.6.12"

[lints.clippy]
Expand Down
50 changes: 50 additions & 0 deletions src/flat_map_err.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/// A trait for `Result` that allows chaining a function that returns a `Result` to the error type.
///
/// # Examples
/// One use case is if the ok type, `T` has a default value, and the error type, `E` may take
/// multiple values that may be ignored, then this trait can be used to map the error type to the
/// default value of `T` for the errors that can be ignored.
pub(crate) trait FlatMapErr<T, D, E> {
fn flat_map_err<F>(self, f: F) -> Result<T, E>
where
F: FnOnce(D) -> Result<T, E>;
}

impl<T, D, E> FlatMapErr<T, D, E> for Result<T, D> {
fn flat_map_err<F>(self, f: F) -> Result<T, E>
where
F: FnOnce(D) -> Result<T, E>,
{
match self.map_err(f) {
Ok(t) | Err(Ok(t)) => Ok(t),
Err(Err(e)) => Err(e),
}
}
}

#[cfg(test)]
mod test {
#[test]
fn flat_map_err() {
use super::FlatMapErr;

#[derive(Debug, PartialEq, Eq)]
enum Error {
IgnorableError,
GraveError,
}

let ignore = |e| match e {
Error::IgnorableError => Ok(()),
Error::GraveError => Err(e),
};

let success = Ok(());
let ignorable_error = Err(Error::IgnorableError);
let grave_error = Err(Error::GraveError);

assert_eq!(success.flat_map_err(ignore), Ok(()));
assert_eq!(ignorable_error.flat_map_err(ignore), Ok(()));
assert_eq!(grave_error.flat_map_err(ignore), Err(Error::GraveError));
}
}
106 changes: 86 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
mod flat_map_err;

use crate::flat_map_err::FlatMapErr;
use crossbeam::channel::{bounded, select, Receiver, TrySendError};
use eyre::{eyre, Result};
use std::{
fs::File,
Expand All @@ -12,6 +16,9 @@ use std::{
/// # Errors
/// This function returns an error if the child process cannot be spawned, or if any of the
/// file or thread operations fail.
///
/// # Panics
/// Errors reading from stdin moving its data.
pub fn run(
cmd: &str,
args: &[&str],
Expand Down Expand Up @@ -41,23 +48,57 @@ pub fn run(
.take()
.ok_or_else(|| eyre!("child stdin is not piped"))?;

thread::scope(|s| -> Result<()> {
s.spawn(|| tee(child_out, stdout(), stdout_log_path));
s.spawn(|| tee(child_err, stderr(), stderr_log_path));
s.spawn(|| tee(stdin(), child_in, stdin_log_path));
let mut stdin_outputs = outputs(child_in, stdin_log_path)?;
let mut stdout_outputs = outputs(stdout(), stdout_log_path)?;
let mut stderr_outputs = outputs(stderr(), stderr_log_path)?;

// Create a channel to send data from stdin to the cancellable_tee thread.
let (t_in, r_in) = bounded(1);

// Read from stdin and send on a channel. We will call select! on this
// channel in the cancellable_tee thread.
// DO NOT join on this thread, it will cause reading stdin to block.
thread::spawn(move || {
let mut buffer = [0u8; 1024];
loop {
let n = stdin().read(&mut buffer).unwrap();
if n == 0 {
break;
}
t_in.send((buffer, n)).unwrap();
}
});

thread::scope(|s| -> Result<i32> {
let (t_cancel, r_cancel) = bounded(1);

// If cancellable_tee were not used here, reading from stdin will block the thread even
// after the child process has exited. This will prevent the process from exiting.
// So we have to cancel the tee thread when the child process exits and before
// joining on the tee thread.
s.spawn(move || cancellable_tee(&r_cancel, &r_in, &mut stdin_outputs[..]));
s.spawn(|| tee(child_out, &mut stdout_outputs[..]));
s.spawn(|| tee(child_err, &mut stderr_outputs[..]));

Ok(())
})?;
let code = child
.wait()?
.code()
.ok_or_else(|| eyre!("process terminated by signal"))?;

let code = child
.wait()?
.code()
.ok_or_else(|| eyre!("process terminated by signal"))?;
// Cancel the tee threads. This may fail if the stdin was closed before the child process.
t_cancel.try_send(()).flat_map_err(|e| match e {
TrySendError::Full(()) => Err(e),
TrySendError::Disconnected(()) => Ok(()), // If the stdin was close, this is expected
})?;

Ok(code)
Ok(code)
})
}

fn outputs<W: Write + 'static>(writer: W, filename: Option<&Path>) -> Result<Vec<Box<dyn Write>>> {
fn outputs<W: Write + Send + 'static>(
writer: W,
filename: Option<&Path>,
) -> Result<Vec<Box<(dyn Write + Send)>>> {
match filename {
Some(filename) => {
let file = File::create(filename)?;
Expand All @@ -67,20 +108,45 @@ fn outputs<W: Write + 'static>(writer: W, filename: Option<&Path>) -> Result<Vec
}
}

fn tee<W: Write + 'static>(
mut stream: impl Read,
output: W,
filename: Option<&Path>,
) -> Result<()> {
fn tee(mut stream: impl Read, outputs: &mut [Box<dyn Write + Send>]) -> Result<()> {
let mut buffer = [0u8; 1024];
let mut outputs = outputs(output, filename)?;
loop {
let n = stream.read(&mut buffer)?;
if n == 0 {
break;
}
for output in &mut outputs {
output.write_all(&buffer[..n])?;
for o in outputs.iter_mut() {
o.write_all(&buffer[..n])?;
}
}
Ok(())
}

/// Read from a channel and write to multiple outputs until the channel is closed or the cancel
/// channel is received.
///
/// # Errors
/// If writing to any of the outputs fails, this function returns an error.
fn cancellable_tee(
cancel: &Receiver<()>,
data: &Receiver<([u8; 1024], usize)>,
outputs: &mut [Box<dyn Write + Send>],
) -> Result<()> {
loop {
select! {
recv(cancel) -> _ => break,
recv(data) -> recv => {
if let Ok((buffer, n)) = recv {
if n == 0 {
continue;
}
for o in outputs.iter_mut() {
o.write_all(&buffer[..n])?;
}
} else {
break;
}
}
}
}
Ok(())
Expand Down

0 comments on commit 4224bd4

Please sign in to comment.