From 3f399efbbe2ecb5e56e271da34d6203d9b7fc04e Mon Sep 17 00:00:00 2001 From: Joseph Guhlin Date: Wed, 27 Nov 2024 12:13:43 +1300 Subject: [PATCH 1/2] Solves #5 --- fakeminimap2/Cargo.lock | 68 ++++++++++++++ fakeminimap2/Cargo.toml | 2 + fakeminimap2/README.md | 9 +- fakeminimap2/src/channels.rs | 169 +++++++++++++++++++++++++++++++++++ fakeminimap2/src/cli.rs | 33 +++++++ fakeminimap2/src/main.rs | 153 +++---------------------------- fakeminimap2/src/rayon.rs | 53 +++++++++++ 7 files changed, 345 insertions(+), 142 deletions(-) create mode 100644 fakeminimap2/src/channels.rs create mode 100644 fakeminimap2/src/cli.rs create mode 100644 fakeminimap2/src/rayon.rs diff --git a/fakeminimap2/Cargo.lock b/fakeminimap2/Cargo.lock index 8c0a5d5..936ebb5 100644 --- a/fakeminimap2/Cargo.lock +++ b/fakeminimap2/Cargo.lock @@ -184,6 +184,46 @@ dependencies = [ "libloading", ] +[[package]] +name = "clap" +version = "4.5.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" + [[package]] name = "cmake" version = "0.1.52" @@ -336,12 +376,14 @@ dependencies = [ name = "fakeminimap2" version = "0.1.0" dependencies = [ + "clap", "crossbeam", "env_logger", "humantime", "log", "minimap2", "needletail", + "rayon", ] [[package]] @@ -760,6 +802,26 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "regex" version = "1.11.1" @@ -876,6 +938,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum_macros" version = "0.26.4" diff --git a/fakeminimap2/Cargo.toml b/fakeminimap2/Cargo.toml index 07f289c..f10b4c8 100644 --- a/fakeminimap2/Cargo.toml +++ b/fakeminimap2/Cargo.toml @@ -12,6 +12,8 @@ needletail = "0.6" humantime = "2.1.0" log = "0.4.22" env_logger = "0.11.5" +clap = { version = "4.5.21", features = ["derive"] } +rayon = "1.10.0" [profile.release] opt-level = 3 diff --git a/fakeminimap2/README.md b/fakeminimap2/README.md index 58afc43..a03d6d1 100644 --- a/fakeminimap2/README.md +++ b/fakeminimap2/README.md @@ -1,4 +1,11 @@ # Multithreaded example of mapping Using crossbeam and standard library threading, we can multi thread alignments sharing the same references to the index. -Small example files are provided, \ No newline at end of file +Small example files are provided, + +## Logging +Logging is done using the log crate, and the log level can be set using the RUST_LOG environment variable. + +``` +RUST_LOG=info cargo run +``` \ No newline at end of file diff --git a/fakeminimap2/src/channels.rs b/fakeminimap2/src/channels.rs new file mode 100644 index 0000000..97a3495 --- /dev/null +++ b/fakeminimap2/src/channels.rs @@ -0,0 +1,169 @@ +use crossbeam::queue::ArrayQueue; +use minimap2::*; +use needletail::{parse_fastx_file, FastxReader}; + +use std::{error::Error, path::Path, sync::Arc, time::Duration}; + +/// We use a worker queue to pass around work between threads. +/// We do it this way to be generic over the type. +enum WorkQueue { + Work(T), + Result(T), +} + +// Not necessary to make types, but it helps keep things straightforward + +// We work on distinct WorkUnits (aka a single sequence) +type WorkUnit = (Vec, Vec); // Sequence ID, Sequence + +// We return the original sequence, and the vector of mappings (results) +type WorkResult = (WorkUnit, Vec); +// We could also choose to return just the Seq ID, and the Mappings should also have the query name too +// You can return whatever would be most useful + +pub(crate) fn map_with_channels( + target_file: impl AsRef, + query_file: impl AsRef, + threads: usize, +) -> Result<(), Box> { + // Aligner gets created using the build pattern. + // Once .with_index is called, the aligner is set to "Built" and can no longer be changed. + let aligner = Aligner::builder() + .map_ont() + .with_cigar() + .with_index(target_file, None) + .expect("Unable to build index"); + + log::info!("Made aligner"); + // Create a queue for work and for results + let work_queue = Arc::new(ArrayQueue::>::new(32)); // Artificially low, but the best depends on tuning + let results_queue = Arc::new(ArrayQueue::>::new(32)); + + // I use a shutdown flag + let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + + // Store join handles, it's just good practice to clean up threads + let mut jh = Vec::new(); + + let aligner = Arc::new(aligner); + + // Spin up the threads + log::info!("Spawn threads"); + for _ in 0..threads { + // Clone everything we will need... + let work_queue = Arc::clone(&work_queue); + let results_queue = Arc::clone(&results_queue); + let shutdown = Arc::clone(&shutdown); + let aligner = Arc::clone(&aligner); + + let handle = std::thread::spawn(move || { + worker( + work_queue, + results_queue, + shutdown, + aligner, + ) + }); + + jh.push(handle); + } + + // Now that the threads are running, read the input file and push the work to the queue + let mut reader: Box = parse_fastx_file(query_file) + .unwrap_or_else(|_| panic!("Can't find query FASTA file")); + + // I just do this in the main thread, but you can split threads + let backoff = crossbeam::utils::Backoff::new(); + while let Some(Ok(record)) = reader.next() { + let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec())); + while let Err(work_packet) = work_queue.push(work) { + work = work_packet; // Simple way to maintain ownership + // If we have an error, it's 99% because the queue is full + backoff.snooze(); + } + } + + // Set the shutdown flag + shutdown.store(true, std::sync::atomic::Ordering::Relaxed); + + let mut num_alignments = 0; + + let backoff = crossbeam::utils::Backoff::new(); + loop { + match results_queue.pop() { + Some(WorkQueue::Result((record, alignments))) => { + num_alignments += alignments.len(); + log::info!( + "Got {} alignments for {}", + alignments.len(), + std::str::from_utf8(&record.0).unwrap() + ); + } + Some(_) => unimplemented!("Unexpected result type"), + None => { + log::trace!("Awaiting results"); + backoff.snooze(); + + // If all join handles are 'finished' we can break + if jh.iter().all(|h| h.is_finished()) { + break; + } + if backoff.is_completed() { + backoff.reset(); + std::thread::sleep(Duration::from_millis(1)); + } + } + } + } + + println!("Iteration complete, total alignments {}", num_alignments); + + // Join all threads + for handle in jh { + match handle.join() { + Ok(_) => log::trace!("Thread finished"), + Err(e) => log::error!("Thread panicked: {:?}", e), + } + } + + Ok(()) +} + +// Convert this to a function +fn worker( + work_queue: Arc>>, + results_queue: Arc>>, + shutdown: Arc, + aligner: Arc>, +) { + loop { + // We use backoff to sleep when we don't have any work + let backoff = crossbeam::utils::Backoff::new(); + + match work_queue.pop() { + Some(WorkQueue::Work(sequence)) => { + let alignment = aligner + .map(&sequence.1, true, false, None, None, Some(&sequence.0)) + .expect("Unable to align"); + + // Return the original sequence, as well as the mappings + // We have to do it this way because ownership + let mut result = WorkQueue::Result((sequence, alignment)); + while let Err(result_packet) = results_queue.push(result) { + result = result_packet; // Simple way to maintain ownership + // If we have an error, it's 99% because the queue is full + backoff.snooze(); + } + } + Some(_) => unimplemented!("Unexpected work type"), + None => { + backoff.snooze(); + + // If we have the shutdown signal, we should exit + if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() { + break; + } + } + } + } +} diff --git a/fakeminimap2/src/cli.rs b/fakeminimap2/src/cli.rs new file mode 100644 index 0000000..5a25971 --- /dev/null +++ b/fakeminimap2/src/cli.rs @@ -0,0 +1,33 @@ +use std::path::PathBuf; + +use clap::{Parser, ValueEnum}; + +#[derive(Parser, Debug)] +#[command(name = "fakeminimap2", about = "An example of how to use the minimap2 crate with multithreading")] +pub(crate) struct Cli { + /// The target file to align to (e.g. a reference genome - can be in FASTA, FASTQ, or mmi format) + pub target: PathBuf, + + /// The query file to align (e.g. reads - can be FASTA or FASTQ) + pub query: PathBuf, + + /// The number of threads to use + pub threads: usize, + + /// The method to use for multithreading + pub method: Option, +} + +#[derive(ValueEnum, Debug, Default, Clone)] +pub(crate) enum Method { + #[default] + /// Use crossbeam channels for multithreading (default) + Channels, + + /// Use rayon for multithreading + Rayon, +} + +pub(crate) fn parse_args() -> Cli { + Cli::parse() +} \ No newline at end of file diff --git a/fakeminimap2/src/main.rs b/fakeminimap2/src/main.rs index 679eacb..a6fa919 100644 --- a/fakeminimap2/src/main.rs +++ b/fakeminimap2/src/main.rs @@ -2,154 +2,25 @@ /// Although mpsc is also available in the standard library. /// /// For logging, pass in RUST_LOG=debug or RUST_LOG=trace to see more information. RUST_LOG=info is also supported. -use crossbeam::queue::ArrayQueue; -use minimap2::*; -use needletail::{parse_fastx_file, FastxReader}; -use std::sync::Arc; -use std::time::Duration; +// CLI interface +mod cli; -/// We use a worker queue to pass around work between threads. -/// We do it this way to be generic over the type. -enum WorkQueue { - Work(T), - Result(T), -} +// Multithreading methods +mod channels; // I prefer using channels over rayon, but rayon is simpler to use +mod rayon; fn main() { env_logger::init(); - log::debug!("Making aligner"); - - // Aligner gets created using the build pattern. - // Once .with_index is called, the aligner is set to "Built" and can no longer be changed. - let aligner = Aligner::builder() - .map_ont() - .with_cigar() - .with_index("hg38_chr_M.mmi", None) - .expect("Unable to build index"); - - log::info!("Made aligner"); - - // Not necessary to make types, but it helps keep things straightforward - - // We work on distinct WorkUnits (aka a single sequence) - type WorkUnit = (Vec, Vec); // Sequence ID, Sequence - - // We return the original sequence, and the vector of mappings (results) - type Result = (WorkUnit, Vec); - // We could also choose to return just the Seq ID, and the Mappings should also have the query name too - // You can return whatever would be most useful - - // Create a queue for work and for results - let work_queue = Arc::new(ArrayQueue::>::new(32)); // Artificially low, but the best depends on tuning - let results_queue = Arc::new(ArrayQueue::>::new(32)); - - // I use a shutdown flag - let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); - - // Store join handles, it's just good practice to clean up threads - let mut jh = Vec::new(); - - // Spin up the threads - log::info!("Spawn threads"); - for _ in 0..8 { - // Clone everything we will need... - let work_queue: Arc>> = Arc::clone(&work_queue); - let results_queue = Arc::clone(&results_queue); - let shutdown = Arc::clone(&shutdown); - let aligner = aligner.clone(); - - log::debug!("Cloned aligner"); - let handle = std::thread::spawn(move || loop { - // We use backoff to sleep when we don't have any work - let backoff = crossbeam::utils::Backoff::new(); - - match work_queue.pop() { - Some(WorkQueue::Work(sequence)) => { - let alignment = aligner - .map(&sequence.1, true, false, None, None, Some(&sequence.0)) - .expect("Unable to align"); - - // Return the original sequence, as well as the mappings - // We have to do it this way because ownership - let mut result = WorkQueue::Result((sequence, alignment)); - while let Err(result_packet) = results_queue.push(result) { - result = result_packet; // Simple way to maintain ownership - // If we have an error, it's 99% because the queue is full - backoff.snooze(); - } - } - Some(_) => unimplemented!("Unexpected work type"), - None => { - backoff.snooze(); - // If we have the shutdown signal, we should exit - if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() - { - break; - } - } - } - }); + let args = cli::parse_args(); - jh.push(handle); - } - - // Now that the threads are running, read the input file and push the work to the queue - let mut reader: Box = parse_fastx_file("testing_fake_minimap2_chrM.fasta") - .unwrap_or_else(|_| panic!("Can't find FASTA file at testing_r10_fasta.fasta")); - - // I just do this in the main thread, but you can split threads - let backoff = crossbeam::utils::Backoff::new(); - while let Some(Ok(record)) = reader.next() { - let mut work = WorkQueue::Work(( - record.id().to_vec(), - record.seq().to_vec(), - )); - while let Err(work_packet) = work_queue.push(work) { - work = work_packet; // Simple way to maintain ownership - // If we have an error, it's 99% because the queue is full - backoff.snooze(); + match args.method.unwrap_or_default() { + cli::Method::Channels => { + channels::map_with_channels(args.target, args.query, args.threads).expect("Error mapping with channels"); } - } - - // Set the shutdown flag - shutdown.store(true, std::sync::atomic::Ordering::Relaxed); - - let mut num_alignments = 0; - - let backoff = crossbeam::utils::Backoff::new(); - loop { - match results_queue.pop() { - Some(WorkQueue::Result((record, alignments))) => { - num_alignments += alignments.len(); - log::info!( - "Got {} alignments for {}", - alignments.len(), - std::str::from_utf8(&record.0).unwrap() - ); - } - Some(_) => unimplemented!("Unexpected result type"), - None => { - log::trace!("Awaiting results"); - backoff.snooze(); - - // If all join handles are 'finished' we can break - if jh.iter().all(|h| h.is_finished()) { - break; - } - if backoff.is_completed() { - backoff.reset(); - std::thread::sleep(Duration::from_millis(1)); - } - } + cli::Method::Rayon => { + rayon::map(args.target, args.query, args.threads).expect("Error mapping with rayon"); } } - - println!("Iteration complete, total alignments {}", num_alignments); - - // Join all threads - for handle in jh { - handle.join().unwrap(); - } -} \ No newline at end of file +} diff --git a/fakeminimap2/src/rayon.rs b/fakeminimap2/src/rayon.rs new file mode 100644 index 0000000..e49b10e --- /dev/null +++ b/fakeminimap2/src/rayon.rs @@ -0,0 +1,53 @@ +use std::{error::Error, path::Path}; + +use rayon::prelude::*; +use needletail::{parse_fastx_file, FastxReader}; +use minimap2::*; + + +pub(crate) fn map( + target_file: impl AsRef, + query_file: impl AsRef, + threads: usize, +) -> Result<(), Box> { + // Set the number of threads to use + rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .build_global() + .expect("Unable to set number of threads"); + + // Aligner gets created using the build pattern. + // Once .with_index is called, the aligner is set to "Built" and can no longer be changed. + let aligner = Aligner::builder() + .map_ont() + .with_cigar() + .with_index(target_file, None) + .expect("Unable to build index"); + + log::info!("Made aligner"); + + // Read in the query file + let mut reader = parse_fastx_file(query_file)?; + + let mut queries: Vec<(Vec, Vec)> = Vec::new(); + while let Some(record) = reader.next() { + let record = record.expect("Error reading record"); + queries.push((record.id().to_vec(), record.seq().to_vec())); + } + + // Map the queries + let results: Vec> = queries + .par_iter() + .map(|(id, seq)| { + aligner.map(&seq, false, false, None, None, Some(&id)).expect("Error mapping") + }) + .collect(); + + log::info!("Mapped queries"); + + // Count total number of alignments + let total_alignments: usize = results.iter().map(|x| x.len()).sum(); + println!("Iteration complete, total alignments {}", total_alignments); + + Ok(()) +} \ No newline at end of file From 9615652f39c205312905263bfc1eb08fd1233676 Mon Sep 17 00:00:00 2001 From: Joseph Guhlin Date: Wed, 27 Nov 2024 12:16:30 +1300 Subject: [PATCH 2/2] Update README.md with rayon and multithreading section --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e00bcb0..d88f511 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,7 @@ let mappings: Result> = aligner.map_file("query.fa", false, false); ## Multithreading Multithreading is supported, for implementation example see [fakeminimap2](https://github.com/jguhlin/minimap2-rs/blob/main/fakeminimap2/src/main.rs). Minimap2 also supports threading itself, and will use a minimum of 3 cores for building the index. Multithreading for mapping is left to the end-user. +Adjust the number of threads used to build the index: ```rust let mut aligner = Aligner::builder() .map_ont() @@ -106,7 +107,7 @@ let mut aligner = Aligner::builder() ``` ### Experimental Rayon support -This _appears_ to work. +This _appears_ to work. See [fakeminimap2](https://github.com/jguhlin/minimap2-rs/tree/main/fakeminimap2) for full implementation. ```rust use rayon::prelude::*; @@ -116,6 +117,9 @@ let results = sequences.par_iter().map(|seq| { }).collect::>(); ``` +### Arc cloning the Aligner +Also works. Otherwise directly cloning the aligner will Arc clone the internal index. + ## Features The following crate features are available: * map-file - Enables the ability to map a file directly to a reference. Enabled by deafult