Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fakeminimap2-Rayon-Example-and-Clap #98

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ let mappings: Result<Vec<Mapping>> = aligner.map_file("query.fa", false, false);
## Multithreading
Multithreading is supported, for implementation example see [fakeminimap2](https://github.com/jguhlin/minimap2-rs/blob/main/fakeminimap2/src/main.rs). Minimap2 also supports threading itself, and will use a minimum of 3 cores for building the index. Multithreading for mapping is left to the end-user.

Adjust the number of threads used to build the index:
```rust
let mut aligner = Aligner::builder()
.map_ont()
.with_index_threads(8);
```

### Experimental Rayon support
This _appears_ to work.
This _appears_ to work. See [fakeminimap2](https://github.com/jguhlin/minimap2-rs/tree/main/fakeminimap2) for full implementation.

```rust
use rayon::prelude::*;
Expand All @@ -116,6 +117,9 @@ let results = sequences.par_iter().map(|seq| {
}).collect::<Vec<_>>();
```

### Arc cloning the Aligner
Also works. Otherwise directly cloning the aligner will Arc clone the internal index.

## Features
The following crate features are available:
* map-file - Enables the ability to map a file directly to a reference. Enabled by deafult
Expand Down
68 changes: 68 additions & 0 deletions fakeminimap2/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions fakeminimap2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ needletail = "0.6"
humantime = "2.1.0"
log = "0.4.22"
env_logger = "0.11.5"
clap = { version = "4.5.21", features = ["derive"] }
rayon = "1.10.0"

[profile.release]
opt-level = 3
Expand Down
9 changes: 8 additions & 1 deletion fakeminimap2/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
# Multithreaded example of mapping

Using crossbeam and standard library threading, we can multi thread alignments sharing the same references to the index.
Small example files are provided,
Small example files are provided,

## Logging
Logging is done using the log crate, and the log level can be set using the RUST_LOG environment variable.

```
RUST_LOG=info cargo run
```
169 changes: 169 additions & 0 deletions fakeminimap2/src/channels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use crossbeam::queue::ArrayQueue;
use minimap2::*;
use needletail::{parse_fastx_file, FastxReader};

use std::{error::Error, path::Path, sync::Arc, time::Duration};

/// We use a worker queue to pass around work between threads.
/// We do it this way to be generic over the type.
enum WorkQueue<T> {
Work(T),
Result(T),
}

// Not necessary to make types, but it helps keep things straightforward

// We work on distinct WorkUnits (aka a single sequence)
type WorkUnit = (Vec<u8>, Vec<u8>); // Sequence ID, Sequence

// We return the original sequence, and the vector of mappings (results)
type WorkResult = (WorkUnit, Vec<Mapping>);
// We could also choose to return just the Seq ID, and the Mappings should also have the query name too
// You can return whatever would be most useful

pub(crate) fn map_with_channels(
target_file: impl AsRef<Path>,
query_file: impl AsRef<Path>,
threads: usize,
) -> Result<(), Box<dyn Error>> {
// Aligner gets created using the build pattern.
// Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
let aligner = Aligner::builder()
.map_ont()
.with_cigar()
.with_index(target_file, None)
.expect("Unable to build index");

log::info!("Made aligner");
// Create a queue for work and for results
let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(32)); // Artificially low, but the best depends on tuning
let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(32));

// I use a shutdown flag
let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));

// Store join handles, it's just good practice to clean up threads
let mut jh = Vec::new();

let aligner = Arc::new(aligner);

// Spin up the threads
log::info!("Spawn threads");
for _ in 0..threads {
// Clone everything we will need...
let work_queue = Arc::clone(&work_queue);
let results_queue = Arc::clone(&results_queue);
let shutdown = Arc::clone(&shutdown);
let aligner = Arc::clone(&aligner);

let handle = std::thread::spawn(move || {
worker(
work_queue,
results_queue,
shutdown,
aligner,
)
});

jh.push(handle);
}

// Now that the threads are running, read the input file and push the work to the queue
let mut reader: Box<dyn FastxReader> = parse_fastx_file(query_file)
.unwrap_or_else(|_| panic!("Can't find query FASTA file"));

// I just do this in the main thread, but you can split threads
let backoff = crossbeam::utils::Backoff::new();
while let Some(Ok(record)) = reader.next() {
let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
while let Err(work_packet) = work_queue.push(work) {
work = work_packet; // Simple way to maintain ownership
// If we have an error, it's 99% because the queue is full
backoff.snooze();
}
}

// Set the shutdown flag
shutdown.store(true, std::sync::atomic::Ordering::Relaxed);

let mut num_alignments = 0;

let backoff = crossbeam::utils::Backoff::new();
loop {
match results_queue.pop() {
Some(WorkQueue::Result((record, alignments))) => {
num_alignments += alignments.len();
log::info!(
"Got {} alignments for {}",
alignments.len(),
std::str::from_utf8(&record.0).unwrap()
);
}
Some(_) => unimplemented!("Unexpected result type"),
None => {
log::trace!("Awaiting results");
backoff.snooze();

// If all join handles are 'finished' we can break
if jh.iter().all(|h| h.is_finished()) {
break;
}
if backoff.is_completed() {
backoff.reset();
std::thread::sleep(Duration::from_millis(1));
}
}
}
}

println!("Iteration complete, total alignments {}", num_alignments);

// Join all threads
for handle in jh {
match handle.join() {
Ok(_) => log::trace!("Thread finished"),
Err(e) => log::error!("Thread panicked: {:?}", e),
}
}

Ok(())
}

// Convert this to a function
fn worker(
work_queue: Arc<ArrayQueue<WorkQueue<WorkUnit>>>,
results_queue: Arc<ArrayQueue<WorkQueue<WorkResult>>>,
shutdown: Arc<std::sync::atomic::AtomicBool>,
aligner: Arc<Aligner<Built>>,
) {
loop {
// We use backoff to sleep when we don't have any work
let backoff = crossbeam::utils::Backoff::new();

match work_queue.pop() {
Some(WorkQueue::Work(sequence)) => {
let alignment = aligner
.map(&sequence.1, true, false, None, None, Some(&sequence.0))
.expect("Unable to align");

// Return the original sequence, as well as the mappings
// We have to do it this way because ownership
let mut result = WorkQueue::Result((sequence, alignment));
while let Err(result_packet) = results_queue.push(result) {
result = result_packet; // Simple way to maintain ownership
// If we have an error, it's 99% because the queue is full
backoff.snooze();
}
}
Some(_) => unimplemented!("Unexpected work type"),
None => {
backoff.snooze();

// If we have the shutdown signal, we should exit
if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() {
break;
}
}
}
}
}
33 changes: 33 additions & 0 deletions fakeminimap2/src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::path::PathBuf;

use clap::{Parser, ValueEnum};

#[derive(Parser, Debug)]
#[command(name = "fakeminimap2", about = "An example of how to use the minimap2 crate with multithreading")]
pub(crate) struct Cli {
/// The target file to align to (e.g. a reference genome - can be in FASTA, FASTQ, or mmi format)
pub target: PathBuf,

/// The query file to align (e.g. reads - can be FASTA or FASTQ)
pub query: PathBuf,

/// The number of threads to use
pub threads: usize,

/// The method to use for multithreading
pub method: Option<Method>,
}

#[derive(ValueEnum, Debug, Default, Clone)]
pub(crate) enum Method {
#[default]
/// Use crossbeam channels for multithreading (default)
Channels,

/// Use rayon for multithreading
Rayon,
}

pub(crate) fn parse_args() -> Cli {
Cli::parse()
}
Loading