From d9733656a8fc16255b119fdf3193d3bea17d9493 Mon Sep 17 00:00:00 2001 From: Samuel Moelius Date: Wed, 8 May 2024 20:03:45 -0400 Subject: [PATCH] Experimental support for parallel fuzzing --- README.md | 3 + cargo-test-fuzz/Cargo.toml | 2 + .../src/bin/cargo_test_fuzz/transition.rs | 18 + cargo-test-fuzz/src/lib.rs | 370 ++++++++++++++---- 4 files changed, 308 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index 1834a9db..0e3471ce 100644 --- a/README.md +++ b/README.md @@ -255,6 +255,7 @@ Options: --backtrace Display backtraces --consolidate Move one target's crashes, hangs, and work queue to its corpus; to consolidate all targets, use --consolidate-all + --cpus Fuzz using at most cpus; default is all but one --display Display corpus, crashes, generic args, `impl` generic args, hangs, or work queue. By default, corpus uses an uninstrumented fuzz target; the others use an instrumented fuzz target. To display the @@ -282,6 +283,8 @@ Options: reset all targets, use --reset-all --resume Resume target's last fuzzing session --run-until-crash Stop fuzzing once a crash is found + --slice When there are not sufficiently many cpus to fuzz all targets + concurrently, fuzz them in intervals of [default: 1200] --test Integration test containing fuzz target --timeout Number of seconds to consider a hang when fuzzing or replaying (equivalent to -- -t when fuzzing) diff --git a/cargo-test-fuzz/Cargo.toml b/cargo-test-fuzz/Cargo.toml index 29742014..9425915c 100644 --- a/cargo-test-fuzz/Cargo.toml +++ b/cargo-test-fuzz/Cargo.toml @@ -24,6 +24,8 @@ clap = { version = "4.5", features = ["cargo", "derive", "wrap_help"] } env_logger = "0.11" heck = "0.5" log = "0.4" +mio = { version = "0.8", features = ["os-ext", "os-poll"] } +num_cpus = "1.16" paste = "1.0" remain = "0.2" semver = "1.0" diff --git a/cargo-test-fuzz/src/bin/cargo_test_fuzz/transition.rs b/cargo-test-fuzz/src/bin/cargo_test_fuzz/transition.rs index 8d54c870..a2f77ebb 100644 --- a/cargo-test-fuzz/src/bin/cargo_test_fuzz/transition.rs +++ b/cargo-test-fuzz/src/bin/cargo_test_fuzz/transition.rs @@ -34,6 +34,12 @@ struct TestFuzzWithDeprecations { consolidate: bool, #[arg(long, hide = true)] consolidate_all: bool, + #[arg( + long, + value_name = "N", + help = "Fuzz using at most cpus; default is all but one" + )] + cpus: Option, #[arg( long, value_name = "OBJECT", @@ -107,6 +113,14 @@ struct TestFuzzWithDeprecations { resume: bool, #[arg(long, help = "Stop fuzzing once a crash is found")] run_until_crash: bool, + #[arg( + long, + value_name = "SECONDS", + default_value = "1200", + help = "When there are not sufficiently many cpus to fuzz all targets concurrently, fuzz \ + them in intervals of " + )] + slice: u64, #[arg( long, value_name = "NAME", @@ -136,6 +150,7 @@ impl From for super::TestFuzz { backtrace, consolidate, consolidate_all, + cpus, display, exact, exit_code, @@ -155,6 +170,7 @@ impl From for super::TestFuzz { reset_all, resume, run_until_crash, + slice, test, timeout, verbose, @@ -165,6 +181,7 @@ impl From for super::TestFuzz { backtrace, consolidate, consolidate_all, + cpus, display, exact, exit_code, @@ -184,6 +201,7 @@ impl From for super::TestFuzz { reset_all, resume, run_until_crash, + slice, test, timeout, verbose, diff --git a/cargo-test-fuzz/src/lib.rs b/cargo-test-fuzz/src/lib.rs index 8a653832..586e85d4 100644 --- a/cargo-test-fuzz/src/lib.rs +++ b/cargo-test-fuzz/src/lib.rs @@ -18,6 +18,7 @@ use internal::dirs::{ queue_directory_from_target, target_directory, }; use log::debug; +use mio::{unix::pipe::Receiver, Events, Interest, Poll, Token}; use semver::Version; use semver::VersionReq; use serde::{Deserialize, Serialize}; @@ -28,7 +29,7 @@ use std::{ io::{BufRead, Read}, iter, path::{Path, PathBuf}, - process::{exit, Command}, + process::{exit, Child as StdChild, Command, Stdio}, sync::OnceLock, time::Duration, }; @@ -75,6 +76,7 @@ pub struct TestFuzz { pub backtrace: bool, pub consolidate: bool, pub consolidate_all: bool, + pub cpus: Option, pub display: Option, pub exact: bool, pub exit_code: bool, @@ -94,6 +96,7 @@ pub struct TestFuzz { pub reset_all: bool, pub resume: bool, pub run_until_crash: bool, + pub slice: u64, pub test: Option, pub timeout: Option, pub verbose: bool, @@ -190,27 +193,27 @@ pub fn run(opts: TestFuzz) -> Result<()> { return reset(&opts, &executable_targets); } - let (executable, target) = executable_target(&opts, &executable_targets)?; + if opts.consolidate || opts.reset || display || replay { + let (executable, target) = executable_target(&opts, &executable_targets)?; - if opts.consolidate || opts.reset { - if opts.consolidate { - consolidate(&opts, &executable_targets)?; + if opts.consolidate || opts.reset { + if opts.consolidate { + consolidate(&opts, &executable_targets)?; + } + return reset(&opts, &executable_targets); } - return reset(&opts, &executable_targets); - } - let (flags, dir) = None - .or_else(|| { - opts.display - .map(|object| flags_and_dir(object, &executable.name, &target)) - }) - .or_else(|| { - opts.replay - .map(|object| flags_and_dir(object, &executable.name, &target)) - }) - .unwrap_or((Flags::empty(), PathBuf::default())); + let (flags, dir) = None + .or_else(|| { + opts.display + .map(|object| flags_and_dir(object, &executable.name, &target)) + }) + .or_else(|| { + opts.replay + .map(|object| flags_and_dir(object, &executable.name, &target)) + }) + .unwrap_or((Flags::empty(), PathBuf::default())); - if display || replay { return for_each_entry(&opts, &executable, &target, display, replay, flags, &dir); } @@ -219,7 +222,9 @@ pub fn run(opts: TestFuzz) -> Result<()> { return Ok(()); } - fuzz(&opts, &executable, &target).map_err(|error| { + let executable_targets = flatten_executable_targets(&opts, executable_targets)?; + + fuzz(&opts, &executable_targets).map_err(|error| { if opts.exit_code { eprintln!("{error:?}"); exit(2); @@ -547,7 +552,6 @@ fn check_test_fuzz_and_afl_versions( "cargo-test-fuzz", &cargo_test_fuzz_version, )?; - #[allow(clippy::expect_used)] check_dependency_version( &executable.name, "afl", @@ -881,34 +885,252 @@ fn for_each_entry( Ok(()) } +fn flatten_executable_targets( + opts: &TestFuzz, + executable_targets: Vec<(Executable, Vec)>, +) -> Result> { + let executable_targets = executable_targets + .into_iter() + .flat_map(|(executable, targets)| { + targets + .into_iter() + .map(move |target| (executable.clone(), target)) + }) + .collect::>(); + + ensure!( + !executable_targets.is_empty(), + "Found no fuzz targets{}", + match_message(opts) + ); + + Ok(executable_targets) +} + +struct Config { + ui: bool, + sufficient_cpus: bool, + first_run: bool, +} + +impl Default for Config { + fn default() -> Self { + Self { + ui: true, + sufficient_cpus: true, + first_run: true, + } + } +} + +struct Child { + exec: String, + popen: StdChild, + receiver: Receiver, + unprinted_data: Vec, + time_limit_was_reached: bool, + testing_aborted_programmatically: bool, +} + +impl Child { + fn read_lines(&mut self) -> Result { + loop { + let mut buf = [0; 4096]; + let n = match self.receiver.read(&mut buf) { + Ok(0) => break, + Ok(n) => n, + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => break, + Err(error) => return Err(error.into()), + }; + self.unprinted_data.extend_from_slice(&buf[0..n]); + } + if let Some(i) = self.unprinted_data.iter().rev().position(|&c| c == b'\n') { + let mut buf = self.unprinted_data.split_off(self.unprinted_data.len() - i); + std::mem::swap(&mut self.unprinted_data, &mut buf); + String::from_utf8(buf).map_err(Into::into) + } else { + Ok(String::new()) + } + } +} + #[allow(clippy::too_many_lines)] -fn fuzz(opts: &TestFuzz, executable: &Executable, target: &str) -> Result<()> { - let input_dir = if opts.resume { +fn fuzz(opts: &TestFuzz, executable_targets: &[(Executable, String)]) -> Result<()> { + auto_generate_corpora(executable_targets)?; + + if let (false, [(executable, target)]) = (opts.exit_code, executable_targets) { + let mut command = fuzz_command(opts, &Config::default(), executable, target); + let status = command + .status() + .with_context(|| format!("Could not get status of `{command:?}`"))?; + ensure!(status.success(), "Command failed: {:?}", command); + return Ok(()); + } + + let mut config = Config { + ui: false, + ..Default::default() + }; + + let n_cpus = std::cmp::min(opts.cpus.unwrap_or(num_cpus::get() - 1), num_cpus::get()); + + ensure!(n_cpus >= 1, "Number of cpus must be greater than zero"); + + config.sufficient_cpus = n_cpus >= executable_targets.len(); + + if !config.sufficient_cpus { + ensure!( + opts.max_total_time.is_none(), + "--max-total-time cannot be used when number of cpus ({n_cpus}) is less than number \ + of fuzz targets ({})", + executable_targets.len() + ); + + eprintln!( + "Number of cpus ({n_cpus}) is less than number of fuzz targets ({}); fuzzing each for \ + {} seconds", + executable_targets.len(), + opts.slice + ); + } + + let mut n_children = 0; + let mut i_task = 0; + let mut executable_targets_iter = executable_targets.iter().cycle(); + let mut poll = Poll::new().with_context(|| "`Poll::new` failed")?; + let mut events = Events::with_capacity(128); + let mut children = vec![(); executable_targets.len()] + .into_iter() + .map(|()| None::) + .collect::>(); + let mut i_target_prev = executable_targets.len(); + + loop { + if n_children < n_cpus && (i_task < executable_targets.len() || !config.sufficient_cpus) { + let Some((executable, target)) = executable_targets_iter.next() else { + unreachable!(); + }; + + let i_target = i_task % executable_targets.len(); + + config.first_run = i_task < executable_targets.len(); + + // smoelius: If this is not the target's first run, then there must be insufficient + // cpus. + assert!(config.first_run || !config.sufficient_cpus); + + let mut command = fuzz_command(opts, &config, executable, target); + + let exec = format!("{command:?}"); + command.stdout(Stdio::piped()); + let mut popen = command + .spawn() + .with_context(|| format!("Could not spawn `{exec:?}`"))?; + let stdout = popen + .stdout + .take() + .ok_or_else(|| anyhow!("Could not get output of `{exec:?}`"))?; + let mut receiver = Receiver::from(stdout); + receiver + .set_nonblocking(true) + .with_context(|| "Could not make receiver non-blocking")?; + poll.registry() + .register(&mut receiver, Token(i_target), Interest::READABLE) + .with_context(|| "Could not register receiver")?; + children[i_target] = Some(Child { + exec, + popen, + receiver, + unprinted_data: Vec::new(), + time_limit_was_reached: false, + testing_aborted_programmatically: false, + }); + + n_children += 1; + i_task += 1; + continue; + } + + if n_children == 0 { + assert!(config.sufficient_cpus); + assert!(i_task >= executable_targets.len()); + break; + } + + poll.poll(&mut events, None) + .with_context(|| "`poll` failed")?; + + for event in &events { + let Token(i_target) = event.token(); + let (_, target) = &executable_targets[i_target]; + #[allow(clippy::panic)] + let child = children[i_target] + .as_mut() + .unwrap_or_else(|| panic!("Child for token {i_target} should exist")); + + let s = child.read_lines()?; + for line in s.lines() { + if line.contains("Time limit was reached") { + child.time_limit_was_reached = true; + } + if line.contains("+++ Testing aborted programmatically +++") { + child.testing_aborted_programmatically = true; + } + if i_target_prev < executable_targets.len() && i_target_prev != i_target { + println!("---"); + } + println!("{target}: {line}"); + i_target_prev = i_target; + } + + if event.is_read_closed() { + #[allow(clippy::panic)] + let mut child = children[i_target] + .take() + .unwrap_or_else(|| panic!("Child for token {i_target} should exist")); + poll.registry() + .deregister(&mut child.receiver) + .with_context(|| "Could not deregister receiver")?; + n_children -= 1; + + let status = child + .popen + .wait() + .with_context(|| format!("`wait` failed for `{:?}`", child.popen))?; + + if !child.testing_aborted_programmatically || !status.success() { + bail!("Command failed: {:?}", child.exec); + } + + if opts.exit_code && !child.time_limit_was_reached { + exit(1); + } + } + } + } + + Ok(()) +} + +fn fuzz_command( + opts: &TestFuzz, + config: &Config, + executable: &Executable, + target: &str, +) -> Command { + let input_dir = if opts.resume || !config.first_run { "-".to_owned() } else { - let corpus_dir = corpus_directory_from_target(&executable.name, target); - if !corpus_dir.exists() { - eprintln!( - "Could not find `{}`. Trying to auto-generate it...", - corpus_dir.to_string_lossy(), - ); - auto_generate_corpus(executable, target)?; - ensure!( - corpus_dir.exists(), - "Could not find or auto-generate `{}`. Please ensure `{}` is tested.", - corpus_dir.to_string_lossy(), - target - ); - eprintln!("Auto-generated `{}`.", corpus_dir.to_string_lossy()); - } - corpus_dir.to_string_lossy().into_owned() + corpus_directory_from_target(&executable.name, target) + .to_string_lossy() + .into_owned() }; let output_dir = output_directory_from_target(&executable.name, target); create_dir_all(&output_dir).unwrap_or_default(); let mut envs = BASE_ENVS.to_vec(); - if opts.no_ui { + if !config.ui { envs.push(("AFL_NO_UI", "1")); } if opts.run_until_crash { @@ -931,7 +1153,9 @@ fn fuzz(opts: &TestFuzz, executable: &Executable, target: &str) -> Result<()> { .into_iter() .map(String::from), ); - if let Some(max_total_time) = opts.max_total_time { + if !config.sufficient_cpus { + args.extend(["-V".to_owned(), opts.slice.to_string()]); + } else if let Some(max_total_time) = opts.max_total_time { args.extend(["-V".to_owned(), max_total_time.to_string()]); } if let Some(timeout) = opts.timeout { @@ -949,52 +1173,28 @@ fn fuzz(opts: &TestFuzz, executable: &Executable, target: &str) -> Result<()> { .map(String::from), ); - #[allow(clippy::if_not_else)] - if !opts.exit_code { - let mut command = Command::new("cargo"); - command.envs(envs).args(args); - debug!("{:?}", command); - let status = command - .status() - .with_context(|| format!("Could not get status of `{command:?}`"))?; - - ensure!(status.success(), "Command failed: {:?}", command); - } else { - let exec = Exec::cmd("cargo") - .env_extend(&envs) - .args(&args) - .stdout(Redirection::Pipe); - debug!("{:?}", exec); - let mut popen = exec.clone().popen()?; - let stdout = popen - .stdout - .take() - .ok_or_else(|| anyhow!("Could not get output of `{:?}`", exec))?; - let mut time_limit_was_reached = false; - let mut testing_aborted_programmatically = false; - for line in std::io::BufReader::new(stdout).lines() { - let line = line.with_context(|| format!("Could not get output of `{exec:?}`"))?; - if line.contains("Time limit was reached") { - time_limit_was_reached = true; - } - // smoelius: Work around "pizza mode" bug. - if line.contains("+++ Testing aborted programmatically +++") - || line.contains("+++ Baking aborted programmatically +++") - { - testing_aborted_programmatically = true; - } - println!("{line}"); - } - let status = popen - .wait() - .with_context(|| format!("`wait` failed for `{popen:?}`"))?; - - if !testing_aborted_programmatically || !status.success() { - bail!("Command failed: {:?}", exec); - } + let mut command = Command::new("cargo"); + command.envs(envs).args(&args); + debug!("{:?}", command); + command +} - if !time_limit_was_reached { - exit(1); +fn auto_generate_corpora(executable_targets: &[(Executable, String)]) -> Result<()> { + for (executable, target) in executable_targets { + let corpus_dir = corpus_directory_from_target(&executable.name, target); + if !corpus_dir.exists() { + eprintln!( + "Could not find `{}`. Trying to auto-generate it...", + corpus_dir.to_string_lossy(), + ); + auto_generate_corpus(executable, target)?; + ensure!( + corpus_dir.exists(), + "Could not find or auto-generate `{}`. Please ensure `{}` is tested.", + corpus_dir.to_string_lossy(), + target + ); + eprintln!("Auto-generated `{}`.", corpus_dir.to_string_lossy()); } }