From e6639c128ba9ed78ef685e80f245b0ba67f52691 Mon Sep 17 00:00:00 2001 From: esteinig Date: Sat, 11 Feb 2023 22:54:54 +1100 Subject: [PATCH 1/9] kraken executor working; add draft of parser --- src/cli.rs | 40 ++++-- src/main.rs | 16 ++- src/scrub.rs | 334 +++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 312 insertions(+), 78 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 3899ea8..649f241 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -31,10 +31,9 @@ pub enum Commands { /// Input filepath(s) (fa, fq, gz, bz). /// /// For paired Illumina you may either pass this flag twice `-i r1.fq -i r2.fq` or give two - /// files consecutively `-i r1.fq r2.fq`. NOTE: Read identifiers for paired-end Illumina reads + /// files consecutively `-i r1.fq r2.fq`. Read identifiers for paired-end Illumina reads /// are assumed to be the same in forward and reverse read files (modern format) without trailing - /// read orientations e.g. `/1` and `/2`. If you are using legacy identifiers, reads in the depleted - /// output may be unpaired. + /// read orientations `/1` or `/2`. #[structopt( short, long, @@ -53,13 +52,36 @@ pub enum Commands { /// Kraken2 database path. /// /// Specify the path to the Kraken2 database directory. - #[structopt(short = "k", long, parse(try_from_os_str = check_file_exists), multiple = false, required = true)] - kraken_db: PathBuf, + #[structopt(short = "k", long, parse(try_from_os_str = check_file_exists), multiple = true, required = true)] + kraken_db: Vec, /// Threads to use for Kraken2 /// - /// Specify the number of threads to pass to Kraken2 - #[structopt(short = "k", long, default_value = "4")] + /// Specify the number of threads to pass to Kraken2. + #[structopt(short = "j", long, default_value = "4")] kraken_threads: u32, + /// Taxa to deplete from reads classified with Kraken2. + /// + /// You may specify multiple taxon names or taxonomic identifiers by passing this flag + /// multiple times `-t Archaea -t 9606` or give taxa consecutively `-t Archaea 9606`. + /// Kraken reports are parsed and every taxonomic level below the provided taxa will + /// be provided for depletion of reads classified at these levels. For example, when + /// providing `Archaea` (Domain) all taxonomic levels below the Domain level are removed + /// until the next level of the same rank or higher is encountere in the report. This means + /// that generally, higher levels than Domain should be specified with `--kraken-taxa-direct`. + /// For example, if the level `cellular organisms` (R2) should be specified with `--kraken-taxa` + /// it may also remove `Viruses` (D) if it is located below the `cellular organisms` level in + /// the report. + #[structopt(short = "t", long, multiple = true, required = false)] + kraken_taxa: Vec, + /// Taxa to deplete directly from reads classified with Kraken2. + /// + /// Additional taxon names or taxonomic identifiers at levels above the domain level provided with + /// `--kraken-taxa` can be specified with this argument. These are directly to the list of taxons + /// and not parsed from the report. For example, to retain Viruses one can specify the domains + /// `-t Archaea -t Bacteria -t Eukaryota` with `--kraken-taxa` and add + /// `-d 'other sequences' -d 'cellular organsisms' -d root` with `--kraken-taxa-direct`. + #[structopt(short = "d", long, multiple = true, required = false)] + kraken_taxa_direct: Vec, /// Working directory containing intermediary files. 
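// A minimal sketch (illustrative only, not part of this patch) of the flag combination
// given in the example above, i.e. `-t Archaea -t Bacteria -t Eukaryota` together with
// `-d 'other sequences' -d 'cellular organisms' -d root`, as it would arrive in the
// parsed options; taxon names and numeric taxonomic identifiers may be mixed freely.
fn example_virus_retention_taxa() -> (Vec<String>, Vec<String>) {
    let kraken_taxa = vec!["Archaea".into(), "Bacteria".into(), "Eukaryota".into()];
    let kraken_taxa_direct = vec![
        "other sequences".into(),
        "cellular organisms".into(),
        "root".into(),
    ];
    (kraken_taxa, kraken_taxa_direct)
}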
/// /// Path to a working directory which contains the alignment and intermediary output files @@ -80,7 +102,7 @@ pub enum Commands { hide_possible_values = true )] output_format: Option, - /// Compression level to use if compressing output + /// Compression level to use if compressing output. #[structopt( short = "l", long, @@ -129,7 +151,7 @@ fn check_file_exists(file: &OsStr) -> Result { let path = PathBuf::from(file); let path_msg = format!("{:?} does not exist", path); if path.exists() { - let abs_path = std::fs::canonicalize(path).map_err(|_| OsString::from(path_msg))?; + let abs_path = path.canonicalize().map_err(|_| OsString::from(path_msg))?; Ok(abs_path) } else { Err(OsString::from(path_msg)) diff --git a/src/main.rs b/src/main.rs index f48f6ca..1f6e74f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,5 @@ use anyhow::Result; -use thiserror::Error; use structopt::StructOpt; -use std::path::PathBuf; use chrono::Local; use env_logger::Builder; use log::LevelFilter; @@ -36,19 +34,25 @@ fn main() -> Result<()> { workdir, kraken_db, kraken_threads, + kraken_taxa, + kraken_taxa_direct, output_format, compression_level, } => { let scrubber = scrub::Scrubber::new(workdir)?; - log::info!("Welcome to Scrubby, your trusty read scrubber!"); - scrubber.run_kraken(&input, kraken_db, kraken_threads); - + log::info!("Welcome to Scrubby! You name it, we clean it."); + for db in kraken_db{ + scrubber.run_kraken(&input, db, kraken_threads); + scrubber.deplete_kraken(&input, &kraken_taxa, &kraken_taxa_direct)?; + } + } } - log::info!("Scrub scrub, scrubbity-scrub! Your sequence data, only cleaner!"); + log::info!("Thank you for using Scrubby."); + log::info!("Your sequence data, only cleaner."); Ok(()) } \ No newline at end of file diff --git a/src/scrub.rs b/src/scrub.rs index 689db37..5d823af 100644 --- a/src/scrub.rs +++ b/src/scrub.rs @@ -1,12 +1,11 @@ use anyhow::{Result, Context}; -use std::path::PathBuf; +use std::{path::PathBuf, process::Output, collections::HashSet, io::{BufRead, BufReader}, fs::File}; use needletail::{parse_fastx_file, Sequence}; use std::str::from_utf8; use std::process::Command; use thiserror::Error; use chrono::Local; -use std::fs; -use std::fs::{create_dir_all, canonicalize}; +use std::fs::{create_dir_all}; // See: https://nick.groenen.me/posts/rust-error-handling/ @@ -14,8 +13,11 @@ use std::fs::{create_dir_all, canonicalize}; #[derive(Error, Debug)] pub enum ScrubberError { /// Represents a failure to execute Kraken2 - #[error("execution error - is Kraken2 installed?")] - KrakenError, + #[error("failed to run Kraken2 - is it installed?")] + KrakenExecutionError, + /// Represents a failure to run Kraken2 + #[error("failed to run Kraken2")] + KrakenClassificationError, /// Represents a failure in the correct length of the input file vector #[error("input file error - incorrect number of input files")] FileNumberError, @@ -27,14 +29,17 @@ pub enum ScrubberError { #[error(transparent)] IOError(#[from] std::io::Error), /// Indicates that the working directory already exists - #[error("working directory exists ({0})")] + #[error("working directory exists at {0}")] WorkdirExists(String), /// Indicates a failure to create the working directory - #[error("working directory path could not be created ({0})")] - WorkdirCreateFailure(String), + #[error("working directory path could not be created from {0}")] + WorkdirCreate(String), /// Indicates a failure to obtain an absolute path - #[error("absolute path could not be obtained ({0})")] - AbsolutePathFailure(String) + 
#[error("absolute path could not be obtained from {0}")] + AbsolutePath(String), + /// Indicates a failure to obtain an absolute path + #[error("database name could not be obtained from {0}")] + DatabaseName(String) } pub struct Scrubber { @@ -49,80 +54,283 @@ impl Scrubber { pub fn run_kraken( &self, input: &Vec, - kraken_db: PathBuf, - kraken_threads: u32, + db: PathBuf, + threads: u32, ) -> Result<(), ScrubberError>{ - // Safely build the arguments for Kraken2 - - let kraken_db_path = kraken_db.into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; - - let file_args = match input.len() { - 2 => { - let file1 = input[0].clone().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; - let file2 = input[1].clone().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; - [Some(file1), Some(file2)] - }, - 1 => [Some(input[0].clone().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?), None], - _ => return Err(ScrubberError::FileNumberError), - }; - - let paired_arg = match input.len() { - 2 => Some("--paired"), - 1 => None, - _ => return Err(ScrubberError::FileNumberError), - }; - - let kraken_threads_arg = kraken_threads.to_string(); - let mut kraken_args = Vec::from(["--threads".to_string(), kraken_threads_arg, "--db".to_string(), kraken_db_path]); - - match paired_arg { - Some(value) => kraken_args.push(value.to_string()), - None => {} + // Kraken2 database name + let db_name = match db.file_name(){ + Some(name) => name.to_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?, + None => return Err(ScrubberError::DatabaseName(format!("{:?}", db))) }; - for file in file_args { - match file { - Some(value) => { - kraken_args.push(value) - } - None => {} - } - }; + // Safely build the arguments for Kraken2 + let kraken_args = get_kraken_command(input, db, threads, &db_name)?; // Run the Kraken command - let output = Command::new("kraken2") - .args(kraken_args) - .current_dir(&self.workdir) - .output() - .map_err(|_| ScrubberError::KrakenError)?; - - println!("status: {}", output.status); - println!("stdout: {}", String::from_utf8_lossy(&output.stdout)); - println!("stderr: {}", String::from_utf8_lossy(&output.stderr)); + .args(kraken_args) + .current_dir(&self.workdir) + .output() + .map_err(|_| ScrubberError::KrakenExecutionError)?; + // Ensure command ran successfully + if output.status.success() { + log::info!("Kraken2 - completed taxonomic assignment ({})", db_name) + } else { + log::info!("Kraken2 - failed to run taxonomic assignment ({})", db_name); + let err_msg = get_kraken_err_msg(output)?; + log::info!("Error from {}", err_msg); + return Err(ScrubberError::KrakenClassificationError) + } + Ok(()) } + pub fn deplete_kraken( + &self, + input: &Vec, + kraken_taxa: &Vec, + kraken_taxa_direct: &Vec + ) -> Result<(), ScrubberError>{ + + log::info!("Kraken2 - depleting reads across selected taxa."); + Ok(()) + } } -/// Checks if work directory exists and otherwise creates it - return the actual path for the application -/// + + +/// Checks if work directory exists and otherwise creates it +/// /// # Errors /// A [`ScrubberError::WorkdirExists`](#clierror) is returned if the directory already exists -/// A [`ScrubberError::WorkdirCreateFailure`](#clierror) is returned if the directory cannot be created +/// A [`ScrubberError::WorkdirCreate`](#clierror) is returned if the directory cannot be created +/// A 
[`ScrubberError::AbsolutePath`](#clierror) is returned if the directory path cannot be canonicalized pub fn check_or_create_workdir(workdir: Option) -> Result { let _workdir = match workdir { Some(path) => path, - None => PathBuf::from(format!("scrubby_{:}", Local::now().format("%Y%m%dT%H%M%S"))) + None => PathBuf::from(format!("Scrubby-{}", Local::now().format("%Y%m%dT%H%M%S"))) }; - let abs_workdir = canonicalize(&_workdir).map_err(|_| ScrubberError::AbsolutePathFailure(format!("{:?}", _workdir)))?; - let abs_workdir_msg = format!("{:?}", abs_workdir); + let workdir_msg = format!("{:?}", _workdir); if !&_workdir.exists(){ - create_dir_all(&_workdir).map_err(|_| ScrubberError::WorkdirCreateFailure(abs_workdir_msg))?; + create_dir_all(&_workdir).map_err(|_| ScrubberError::WorkdirCreate(workdir_msg))?; + let abs_workdir = _workdir.canonicalize().map_err(|_| ScrubberError::AbsolutePath(format!("{:?}", _workdir)))?; Ok(abs_workdir) } else { - Err(ScrubberError::WorkdirExists(abs_workdir_msg)) + Err(ScrubberError::WorkdirExists(workdir_msg)) + } +} + +/// Builds the Kraken2 command from the input configuration +/// +/// # Errors +/// A [`ScrubberError::InvalidFilePathConversion`](#clierror) is returned if one of the input paths could not be converted to a string +/// A [`ScrubberError::FileNumberError`](#clierror) is returned if for some arcane reason the input file vector is not the correct length +pub fn get_kraken_command(input: &Vec, db: PathBuf, threads: u32, db_name: &str) -> Result, ScrubberError> { + + let kraken_db_path = db.into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; + let kraken_threads_arg = threads.to_string(); + + let file_arg = match input.len() { + 2 => { + let file1 = input[0].clone().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; + let file2 = input[1].clone().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; + [Some(file1), Some(file2)] + }, + 1 => [Some(input[0].clone().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?), None], + _ => return Err(ScrubberError::FileNumberError), + }; + let paired_arg = match input.len() { + 2 => Some("--paired"), + 1 => None, + _ => return Err(ScrubberError::FileNumberError), + }; + + let mut kraken_args = Vec::from([ + "--threads".to_string(), + kraken_threads_arg, + "--db".to_string(), + kraken_db_path, + "--output".to_string(), + format!("{}.kraken", db_name), + "--report".to_string(), + format!("{}.report", db_name) + ]); + + match paired_arg { + Some(value) => kraken_args.push(value.to_string()), + None => {} + }; + for file in file_arg { + match file { + Some(value) => kraken_args.push(value), + None => {} + } + }; + + Ok(kraken_args) +} + +/// Parses the error message from Kraken2 +/// +/// # Errors +/// A [`ScrubberError::KrakenClassificationError`](#clierror) is returned if parsing the error message fails +pub fn get_kraken_err_msg(cmd_output: Output) -> Result{ + let err_out = String::from_utf8_lossy(&cmd_output.stderr); + match err_out.lines().nth(0){ + Some(msg) => Ok(msg.to_string()), + None => return Err(ScrubberError::KrakenClassificationError) + } +} + +enum TaxonomicLevel { + Unclassified, + Root, + Domain, + Phylum, + Class, + Order, + Family, + Genus, + Species +} + + +pub fn parse_kraken_taxids( + // Kraken taxonomic report + kraken_report: PathBuf, + // Kraken read classifications + kraken_reads: PathBuf, + kraken_taxa: Vec, + kraken_taxa_direct: Vec +) -> Result, 
ScrubberError>{ + + let report = BufReader::new(File::open(kraken_report)?); + + // Make sure no trailign whitespaces are input by user + let kraken_taxa: Vec = kraken_taxa.into_iter().map(|x| x.trim().to_string()).collect(); + let kraken_taxa_direct: Vec = kraken_taxa_direct.into_iter().map(|x| x.trim().to_string()).collect(); + + let mut taxids: HashSet = HashSet::new(); + let mut extract_taxids: bool = false; + + 'report: for line in report.lines() { + // Iterate over the lines in the report file - it is not known which domain comes + // first, so we set a flag when we want to extract identifiers, until the next taxonomic + // level report line of the same kind or above, when the requested domains are checked again + let record: KrakenReportRecord = KrakenReportRecord::from_str(line?)?; + + // Skip all records that do not have reads directly assigned to it! + if record.reads_direct == 0 { + continue 'report; + } + + // Add the direct taxon identifier if the record matches by taxonomic name or identifer + // (and has reads assigned directly) + if kraken_taxa_direct.contains(&record.tax_name) || kraken_taxa_direct.contains(&record.tax_id) { + taxids.insert(record.tax_id); + } + + match record.tax_level.as_str() { + + "D" => { + if domains_to_deplete.contains(&record.tax_name) { + extract_taxids = true; + taxids.insert(record.tax_id); + } else { + extract_taxids = false; + } + }, + &_ => { + if extract_taxids { + taxids.insert(record.tax_id); + } + } + } + } + + + // Extraction of read identifiers extracted from the report or added directly above + let file = BufReader::new(File::open(&path)?); + let mut reads: HashSet = HashSet::new(); + for line in file.lines(){ + let record: KrakenReadRecord = KrakenReadRecord::from_str(line?)?; + if taxids.contains(&record.tax_id){ + reads.insert(record.read_id.clone()); + } + } + + println!("Depleting a total of {:} reads", reads.len()); + + Ok(reads) + +} + +/* +============== +Record Structs +============== +*/ + +/// Kraken read classification record - we are handling taxonomic identifiers +/// as strings in case they are not numeric (e.g. 
GTDB) +#[derive(Debug, Clone)] +pub struct KrakenReadRecord { + pub classified: bool, + pub read_id: String, + pub tax_id: String, + pub read_len: String, + pub annotation: String, +} + +impl KrakenReadRecord { + pub fn from_str(kraken_line: String) -> Result { + let fields: Vec<&str> = kraken_line.split('\t').collect(); + + let _classified = match fields[0] { + "U" => false, + "C" => true, + _ => false + }; + + let record = Self { + classified: _classified, + read_id: fields[1].trim().to_string(), + tax_id: fields[2].trim().to_string(), + read_len: fields[3].trim().to_string(), + annotation: fields[4].trim().to_string() + }; + + Ok(record) + } +} + +/// Kraken read classification record +#[derive(Debug, Clone)] +pub struct KrakenReportRecord { + pub fraction: String, + pub reads: u64, + pub reads_direct: u64, + pub tax_level: String, + pub tax_id: String, + pub tax_name: String, +} + +impl KrakenReportRecord { + // Create a record from a parsed line + pub fn from_str(report_line: String) -> Result { + let fields: Vec<&str> = report_line.split('\t').collect(); + + let record = Self { + fraction: fields[0].to_string(), + reads: fields[1].parse::()?, + reads_direct: fields[2].parse::()?, + tax_level: fields[3].trim().to_string(), + tax_id: fields[4].trim().to_string(), + tax_name: fields[5].trim().to_string() + }; + + Ok(record) } } \ No newline at end of file From db0fbe6bb564fd26cccecfdd59fddbe11b294000 Mon Sep 17 00:00:00 2001 From: esteinig Date: Tue, 14 Feb 2023 22:12:34 +1100 Subject: [PATCH 2/9] add compression utils; add --extract [#2]; add scrubbing loop; add indexed databases --- src/cli.rs | 32 +++- src/main.rs | 38 ++++- src/scrub.rs | 412 ++++++++++++++++++++++++++++++++++++++++++--------- src/utils.rs | 20 +++ 4 files changed, 420 insertions(+), 82 deletions(-) create mode 100644 src/utils.rs diff --git a/src/cli.rs b/src/cli.rs index 649f241..abc9ac1 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,7 @@ -use std::ffi::OsStr; -use std::{ffi::OsString, path::PathBuf}; +use std::ffi::{OsString, OsStr}; +use std::path::PathBuf; use structopt::StructOpt; +use structopt::clap::AppSettings; use thiserror::Error; /// A collection of custom errors relating to the command line interface for this package. @@ -26,6 +27,7 @@ pub struct Cli { #[derive(Debug, StructOpt)] pub enum Commands { + #[structopt(global_settings = &[AppSettings::ColoredHelp, AppSettings::ArgRequiredElseHelp])] /// Clean seqeunce data by removing background taxa (k-mer) or host reads (alignment) Scrub { /// Input filepath(s) (fa, fq, gz, bz). @@ -47,16 +49,33 @@ pub enum Commands { /// For paired Illumina you may either pass this flag twice `-o r1.fq -o r2.fq` or give two /// files consecutively `-o r1.fq r2.fq`. NOTE: The order of the pairs is assumed to be the /// same as that given for --input. - #[structopt(short, long, parse(from_os_str), multiple = true, required = true)] + #[structopt( + short, + long, + parse(from_os_str), + multiple = true, + required = true + )] output: Vec, - /// Kraken2 database path. + /// Extract reads instead of removing them (--output) /// - /// Specify the path to the Kraken2 database directory. + /// This flagreverses the depletion and makes the command an extraction process + /// of reads that would otherwise be removed during depletion. + #[structopt(short, long)] + extract: bool, + /// Kraken2 database directory path(s). + /// + /// Specify the path to the Kraken2 database directory. 
This only needs to be specified if you would like to + /// run the Kraken2 analysis; otherwise `--kraken-report` and `--kraken-read` can be used. Note that multiple + /// databases can be specified with `--kraken-db` which will be run in the order in which they are provided. + /// You may either pass this flag twice `-k db1/ -k db2.` or give two files consecutively `-k db1/ db2/`. + /// If multiple databases are provided, their names must be unique. + /// #[structopt(short = "k", long, parse(try_from_os_str = check_file_exists), multiple = true, required = true)] kraken_db: Vec, /// Threads to use for Kraken2 /// - /// Specify the number of threads to pass to Kraken2. + /// Specify the number of threads with which to run Kraken2. #[structopt(short = "j", long, default_value = "4")] kraken_threads: u32, /// Taxa to deplete from reads classified with Kraken2. @@ -114,7 +133,6 @@ pub enum Commands { } } -// Functions may be heavily adapted from Rasusa, due to the excellent error annotation style impl Cli { /// Checks there is a valid and equal number of `--input` and `--output` arguments given. /// diff --git a/src/main.rs b/src/main.rs index 1f6e74f..c80dda0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,26 @@ use anyhow::Result; +use thiserror::Error; use structopt::StructOpt; use chrono::Local; use env_logger::Builder; use log::LevelFilter; use std::io::Write; + mod scrub; mod cli; +mod utils; + + + +#[derive(Error, Debug)] +pub enum ScrubbyError { + /// Represents a failure to convert a PathBuf converted OsStr into a String + #[error("incorrect format of the input database path, are there non-UTF8 characters?")] + InvalidDatabasePath, + /// Indicates a failure to obtain an absolute path + #[error("database name could not be obtained from {0}")] + DatabaseNameExtraction(String), +} fn main() -> Result<()> { let cli = cli::Cli::from_args(); @@ -32,6 +47,7 @@ fn main() -> Result<()> { input, output, workdir, + extract, kraken_db, kraken_threads, kraken_taxa, @@ -40,19 +56,29 @@ fn main() -> Result<()> { compression_level, } => { - let scrubber = scrub::Scrubber::new(workdir)?; + let scrubber = scrub::Scrubber::new(workdir, output_format, compression_level)?; log::info!("Welcome to Scrubby! You name it, we clean it."); - for db in kraken_db{ - scrubber.run_kraken(&input, db, kraken_threads); - scrubber.deplete_kraken(&input, &kraken_taxa, &kraken_taxa_direct)?; + + let mut scrubbed_reads = input; + for (db_index, db_path) in kraken_db.into_iter().enumerate() { + + let db_name = match db_path.file_name(){ + // Kraken2 database name conversion - database path must be UTF8-convertable for manipulation of file paths + Some(name) => name.to_os_string().into_string().map_err(|_| ScrubbyError::InvalidDatabasePath)?, + None => anyhow::bail!(ScrubbyError::DatabaseNameExtraction(format!("{:?}", db_path))) + }; + + let kraken_files = scrubber.run_kraken(&scrubbed_reads, &db_path, &db_name, &db_index, &kraken_threads)?; + // These are either depleted or extracted reads - for depleted reads, we use the depleted reads as input for the next iteration + // but for extracted reads, we do not want to re-extract reads with another database [https://github.com/esteinig/scrubby/issues/2] + scrubbed_reads = scrubber.deplete_kraken(&scrubbed_reads, &db_name, &db_index, &false, &kraken_files, &kraken_taxa, &kraken_taxa_direct)?; } } } - log::info!("Thank you for using Scrubby."); - log::info!("Your sequence data, only cleaner."); + log::info!("Thank you for using Scrubby! 
Your sequence data, only cleaner."); Ok(()) } \ No newline at end of file diff --git a/src/scrub.rs b/src/scrub.rs index 5d823af..b2b25e7 100644 --- a/src/scrub.rs +++ b/src/scrub.rs @@ -1,15 +1,21 @@ -use anyhow::{Result, Context}; -use std::{path::PathBuf, process::Output, collections::HashSet, io::{BufRead, BufReader}, fs::File}; -use needletail::{parse_fastx_file, Sequence}; + +use std::fmt; +use chrono::Local; +use std::fs::File; +use anyhow::Result; +use serde::Serialize; +use thiserror::Error; +use std::path::PathBuf; use std::str::from_utf8; +use std::process::Output; use std::process::Command; -use thiserror::Error; -use chrono::Local; -use std::fs::{create_dir_all}; +use std::fs::create_dir_all; +use std::collections::HashSet; +use needletail::parse_fastx_file; +use crate::utils::CompressionExt; +use std::io::{BufRead, BufReader, BufWriter}; -// See: https://nick.groenen.me/posts/rust-error-handling/ -/// WordCountError enumerates all possible errors returned by this library. #[derive(Error, Debug)] pub enum ScrubberError { /// Represents a failure to execute Kraken2 @@ -18,11 +24,16 @@ pub enum ScrubberError { /// Represents a failure to run Kraken2 #[error("failed to run Kraken2")] KrakenClassificationError, + /// Represents a failure to convert the read field from string to numeric field in the report file + #[error("failed to convert the read field in the report from Kraken2")] + KrakenReportReadFieldConversion, + /// Represents a failure to convert the read field from string to numeric field in the report file + #[error("failed to convert the direct read field in the report from Kraken2")] + KrakenReportDirectReadFieldConversion, /// Represents a failure in the correct length of the input file vector #[error("input file error - incorrect number of input files")] FileNumberError, /// Represents a failure to convert a PathBuf converted OsStr into a String - /// This is because into_string() returns Result, + compression_level: niffler::compression::Level } impl Scrubber { - pub fn new(workdir: Option) -> Result { + pub fn new(workdir: Option, output_format: Option, compression_level: niffler::compression::Level ) -> Result { let _workdir = check_or_create_workdir(workdir)?; - Ok(Self { workdir: _workdir }) + Ok(Self { workdir: _workdir, output_format, compression_level }) } + /// pub fn run_kraken( &self, input: &Vec, - db: PathBuf, - threads: u32, - ) -> Result<(), ScrubberError>{ + db_path: &PathBuf, + db_name: &String, + db_index: &usize, + threads: &u32, + ) -> Result, ScrubberError>{ - // Kraken2 database name - let db_name = match db.file_name(){ - Some(name) => name.to_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?, - None => return Err(ScrubberError::DatabaseName(format!("{:?}", db))) - }; - // Safely build the arguments for Kraken2 - let kraken_args = get_kraken_command(input, db, threads, &db_name)?; + let kraken_args = get_kraken_command(input, db_path, db_name, db_index, threads)?; // Run the Kraken command let output = Command::new("kraken2") @@ -83,19 +99,57 @@ impl Scrubber { log::info!("Error from {}", err_msg); return Err(ScrubberError::KrakenClassificationError) } + + let kraken_report = self.workdir.join(format!("{}-{}.report", db_index, db_name)); + let kraken_reads = self.workdir.join(format!("{}-{}.kraken", db_index, db_name)); - Ok(()) + Ok(Vec::from([kraken_report, kraken_reads])) } + /// pub fn deplete_kraken( &self, input: &Vec, + db_name: &String, + db_index: &usize, + extract: &bool, + kraken_files: &Vec, 
kraken_taxa: &Vec, kraken_taxa_direct: &Vec - ) -> Result<(), ScrubberError>{ + ) -> Result, ScrubberError>{ - log::info!("Kraken2 - depleting reads across selected taxa."); + log::info!("Kraken2 - depleting reads across specified taxa..."); + + let reads = parse_kraken_files( + kraken_files[0].clone(), + kraken_files[1].clone(), + kraken_taxa, + kraken_taxa_direct + )?; + + // Temporary files for sequential depletion/extraction in workdir + let output = match input.len() { + 2 => Vec::from([self.workdir.join(format!("{}-{}_1.fq", db_index, db_name)), self.workdir.join(format!("{}-{}_2.fq", db_index, db_name))]), + 1 => Vec::from([self.workdir.join(format!("{}-{}.fq", db_index, db_name))]), + _ => return Err(ScrubberError::FileNumberError) + }; + + let mut read_summary = ReadCountSummary::new(); + + // Enumerated loop is ok since we have checked matching input/output + // vector length in command-line interface and match the file number + for (i, input_file) in input.iter().enumerate() { + + let msg_word = match extract { true => "Extracting", false => "Depleting" }; + log::info!("{} reads {:#?} into {:#?}", msg_word, &input_file, &output[i]); + + // Initiate the depletion operator and deplete/extract the reads identifiers parsed from Kraken + let depletor = ReadDepletion::new(self.output_format, self.compression_level)?; + let read_counts = depletor.deplete(&reads, &input[i], &output[i], extract)?; - Ok(()) + read_summary.reads.push(read_counts); + }; + + Ok(output) } } @@ -127,9 +181,9 @@ pub fn check_or_create_workdir(workdir: Option) -> Result, db: PathBuf, threads: u32, db_name: &str) -> Result, ScrubberError> { +pub fn get_kraken_command(input: &Vec, db_path: &PathBuf, db_name: &str, db_index: &usize, threads: &u32) -> Result, ScrubberError> { - let kraken_db_path = db.into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; + let kraken_db_path = db_path.to_path_buf().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; let kraken_threads_arg = threads.to_string(); let file_arg = match input.len() { @@ -153,9 +207,9 @@ pub fn get_kraken_command(input: &Vec, db: PathBuf, threads: u32, db_na "--db".to_string(), kraken_db_path, "--output".to_string(), - format!("{}.kraken", db_name), + format!("{}-{}.kraken", db_index, db_name), "--report".to_string(), - format!("{}.report", db_name) + format!("{}-{}.report", db_index, db_name) ]); match paired_arg { @@ -183,11 +237,17 @@ pub fn get_kraken_err_msg(cmd_output: Output) -> Result{ None => return Err(ScrubberError::KrakenClassificationError) } } - -enum TaxonomicLevel { +/// Taxonomic level enumeration +/// +/// Provides an integer value for comparison of levels +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] +pub enum TaxonomicLevel { + None, + Unspecified, Unclassified, Root, Domain, + Kingdom, Phylum, Class, Order, @@ -195,65 +255,130 @@ enum TaxonomicLevel { Genus, Species } +impl fmt::Display for TaxonomicLevel { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + TaxonomicLevel::None => write!(f, "None"), + TaxonomicLevel::Unspecified => write!(f, "Unspecified"), + TaxonomicLevel::Unclassified => write!(f, "Unclassified"), + TaxonomicLevel::Root => write!(f, "Root"), + TaxonomicLevel::Domain => write!(f, "Domain"), + TaxonomicLevel::Kingdom => write!(f, "Kingdom"), + TaxonomicLevel::Phylum => write!(f, "Phylum"), + TaxonomicLevel::Class => write!(f, "Class"), + TaxonomicLevel::Order => write!(f, "Order"), + TaxonomicLevel::Family => write!(f, 
"Family"), + TaxonomicLevel::Genus => write!(f, "Genus"), + TaxonomicLevel::Species => write!(f, "Species"), + } + } +} - -pub fn parse_kraken_taxids( +/// Parse the Kraken output report and read file +/// +/// This functions implements the logic to first parse the record file, and extract any +/// directly specified taxon by name or identifier. It also allows for extraction of any +/// taxon sub-leves of taxa by name or identifier, for example when specifying the +/// domain `Eukaryota` it will parse all sub-levels of `Eukaryota` until the next full +/// domain level is reached (which prevents sub-specifications of domains to be excluded, +/// such as D1, D2, D3) +/// +/// It should be ensured that the underlying database taxonomy does not incorrectly specify +/// sub-levels as the triggering level - for example, when specifying `Eukaryota` the +/// 16S rRNA SILVA database incorrectly specifies `Holozoa` at the domain level, so it +/// should be includedin the taxa to dpeletye to ensure all sequences below `Eukaryota`' +/// and `Holozoa` are excluded. +/// +/// Only taxa with directly assigned reads are included in the to-deplete list. +/// +/// Returns a `HashSet` of read identifiers parsed from the read classification file +/// where the classification must be included in the to-deplete list derived from the report file. +/// +/// # Errors +/// A [`ScrubberError::KrakenReportReadFieldConversion`](#clierror) is returned if the read field in the report file cannot be converted into `u64` +/// A [`ScrubberError::KrakenReportDirectReadFieldConversion`](#clierror) is returned if the direct read field in the report file cannot be converted into `u64` +pub fn parse_kraken_files( // Kraken taxonomic report kraken_report: PathBuf, // Kraken read classifications kraken_reads: PathBuf, - kraken_taxa: Vec, - kraken_taxa_direct: Vec + kraken_taxa: &Vec, + kraken_taxa_direct: &Vec ) -> Result, ScrubberError>{ let report = BufReader::new(File::open(kraken_report)?); - // Make sure no trailign whitespaces are input by user + // Make sure no trailign whitespaces are input by user - these are taxon names or taxon identifiers to deplete let kraken_taxa: Vec = kraken_taxa.into_iter().map(|x| x.trim().to_string()).collect(); let kraken_taxa_direct: Vec = kraken_taxa_direct.into_iter().map(|x| x.trim().to_string()).collect(); let mut taxids: HashSet = HashSet::new(); - let mut extract_taxids: bool = false; + let mut extract_taxlevel: TaxonomicLevel = TaxonomicLevel::None; // make sure this makes sense to initialize 'report: for line in report.lines() { // Iterate over the lines in the report file - it is not known which domain comes // first, so we set a flag when we want to extract identifiers, until the next taxonomic // level report line of the same kind or above, when the requested domains are checked again let record: KrakenReportRecord = KrakenReportRecord::from_str(line?)?; - - // Skip all records that do not have reads directly assigned to it! 
- if record.reads_direct == 0 { - continue 'report; - } + + let tax_level = get_tax_level(&record); // Add the direct taxon identifier if the record matches by taxonomic name or identifer - // (and has reads assigned directly) + // (and has reads assigned directly) - this is always the case when the direct taxon is found if kraken_taxa_direct.contains(&record.tax_name) || kraken_taxa_direct.contains(&record.tax_id) { - taxids.insert(record.tax_id); + log::info!("Detected direct taxon to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + taxids.insert(record.tax_id.clone()); } - match record.tax_level.as_str() { + // If taxon level is above Domain - do not allow it to be processed with the block statement below + // this is to prevent failure of the logic implemented to parse the report sub-levels of a given + // taxonomic name or identifier - "D" => { - if domains_to_deplete.contains(&record.tax_name) { - extract_taxids = true; - taxids.insert(record.tax_id); - } else { - extract_taxids = false; - } - }, - &_ => { - if extract_taxids { + if tax_level < TaxonomicLevel::Domain { // Unspecified, Unclassified, Root --> all should be given directly! + log::warn!("Detected taxon below `Domain` level to ignore in sub-level depletion ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + continue 'report; + } + + if kraken_taxa.contains(&record.tax_name) || kraken_taxa.contains(&record.tax_id) { + log::info!("Detected taxon level to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + // If the current record is in the vector of taxa (and following sub-taxa) to deplete, switch on the extraction flag + // and set the current tax level as a flag for stopping the extraction in subsequent records that are below or equal + // to this tax level + extract_taxlevel = tax_level; + log::info!("Setting taxon level for extraction of sub-levels to {} ({})", extract_taxlevel.to_string(), &record.tax_name); + // Skip all records that do not have reads directly assigned to it! + if record.reads_direct > 0 { + taxids.insert(record.tax_id); + } + } else { + if extract_taxlevel == TaxonomicLevel::None { // guard against no taxa given on initial loop + log::info!("Ignoring record ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + continue 'report; + } + // If the current record is not the depletion list, first check if the taxon level indicates we need to stop - this + // is the case if the tax level is the same or below the extraction flagged tax level set when a record was found to + // start depletion + if (tax_level <= extract_taxlevel) && (record.tax_level.len() == 1) { // guard against premature sub-level reset (e.g. D1, D2, D3) + // Unset the extraction flag and reset the taxonomic level + // to the lowest setting (None - does not ocurr in report) + log::info!("Detected taxon level for sub-level reset ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + extract_taxlevel = TaxonomicLevel::None; + } else { + // Otherwise the taxonomic level is below the one set in the flag and the taxon should be depleted + // Skip all records that do not have reads directly assigned to it! 
+ if record.reads_direct > 0 { + log::info!("Detected taxon sub-level with reads to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); taxids.insert(record.tax_id); } } + } } - + // HashSet of read identifiers for later depletion + let mut reads: HashSet = HashSet::new(); // Extraction of read identifiers extracted from the report or added directly above - let file = BufReader::new(File::open(&path)?); - let mut reads: HashSet = HashSet::new(); + let file = BufReader::new(File::open(&kraken_reads)?); for line in file.lines(){ let record: KrakenReadRecord = KrakenReadRecord::from_str(line?)?; if taxids.contains(&record.tax_id){ @@ -261,20 +386,37 @@ pub fn parse_kraken_taxids( } } - println!("Depleting a total of {:} reads", reads.len()); + log::info!("Parsed Kraken output files; a total of {} reads will be depleted", reads.len()); Ok(reads) } +/// A utility function to extract a non-specific tax level into an enumeration +fn get_tax_level(record: &KrakenReportRecord) -> TaxonomicLevel { + let tax_level_str = &record.tax_level; + + if tax_level_str.starts_with("U") { return TaxonomicLevel::Unclassified } + else if tax_level_str.starts_with("R") { return TaxonomicLevel::Root } + else if tax_level_str.starts_with("D") { return TaxonomicLevel::Domain } + else if tax_level_str.starts_with("K") { return TaxonomicLevel::Kingdom } + else if tax_level_str.starts_with("P") { return TaxonomicLevel::Phylum } + else if tax_level_str.starts_with("C") { return TaxonomicLevel::Class } + else if tax_level_str.starts_with("O") { return TaxonomicLevel::Order } + else if tax_level_str.starts_with("F") { return TaxonomicLevel::Family } + else if tax_level_str.starts_with("G") { return TaxonomicLevel::Genus } + else if tax_level_str.starts_with("S") { return TaxonomicLevel::Species } + else { return TaxonomicLevel::Unspecified } +} + /* ============== Record Structs ============== */ -/// Kraken read classification record - we are handling taxonomic identifiers -/// as strings in case they are not numeric (e.g. GTDB) +// Kraken read classification record - we are handling taxonomic identifiers +// as strings in case they are not numeric (e.g. 
GTDB) #[derive(Debug, Clone)] pub struct KrakenReadRecord { pub classified: bool, @@ -306,7 +448,7 @@ impl KrakenReadRecord { } } -/// Kraken read classification record +// Kraken read classification record #[derive(Debug, Clone)] pub struct KrakenReportRecord { pub fraction: String, @@ -324,8 +466,8 @@ impl KrakenReportRecord { let record = Self { fraction: fields[0].to_string(), - reads: fields[1].parse::()?, - reads_direct: fields[2].parse::()?, + reads: fields[1].parse::().map_err(|_| ScrubberError::KrakenReportReadFieldConversion)?, + reads_direct: fields[2].parse::().map_err(|_| ScrubberError::KrakenReportDirectReadFieldConversion)?, tax_level: fields[3].trim().to_string(), tax_id: fields[4].trim().to_string(), tax_name: fields[5].trim().to_string() @@ -333,4 +475,136 @@ impl KrakenReportRecord { Ok(record) } -} \ No newline at end of file +} + +/* +========================= +Read depletion/extraction +========================= +*/ + +// A summary struct to hold counts +// for the read depletion/extraction +#[derive(Serialize)] +pub struct ReadCounts { + pub total: u64, + pub depleted: u64, + pub extracted: u64, + pub retained: u64, + pub input_file: PathBuf, + pub output_file: PathBuf, +} + +// A summary struct to hold settings +// for depletion/extraction +#[derive(Serialize)] +pub struct DepletionSettings { +} + +// Struct to hold the read depletion for +// each file to be output to JSON +#[derive(Serialize)] +pub struct ReadCountSummary { + pub reads: Vec, + pub settings: DepletionSettings, +} + +impl ReadCountSummary { + pub fn new() -> Self { + Self { + reads: Vec::new(), + settings: DepletionSettings {}, + } + } +} + +/// Read depletion sub-struct +/// +/// This struct can be initialised with `niffler ::compression::Format` which +/// optionally specifies the output format and a `niffler::compression::Level` +/// which specified the output compression level. +pub struct ReadDepletion { + output_format: Option, + compression_level: niffler::compression::Level +} + +impl ReadDepletion { + pub fn new( + output_format: Option, + compression_level: niffler::compression::Level + ) -> Result { + + Ok(Self { output_format, compression_level }) + } + /// Deplete reads from an input read file + /// + /// This method depletes (`extract = false`) or extracts (`extract = true`) an input + /// read file which may be compressed. It checks if the read identifier is contained + /// within the `HashSet`of read identifiers provided. It also counts the total, + /// depleted or extracted, and the retained reads and returns a `ReadCounts` object + /// which can be added to the `ReadCountSummary` to be output to JSON. 
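// A hedged sketch of how a `ReadCountSummary` could be written out; only the
// `serde::Serialize` derives are visible in this patch, so the use of `serde_json`
// here is an assumption rather than the crate's confirmed output path.
fn write_summary_sketch(summary: &ReadCountSummary, path: &std::path::Path) -> anyhow::Result<()> {
    let json = serde_json::to_string_pretty(summary)?;
    std::fs::write(path, json)?;
    Ok(())
}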
+ /// + /// /// # Errors + /// A [`ScrubberError::DepletionFastxParser`](#clierror) is returned if the input file cannot be read, or if the output file cannot be written to + /// A [`ScrubberError::DepletionCompressionWriter`](#clierror) is returned if the compression writer cannot be obtained + /// A [`ScrubberError::DepletionRecordIdentifier`](#clierror) if the read identifier cannot be converted to valid UTF8 + /// + pub fn deplete( + &self, + reads: &HashSet, + input: &PathBuf, + output: &PathBuf, + extract: &bool, + ) -> Result { + + // Input output of read files includes compression detection + let mut reader = parse_fastx_file(input).map_err(|err| ScrubberError::DepletionFastxParser(err))?; + + let file = File::create(&output)?; + let file_handle = BufWriter::new(file); + let fmt = match self.output_format { + None => niffler::Format::from_path(&output), + Some(format) => format, + }; + + let mut writer_retained = niffler::get_writer(Box::new(file_handle), fmt, self.compression_level).map_err(|err| ScrubberError::DepletionCompressionWriter(err))?; + + + let mut read_counts = ReadCounts { + total: 0, + depleted: 0, + extracted: 0, + retained: 0, + input_file: input.to_path_buf(), + output_file: output.to_path_buf(), + }; + + // Note that if Kraken paired data (--paired in Kraken2) legacy Illumina read identifier formats + // with trailing /1 and /2 are stripped of their trails and the reads output does not contain + // the trails. This mismatches with the input reads (which have /1 and /2) which are therefore + // not depleted. + + + while let Some(record) = reader.next() { + let rec = record.map_err(|err| ScrubberError::DepletionFastxParser(err))?; + let rec_id = from_utf8(rec.id()).map_err(|err| ScrubberError::DepletionRecordIdentifier(err))?.split(' ').next().unwrap_or(""); // needletail parses the entire header as identifier (including description) + + let to_retain: bool = match extract { + true => reads.contains(&rec_id.to_string()), + false => !reads.contains(&rec_id.to_string()) + }; + + if to_retain { + rec.write(&mut writer_retained, None).map_err(|err| ScrubberError::DepletionFastxParser(err))?; + read_counts.retained += 1 + } else { + match extract { + true => read_counts.extracted += 1, // if extraction flag, count extracted, otherwise ... + false => read_counts.depleted += 1 // count depleted + } + } + read_counts.total += 1 + } + Ok(read_counts) + } +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..c06c5db --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,20 @@ +use std::ffi::OsStr; +use std::path::Path; + +pub trait CompressionExt { + fn from_path + ?Sized>(p: &S) -> Self; +} + +/// Attempts to infer the compression type from the file extension. +/// If the extension is not known, then Uncompressed is returned. 
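// A short usage sketch (illustrative, mirroring how `deplete` above obtains its writer):
// infer the output compression format from the file extension and open a niffler writer.
// The fixed `Level::Six` is an assumption; the tool itself takes the level from the CLI.
fn open_output_sketch(path: &std::path::Path) -> anyhow::Result<Box<dyn std::io::Write>> {
    use crate::utils::CompressionExt;
    let format = niffler::compression::Format::from_path(path); // e.g. "reads.fq.gz" -> Gzip
    let handle = std::io::BufWriter::new(std::fs::File::create(path)?);
    let writer = niffler::get_writer(Box::new(handle), format, niffler::compression::Level::Six)?;
    Ok(writer)
}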
+impl CompressionExt for niffler::compression::Format { + fn from_path + ?Sized>(p: &S) -> Self { + let path = Path::new(p); + match path.extension().map(|s| s.to_str()) { + Some(Some("gz")) => Self::Gzip, + Some(Some("bz") | Some("bz2")) => Self::Bzip, + Some(Some("lzma")) => Self::Lzma, + _ => Self::No, + } + } +} \ No newline at end of file From 697cc91ff90e1c0017b22d4228b8883fc5b1e44f Mon Sep 17 00:00:00 2001 From: esteinig Date: Tue, 14 Feb 2023 22:20:13 +1100 Subject: [PATCH 3/9] clean db name get func --- src/main.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index c80dda0..d1cae38 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use structopt::StructOpt; use chrono::Local; use env_logger::Builder; use log::LevelFilter; -use std::io::Write; +use std::{io::Write, ffi::OsString, path::PathBuf}; mod scrub; mod cli; @@ -63,11 +63,7 @@ fn main() -> Result<()> { let mut scrubbed_reads = input; for (db_index, db_path) in kraken_db.into_iter().enumerate() { - let db_name = match db_path.file_name(){ - // Kraken2 database name conversion - database path must be UTF8-convertable for manipulation of file paths - Some(name) => name.to_os_string().into_string().map_err(|_| ScrubbyError::InvalidDatabasePath)?, - None => anyhow::bail!(ScrubbyError::DatabaseNameExtraction(format!("{:?}", db_path))) - }; + let db_name = get_db_name(&db_path)?; let kraken_files = scrubber.run_kraken(&scrubbed_reads, &db_path, &db_name, &db_index, &kraken_threads)?; // These are either depleted or extracted reads - for depleted reads, we use the depleted reads as input for the next iteration @@ -81,4 +77,12 @@ fn main() -> Result<()> { log::info!("Thank you for using Scrubby! Your sequence data, only cleaner."); Ok(()) +} + +fn get_db_name(db_path: &PathBuf) -> Result { + match db_path.file_name(){ + // Kraken2 database name conversion - database path must be UTF8-convertable for manipulation of file paths + Some(name) => Ok(name.to_os_string().into_string().map_err(|_| ScrubbyError::InvalidDatabasePath)?), + None => return Err(ScrubbyError::DatabaseNameExtraction(format!("{:?}", db_path))) + } } \ No newline at end of file From 18892ba569e0a5ae2bb5986d35abcd50791392ee Mon Sep 17 00:00:00 2001 From: esteinig Date: Tue, 14 Feb 2023 22:20:54 +1100 Subject: [PATCH 4/9] clean db name get func --- src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index d1cae38..c9cf687 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,8 +64,8 @@ fn main() -> Result<()> { for (db_index, db_path) in kraken_db.into_iter().enumerate() { let db_name = get_db_name(&db_path)?; - let kraken_files = scrubber.run_kraken(&scrubbed_reads, &db_path, &db_name, &db_index, &kraken_threads)?; + // These are either depleted or extracted reads - for depleted reads, we use the depleted reads as input for the next iteration // but for extracted reads, we do not want to re-extract reads with another database [https://github.com/esteinig/scrubby/issues/2] scrubbed_reads = scrubber.deplete_kraken(&scrubbed_reads, &db_name, &db_index, &false, &kraken_files, &kraken_taxa, &kraken_taxa_direct)?; @@ -79,9 +79,9 @@ fn main() -> Result<()> { Ok(()) } +// Utility function to extract the database name as valid UTF-8 fn get_db_name(db_path: &PathBuf) -> Result { match db_path.file_name(){ - // Kraken2 database name conversion - database path must be UTF8-convertable for manipulation of file paths Some(name) => 
Ok(name.to_os_string().into_string().map_err(|_| ScrubbyError::InvalidDatabasePath)?), None => return Err(ScrubbyError::DatabaseNameExtraction(format!("{:?}", db_path))) } From 1bbd1efb90ccdfe66a69ba4286a77c746a287e93 Mon Sep 17 00:00:00 2001 From: esteinig Date: Thu, 16 Feb 2023 08:33:36 +1100 Subject: [PATCH 5/9] add log colors --- src/main.rs | 47 +++++++++++++++++++++++++++++++++++++---------- src/scrub.rs | 25 ++++++++++++++----------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index c9cf687..89ea6ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,8 +3,9 @@ use thiserror::Error; use structopt::StructOpt; use chrono::Local; use env_logger::Builder; -use log::LevelFilter; -use std::{io::Write, ffi::OsString, path::PathBuf}; +use log::{LevelFilter, Level}; +use env_logger::fmt::Color; +use std::{io::Write, path::PathBuf}; mod scrub; mod cli; @@ -31,11 +32,36 @@ fn main() -> Result<()> { Builder::new() .format(|buf, record| { - writeln!(buf, + + let mut red_style = buf.style(); + red_style.set_color(Color::Red).set_bold(true); + + let mut green_style = buf.style(); + green_style.set_color(Color::Green).set_bold(true); + let mut green_style_text = buf.style(); + green_style_text.set_color(Color::Green).set_bold(false); + + let mut white_style = buf.style(); + white_style.set_color(Color::White).set_bold(true); + let mut orange_style = buf.style(); + orange_style.set_color(Color::Rgb(255, 102, 0)).set_bold(true); + + + + let timestamp = buf.timestamp(); + + let msg = match record.level(){ + Level::Warn => (orange_style.value(record.level()), orange_style.value(record.args())), + Level::Info => (green_style.value(record.level()), green_style_text.value(record.args())), + _ => (white_style.value(record.level()), white_style.value(record.args())) + }; + + writeln!( + buf, "{} [{}] - {}", - Local::now().format("%Y-%m-%dT%H:%M:%S"), - record.level(), - record.args() + white_style.value(timestamp), + msg.0, + msg.1 ) }) .filter(None, LevelFilter::Info) @@ -60,16 +86,17 @@ fn main() -> Result<()> { log::info!("Welcome to Scrubby! 
You name it, we clean it."); - let mut scrubbed_reads = input; + let mut read_files = input; for (db_index, db_path) in kraken_db.into_iter().enumerate() { let db_name = get_db_name(&db_path)?; - let kraken_files = scrubber.run_kraken(&scrubbed_reads, &db_path, &db_name, &db_index, &kraken_threads)?; - + let kraken_files = scrubber.run_kraken(&read_files, &db_path, &db_name, &db_index, &kraken_threads)?; // These are either depleted or extracted reads - for depleted reads, we use the depleted reads as input for the next iteration // but for extracted reads, we do not want to re-extract reads with another database [https://github.com/esteinig/scrubby/issues/2] - scrubbed_reads = scrubber.deplete_kraken(&scrubbed_reads, &db_name, &db_index, &false, &kraken_files, &kraken_taxa, &kraken_taxa_direct)?; + read_files = scrubber.deplete_kraken(&read_files, &db_name, &db_index, &false, &kraken_files, &kraken_taxa, &kraken_taxa_direct)?; } + + } } diff --git a/src/scrub.rs b/src/scrub.rs index b2b25e7..75c0842 100644 --- a/src/scrub.rs +++ b/src/scrub.rs @@ -83,6 +83,9 @@ impl Scrubber { // Safely build the arguments for Kraken2 let kraken_args = get_kraken_command(input, db_path, db_name, db_index, threads)?; + log::info!("Running Kraken2 with database: {}", db_name); + log::debug!("Executing Kraken2 command: {}", &kraken_args.join(" ")); + // Run the Kraken command let output = Command::new("kraken2") .args(kraken_args) @@ -92,11 +95,11 @@ impl Scrubber { // Ensure command ran successfully if output.status.success() { - log::info!("Kraken2 - completed taxonomic assignment ({})", db_name) + log::info!("Completed taxonomic assignment with Kraken2 ({})", db_name) } else { - log::info!("Kraken2 - failed to run taxonomic assignment ({})", db_name); + log::error!("Failed to run taxonomic assignment with Kraken2 ({})", db_name); let err_msg = get_kraken_err_msg(output)?; - log::info!("Error from {}", err_msg); + log::error!("Error from {}", err_msg); return Err(ScrubberError::KrakenClassificationError) } @@ -117,7 +120,8 @@ impl Scrubber { kraken_taxa_direct: &Vec ) -> Result, ScrubberError>{ - log::info!("Kraken2 - depleting reads across specified taxa..."); + let msg_word = match extract { true => "Extracting", false => "Depleting" }; + log::info!("{} reads of classified taxa from Kraken2", &msg_word); let reads = parse_kraken_files( kraken_files[0].clone(), @@ -139,8 +143,7 @@ impl Scrubber { // vector length in command-line interface and match the file number for (i, input_file) in input.iter().enumerate() { - let msg_word = match extract { true => "Extracting", false => "Depleting" }; - log::info!("{} reads {:#?} into {:#?}", msg_word, &input_file, &output[i]); + log::info!("{} reads {:#?} into {:#?}", &msg_word, &input_file, &output[i]); // Initiate the depletion operator and deplete/extract the reads identifiers parsed from Kraken let depletor = ReadDepletion::new(self.output_format, self.compression_level)?; @@ -335,7 +338,7 @@ pub fn parse_kraken_files( // taxonomic name or identifier if tax_level < TaxonomicLevel::Domain { // Unspecified, Unclassified, Root --> all should be given directly! 
- log::warn!("Detected taxon below `Domain` level to ignore in sub-level depletion ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::warn!("Detected taxon below level `Domain`; ignored in sub-level depletion ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); continue 'report; } @@ -345,14 +348,14 @@ pub fn parse_kraken_files( // and set the current tax level as a flag for stopping the extraction in subsequent records that are below or equal // to this tax level extract_taxlevel = tax_level; - log::info!("Setting taxon level for extraction of sub-levels to {} ({})", extract_taxlevel.to_string(), &record.tax_name); + log::debug!("Setting taxon level for extraction of sub-levels to {} ({})", extract_taxlevel.to_string(), &record.tax_name); // Skip all records that do not have reads directly assigned to it! if record.reads_direct > 0 { taxids.insert(record.tax_id); } } else { if extract_taxlevel == TaxonomicLevel::None { // guard against no taxa given on initial loop - log::info!("Ignoring record ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::debug!("Ignoring record ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); continue 'report; } // If the current record is not the depletion list, first check if the taxon level indicates we need to stop - this @@ -361,13 +364,13 @@ pub fn parse_kraken_files( if (tax_level <= extract_taxlevel) && (record.tax_level.len() == 1) { // guard against premature sub-level reset (e.g. D1, D2, D3) // Unset the extraction flag and reset the taxonomic level // to the lowest setting (None - does not ocurr in report) - log::info!("Detected taxon level for sub-level reset ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::debug!("Detected taxon level for sub-level reset ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); extract_taxlevel = TaxonomicLevel::None; } else { // Otherwise the taxonomic level is below the one set in the flag and the taxon should be depleted // Skip all records that do not have reads directly assigned to it! 
if record.reads_direct > 0 { - log::info!("Detected taxon sub-level with reads to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::debug!("Detected taxon sub-level with reads to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); taxids.insert(record.tax_id); } } From 9e146894dc58cf6d80657885262aebac40156d1d Mon Sep 17 00:00:00 2001 From: esteinig Date: Thu, 16 Feb 2023 08:40:23 +1100 Subject: [PATCH 6/9] add log colors --- src/main.rs | 14 +++++--------- src/scrub.rs | 10 +++++----- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index 89ea6ef..2e9005c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,27 +32,23 @@ fn main() -> Result<()> { Builder::new() .format(|buf, record| { - let mut red_style = buf.style(); red_style.set_color(Color::Red).set_bold(true); - let mut green_style = buf.style(); green_style.set_color(Color::Green).set_bold(true); - let mut green_style_text = buf.style(); - green_style_text.set_color(Color::Green).set_bold(false); - let mut white_style = buf.style(); - white_style.set_color(Color::White).set_bold(true); + white_style.set_color(Color::White).set_bold(false); let mut orange_style = buf.style(); orange_style.set_color(Color::Rgb(255, 102, 0)).set_bold(true); - - + let mut apricot_style = buf.style(); + apricot_style.set_color(Color::Rgb(255, 195, 0)).set_bold(true); let timestamp = buf.timestamp(); let msg = match record.level(){ Level::Warn => (orange_style.value(record.level()), orange_style.value(record.args())), - Level::Info => (green_style.value(record.level()), green_style_text.value(record.args())), + Level::Info => (green_style.value(record.level()), white_style.value(record.args())), + Level::Debug => (apricot_style.value(record.level()), apricot_style.value(record.args())), _ => (white_style.value(record.level()), white_style.value(record.args())) }; diff --git a/src/scrub.rs b/src/scrub.rs index 75c0842..5de0bdf 100644 --- a/src/scrub.rs +++ b/src/scrub.rs @@ -83,7 +83,7 @@ impl Scrubber { // Safely build the arguments for Kraken2 let kraken_args = get_kraken_command(input, db_path, db_name, db_index, threads)?; - log::info!("Running Kraken2 with database: {}", db_name); + log::info!("Executing taxonomic classification with Kraken2 ({})", db_name); log::debug!("Executing Kraken2 command: {}", &kraken_args.join(" ")); // Run the Kraken command @@ -95,9 +95,9 @@ impl Scrubber { // Ensure command ran successfully if output.status.success() { - log::info!("Completed taxonomic assignment with Kraken2 ({})", db_name) + log::info!("Completed taxonomic classification with Kraken2 ({})", db_name) } else { - log::error!("Failed to run taxonomic assignment with Kraken2 ({})", db_name); + log::error!("Failed to run taxonomic classification with Kraken2 ({})", db_name); let err_msg = get_kraken_err_msg(output)?; log::error!("Error from {}", err_msg); return Err(ScrubberError::KrakenClassificationError) @@ -121,7 +121,7 @@ impl Scrubber { ) -> Result, ScrubberError>{ let msg_word = match extract { true => "Extracting", false => "Depleting" }; - log::info!("{} reads of classified taxa from Kraken2", &msg_word); + log::info!("{} classified reads from Kraken2", &msg_word); let reads = parse_kraken_files( kraken_files[0].clone(), @@ -338,7 +338,7 @@ pub fn parse_kraken_files( // taxonomic name or identifier if tax_level < TaxonomicLevel::Domain { // 
Unspecified, Unclassified, Root --> all should be given directly! - log::warn!("Detected taxon below level `Domain`; ignored in sub-level depletion ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::warn!("Detected taxon level below `Domain` - ignored in sublevel depletion ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); continue 'report; } From a9ff78247218b44644a3751748c7323dfbe7c412 Mon Sep 17 00:00:00 2001 From: esteinig Date: Thu, 16 Feb 2023 08:43:05 +1100 Subject: [PATCH 7/9] fix err doc links --- src/scrub.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/scrub.rs b/src/scrub.rs index 5de0bdf..713c2bd 100644 --- a/src/scrub.rs +++ b/src/scrub.rs @@ -161,9 +161,9 @@ impl Scrubber { /// Checks if work directory exists and otherwise creates it /// /// # Errors -/// A [`ScrubberError::WorkdirExists`](#clierror) is returned if the directory already exists -/// A [`ScrubberError::WorkdirCreate`](#clierror) is returned if the directory cannot be created -/// A [`ScrubberError::AbsolutePath`](#clierror) is returned if the directory path cannot be canonicalized +/// A [`ScrubberError::WorkdirExists`](#scrubbererror) is returned if the directory already exists +/// A [`ScrubberError::WorkdirCreate`](#scrubbererror) is returned if the directory cannot be created +/// A [`ScrubberError::AbsolutePath`](#scrubbererror) is returned if the directory path cannot be canonicalized pub fn check_or_create_workdir(workdir: Option) -> Result { let _workdir = match workdir { Some(path) => path, @@ -182,8 +182,8 @@ pub fn check_or_create_workdir(workdir: Option) -> Result, db_path: &PathBuf, db_name: &str, db_index: &usize, threads: &u32) -> Result, ScrubberError> { let kraken_db_path = db_path.to_path_buf().into_os_string().into_string().map_err(|_| ScrubberError::InvalidFilePathConversion)?; @@ -232,7 +232,7 @@ pub fn get_kraken_command(input: &Vec, db_path: &PathBuf, db_name: &str /// Parses the error message from Kraken2 /// /// # Errors -/// A [`ScrubberError::KrakenClassificationError`](#clierror) is returned if parsing the error message fails +/// A [`ScrubberError::KrakenClassificationError`](#scrubbererror) is returned if parsing the error message fails pub fn get_kraken_err_msg(cmd_output: Output) -> Result{ let err_out = String::from_utf8_lossy(&cmd_output.stderr); match err_out.lines().nth(0){ @@ -298,8 +298,8 @@ impl fmt::Display for TaxonomicLevel { /// where the classification must be included in the to-deplete list derived from the report file. 
/// /// # Errors -/// A [`ScrubberError::KrakenReportReadFieldConversion`](#clierror) is returned if the read field in the report file cannot be converted into `u64` -/// A [`ScrubberError::KrakenReportDirectReadFieldConversion`](#clierror) is returned if the direct read field in the report file cannot be converted into `u64` +/// A [`ScrubberError::KrakenReportReadFieldConversion`](#scrubbererror) is returned if the read field in the report file cannot be converted into `u64` +/// A [`ScrubberError::KrakenReportDirectReadFieldConversion`](#scrubbererror) is returned if the direct read field in the report file cannot be converted into `u64` pub fn parse_kraken_files( // Kraken taxonomic report kraken_report: PathBuf, @@ -521,7 +521,7 @@ impl ReadCountSummary { } } -/// Read depletion sub-struct +/// Read depletion struct /// /// This struct can be initialised with `niffler ::compression::Format` which /// optionally specifies the output format and a `niffler::compression::Level` @@ -548,9 +548,9 @@ impl ReadDepletion { /// which can be added to the `ReadCountSummary` to be output to JSON. /// /// /// # Errors - /// A [`ScrubberError::DepletionFastxParser`](#clierror) is returned if the input file cannot be read, or if the output file cannot be written to - /// A [`ScrubberError::DepletionCompressionWriter`](#clierror) is returned if the compression writer cannot be obtained - /// A [`ScrubberError::DepletionRecordIdentifier`](#clierror) if the read identifier cannot be converted to valid UTF8 + /// A [`ScrubberError::DepletionFastxParser`](#scrubbererror) is returned if the input file cannot be read, or if the output file cannot be written to + /// A [`ScrubberError::DepletionCompressionWriter`](#scrubbererror) is returned if the compression writer cannot be obtained + /// A [`ScrubberError::DepletionRecordIdentifier`](#scrubbererror) if the read identifier cannot be converted to valid UTF8 /// pub fn deplete( &self, @@ -585,8 +585,7 @@ impl ReadDepletion { // Note that if Kraken paired data (--paired in Kraken2) legacy Illumina read identifier formats // with trailing /1 and /2 are stripped of their trails and the reads output does not contain // the trails. This mismatches with the input reads (which have /1 and /2) which are therefore - // not depleted. - + // not depleted. We do not expect legacy format Illumina reads. while let Some(record) = reader.next() { let rec = record.map_err(|err| ScrubberError::DepletionFastxParser(err))?; From 89a43f0826b615d3440220d464cc2880e2833d0f Mon Sep 17 00:00:00 2001 From: esteinig Date: Fri, 17 Feb 2023 16:13:41 +1100 Subject: [PATCH 8/9] add logger for sub-level extraction --- src/cli.rs | 21 ++++-- src/main.rs | 14 ++-- src/scrub.rs | 187 ++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 166 insertions(+), 56 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index abc9ac1..9bec2ec 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -37,7 +37,7 @@ pub enum Commands { /// are assumed to be the same in forward and reverse read files (modern format) without trailing /// read orientations `/1` or `/2`. #[structopt( - short, + short = "i", long, parse(try_from_os_str = check_file_exists), multiple = true, @@ -50,7 +50,7 @@ pub enum Commands { /// files consecutively `-o r1.fq r2.fq`. NOTE: The order of the pairs is assumed to be the /// same as that given for --input. 
#[structopt( - short, + short = "o", long, parse(from_os_str), multiple = true, required = true )] output: Vec<PathBuf>, /// Extract reads instead of removing them (--output) /// - /// This flagreverses the depletion and makes the command an extraction process + /// This flag reverses the depletion and makes the command an extraction process /// of reads that would otherwise be removed during depletion. - #[structopt(short, long)] + #[structopt(short = "e", long)] extract: bool, /// Kraken2 database directory path(s). /// @@ -104,9 +104,16 @@ pub enum Commands { /// Working directory containing intermediary files. /// /// Path to a working directory which contains the alignment and intermediary output files - /// from the programs called during scrubbing. - #[structopt(short, long, parse(from_os_str))] + /// from the programs called during scrubbing. By default the working directory + /// is named with a timestamp, e.g. `Scrubby_{YYYYMMDDTHHMMSS}`. + #[structopt(short = "w", long, parse(from_os_str))] workdir: Option<PathBuf>, + /// Keep the working directory and intermediate files + /// + /// This flag specifies that we want to keep the working directory and all intermediate files; + /// otherwise the working directory is deleted. + #[structopt(short = "K", long)] + keep: bool, /// u: uncompressed; b: Bzip2; g: Gzip; l: Lzma /// /// Default is to attempt to infer the output compression format automatically from the filename @@ -123,7 +130,7 @@ pub enum Commands { output_format: Option, /// Compression level to use if compressing output. #[structopt( - short = "l", + short = "L", long, parse(try_from_str = parse_level), default_value="6", diff --git a/src/main.rs b/src/main.rs index 2e9005c..15522ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use anyhow::Result; use thiserror::Error; use structopt::StructOpt; -use chrono::Local; use env_logger::Builder; use log::{LevelFilter, Level}; use env_logger::fmt::Color; @@ -32,6 +31,8 @@ fn main() -> Result<()> { Builder::new() .format(|buf, record| { + let timestamp = buf.timestamp(); + let mut red_style = buf.style(); red_style.set_color(Color::Red).set_bold(true); let mut green_style = buf.style(); @@ -43,12 +44,11 @@ fn main() -> Result<()> { let mut apricot_style = buf.style(); apricot_style.set_color(Color::Rgb(255, 195, 0)).set_bold(true); - let timestamp = buf.timestamp(); - let msg = match record.level(){ Level::Warn => (orange_style.value(record.level()), orange_style.value(record.args())), Level::Info => (green_style.value(record.level()), white_style.value(record.args())), Level::Debug => (apricot_style.value(record.level()), apricot_style.value(record.args())), + Level::Error => (red_style.value(record.level()), red_style.value(record.args())), _ => (white_style.value(record.level()), white_style.value(record.args())) }; @@ -70,6 +70,7 @@ fn main() -> Result<()> { output, workdir, extract, + keep, kraken_db, kraken_threads, kraken_taxa, @@ -84,13 +85,18 @@ fn main() -> Result<()> { let mut read_files = input; for (db_index, db_path) in kraken_db.into_iter().enumerate() { - + // Extract the database name from path and run Kraken2 let db_name = get_db_name(&db_path)?; let kraken_files = scrubber.run_kraken(&read_files, &db_path, &db_name, &db_index, &kraken_threads)?; // These are either depleted or extracted reads - for depleted reads, we use the depleted reads as input for the next iteration // but for extracted reads, we do not want to re-extract reads with another database
[https://github.com/esteinig/scrubby/issues/2] read_files = scrubber.deplete_kraken(&read_files, &db_name, &db_index, &false, &kraken_files, &kraken_taxa, &kraken_taxa_direct)?; } + // Iterating again over the depleted record files to produce the user-specified outputs + // because we want to ensure they are properly compressed (intermediate files are not) + scrubber.write_outputs(read_files, output)?; + // If we do not want to keep the intermediary files in `workdir` delete the directory + scrubber.clean_up(keep)?; diff --git a/src/scrub.rs b/src/scrub.rs index 713c2bd..295bdda 100644 --- a/src/scrub.rs +++ b/src/scrub.rs @@ -1,4 +1,5 @@ +use std::collections::HashMap; use std::fmt; use chrono::Local; use std::fs::File; @@ -9,9 +10,9 @@ use std::path::PathBuf; use std::str::from_utf8; use std::process::Output; use std::process::Command; -use std::fs::create_dir_all; +use std::fs::{create_dir_all, remove_dir_all}; use std::collections::HashSet; -use needletail::parse_fastx_file; +use needletail::{parse_fastx_file, FastxReader}; use crate::utils::CompressionExt; use std::io::{BufRead, BufReader, BufWriter}; @@ -24,6 +25,9 @@ pub enum ScrubberError { /// Represents a failure to run Kraken2 #[error("failed to run Kraken2")] KrakenClassificationError, + /// Represents a failure to count a taxonomic parent during report parsing + #[error("failed to provide a parent taxon while parsing report from Kraken2")] + KrakenReportTaxonParent, /// Represents a failure to convert the read field from string to numeric field in the report file #[error("failed to convert the read field in the report from Kraken2")] KrakenReportReadFieldConversion, @@ -50,7 +54,7 @@ pub enum ScrubberError { AbsolutePath(String), /// Indicates failure to parse file with Needletail #[error("failed file input/output")] - DepletionFastxParser(#[source] needletail::errors::ParseError), + FastxRecordIO(#[source] needletail::errors::ParseError), /// Indicates failure to obntain compression writer with Niffler #[error("failed to get compression writer")] DepletionCompressionWriter(#[source] niffler::Error), @@ -120,15 +124,17 @@ impl Scrubber { kraken_taxa_direct: &Vec ) -> Result, ScrubberError>{ - let msg_word = match extract { true => "Extracting", false => "Depleting" }; - log::info!("{} classified reads from Kraken2", &msg_word); + log::info!("Parsing taxonomic classification report..."); - let reads = parse_kraken_files( + let taxids = get_taxids_from_report( kraken_files[0].clone(), - kraken_files[1].clone(), kraken_taxa, kraken_taxa_direct )?; + + let reads = get_taxid_reads( + taxids, kraken_files[1].clone() + )?; // Temporary files for sequential depletion/extraction in workdir let output = match input.len() { @@ -141,10 +147,7 @@ impl Scrubber { // Enumerated loop is ok since we have checked matching input/output // vector length in command-line interface and match the file number - for (i, input_file) in input.iter().enumerate() { - - log::info!("{} reads {:#?} into {:#?}", &msg_word, &input_file, &output[i]); - + for (i, _) in input.iter().enumerate() { // Initiate the depletion operator and deplete/extract the reads identifiers parsed from Kraken let depletor = ReadDepletion::new(self.output_format, self.compression_level)?; let read_counts = depletor.deplete(&reads, &input[i], &output[i], extract)?; @@ -154,6 +157,34 @@ impl Scrubber { Ok(output) } + /// + pub fn write_outputs(&self, input_files: Vec, output_files: Vec) -> Result<(), ScrubberError> { + + + for (i, _) in input_files.iter().enumerate() { // input output 
are ensured to have the same length through the command-line interface + let (mut reader, mut writer) = get_reader_writer(&input_files[i], &output_files[i], self.compression_level, self.output_format)?; + + log::info!("Writing reads to output file: {:?}", output_files[i]); + + while let Some(record) = reader.next() { + let rec = record.map_err(|err| ScrubberError::FastxRecordIO(err))?; + rec.write(&mut writer, None).map_err(|err| ScrubberError::FastxRecordIO(err))?; + } + } + Ok(()) + } + pub fn clean_up(&self, keep: bool) -> Result<(), ScrubberError> { + match keep { + true => { + log::info!("Keeping working directory and intermediary files"); + }, + false => { + log::info!("Deleting working directory and intermediary files"); + remove_dir_all(&self.workdir)?; + } + } + Ok(()) + } } @@ -277,46 +308,72 @@ impl fmt::Display for TaxonomicLevel { } } -/// Parse the Kraken output report and read file +/// A struct to count reads for taxa provided by user +/// +/// Provides implementation for formatting the HashMaps +/// associated with the counts. +/// +/// Keys for the HashMaps are always the taxonomic names, as this +/// construct is meant solely for formatting informative outputs and +/// summaries of depletion or extraction. +/// +#[derive(Debug, Clone)] +pub struct TaxonCounts { + taxa: HashMap<String, HashMap<String, u64>> +} +impl TaxonCounts { + pub fn new() -> Self { + TaxonCounts { taxa: HashMap::new() } + } + pub fn update(&mut self, tax_name: String, tax_parent: String, tax_reads: u64){ + self.taxa.entry(tax_parent).and_modify(|sub_counts|{ + sub_counts.entry(tax_name.clone()).and_modify(|tax_count| *tax_count += tax_reads).or_insert(tax_reads); + }).or_insert( + HashMap::from([(tax_name.clone(), tax_reads)]) + ); + } +} + +/// Parse the Kraken output report file /// -/// This functions implements the logic to first parse the record file, and extract any -/// directly specified taxon by name or identifier. It also allows for extraction of any +/// This function implements the logic to parse the record file, and extract any +/// directly specified taxon by name or identifier. It allows for extraction of any /// taxon sub-leves of taxa by name or identifier, for example when specifying the /// domain `Eukaryota` it will parse all sub-levels of `Eukaryota` until the next full -/// domain level is reached (which prevents sub-specifications of domains to be excluded, -/// such as D1, D2, D3) +/// domain level (`D`) is reached (which prevents sub-specifications of domains from being +/// excluded, such as `D1` or `D2`) /// /// It should be ensured that the underlying database taxonomy does not incorrectly specify /// sub-levels as the triggering level - for example, when specifying `Eukaryota` the /// 16S rRNA SILVA database incorrectly specifies `Holozoa` at the domain level, so it -/// should be includedin the taxa to dpeletye to ensure all sequences below `Eukaryota`' +/// should be included in the taxa to deplete to ensure all sequences below `Eukaryota` /// and `Holozoa` are excluded. /// -/// Only taxa with directly assigned reads are included in the to-deplete list. +/// Only taxa with directly assigned reads are included in the final set of taxids! /// -/// Returns a `HashSet` of read identifiers parsed from the read classification file -/// where the classification must be included in the to-deplete list derived from the report file. +/// Returns a `HashSet` of taxonomic identifiers parsed from the report file.
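/// (Editor's illustration, not part of the patch; taxon names, identifiers and read counts are hypothetical.) Given `--kraken-taxa Eukaryota` and a Kraken2 report fragment with the usual columns (percentage, clade reads, direct reads, rank code, taxid, name): `2.00 200 200 U 0 unclassified` sits above `Domain` and is only depleted via `--kraken-taxa-direct`; `5.00 500 0 D 2759 Eukaryota` matches `--kraken-taxa` and sets the sub-level extraction flag but contributes no taxid (no direct reads); `4.00 400 400 D1 33154 Opisthokonta` is a sub-level with direct reads, so its taxid is collected; `0.50 50 50 D 2 Bacteria` is the next full `D` rank and resets the flag. The fragment therefore yields the taxid set {"33154"}.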
/// /// # Errors /// A [`ScrubberError::KrakenReportReadFieldConversion`](#scrubbererror) is returned if the read field in the report file cannot be converted into `u64` /// A [`ScrubberError::KrakenReportDirectReadFieldConversion`](#scrubbererror) is returned if the direct read field in the report file cannot be converted into `u64` -pub fn parse_kraken_files( +pub fn get_taxids_from_report( // Kraken taxonomic report kraken_report: PathBuf, - // Kraken read classifications - kraken_reads: PathBuf, kraken_taxa: &Vec<String>, kraken_taxa_direct: &Vec<String> ) -> Result<HashSet<String>, ScrubberError>{ let report = BufReader::new(File::open(kraken_report)?); - // Make sure no trailign whitespaces are input by user - these are taxon names or taxon identifiers to deplete + // Make sure no trailing whitespaces are input by user - these are taxon names or taxids to deplete let kraken_taxa: Vec<String> = kraken_taxa.into_iter().map(|x| x.trim().to_string()).collect(); let kraken_taxa_direct: Vec<String> = kraken_taxa_direct.into_iter().map(|x| x.trim().to_string()).collect(); let mut taxids: HashSet<String> = HashSet::new(); + let mut tax_counts: TaxonCounts = TaxonCounts::new(); + let mut extract_taxlevel: TaxonomicLevel = TaxonomicLevel::None; // make sure this makes sense to initialize + let mut extract_parent: String = String::from(""); 'report: for line in report.lines() { // Iterate over the lines in the report file - it is not known which domain comes @@ -329,8 +386,9 @@ pub fn parse_kraken_files( // Add the direct taxon identifier if the record matches by taxonomic name or identifer // (and has reads assigned directly) - this is always the case when the direct taxon is found if kraken_taxa_direct.contains(&record.tax_name) || kraken_taxa_direct.contains(&record.tax_id) { - log::info!("Detected direct taxon to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::info!("Detected direct taxon to deplete ({} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name); taxids.insert(record.tax_id.clone()); + tax_counts.update(record.tax_name.clone(), record.tax_name.clone(), record.reads_direct.clone()) } // If taxon level is above Domain - do not allow it to be processed with the block statement below // taxonomic name or identifier if tax_level < TaxonomicLevel::Domain { // Unspecified, Unclassified, Root --> all should be given directly!
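// Editor's note: this comparison relies on `TaxonomicLevel` having an ordering (derived or hand-rolled) in which the levels reported above the domain rank compare as smaller than `Domain`, so such records are skipped here unless passed with `--kraken-taxa-direct`; a minimal sketch of that assumption follows the final patch in this series.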
- log::warn!("Detected taxon level below `Domain` - ignored in sublevel depletion ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::warn!("Found taxon above `Domain` - ignoring sublevels ({} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name); continue 'report; } if kraken_taxa.contains(&record.tax_name) || kraken_taxa.contains(&record.tax_id) { - log::info!("Detected taxon level to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::info!("Detected taxon level ({} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name); // If the current record is in the vector of taxa (and following sub-taxa) to deplete, switch on the extraction flag // and set the current tax level as a flag for stopping the extraction in subsequent records that are below or equal // to this tax level extract_taxlevel = tax_level; - log::debug!("Setting taxon level for extraction of sub-levels to {} ({})", extract_taxlevel.to_string(), &record.tax_name); + extract_parent = record.tax_name.clone(); + + log::debug!("Setting taxon level for parsing sub-levels to {} ({})", extract_taxlevel.to_string(), &record.tax_name); // Skip all records that do not have reads directly assigned to it! if record.reads_direct > 0 { taxids.insert(record.tax_id); + tax_counts.update(record.tax_name.clone(), record.tax_name.clone(), record.reads_direct.clone()) } } else { if extract_taxlevel == TaxonomicLevel::None { // guard against no taxa given on initial loop @@ -364,19 +425,47 @@ pub fn parse_kraken_files( if (tax_level <= extract_taxlevel) && (record.tax_level.len() == 1) { // guard against premature sub-level reset (e.g. D1, D2, D3) // Unset the extraction flag and reset the taxonomic level // to the lowest setting (None - does not ocurr in report) - log::debug!("Detected taxon level for sub-level reset ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::debug!("Detected taxon level for sub-level reset ({} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name); extract_taxlevel = TaxonomicLevel::None; } else { // Otherwise the taxonomic level is below the one set in the flag and the taxon should be depleted // Skip all records that do not have reads directly assigned to it! 
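// Editor's note: collecting only taxids with directly assigned reads is sufficient because the per-read classification file records the exact taxid each read was assigned to; a taxon whose reads all sit on child nodes never appears there, so adding its taxid would not match any read.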
if record.reads_direct > 0 { - log::debug!("Detected taxon sub-level with reads to deplete ({} : {} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name, &record.reads_direct); + log::debug!("Detected taxon sub-level with reads ({} : {} : {} : {})", &tax_level.to_string(), &record.tax_level, &record.tax_id, &record.tax_name); taxids.insert(record.tax_id); + match extract_parent.as_str() { + "" => return Err(ScrubberError::KrakenReportTaxonParent), + _ => tax_counts.update(record.tax_name.clone(), extract_parent.clone(), record.reads_direct.clone()) + } + } } } } + + log::info!("========================================================="); + log::info!("Detected {} taxonomic levels with directly assigned reads", taxids.len()); + log::info!("========================================================="); + let mut reads = 0; + for (parent, subtaxa) in tax_counts.taxa.iter() { + for (child, count) in subtaxa { + log::info!("{} :: {} ({})", parent, child, count); + reads += count; + } + }; + log::info!("========================================================="); + log::info!("Detected a total of {} directly assigned reads to extract", reads); + log::info!("========================================================="); + + Ok(taxids) +} + +fn get_taxid_reads( + taxids: HashSet, + kraken_reads: PathBuf +) -> Result, ScrubberError> { + // HashSet of read identifiers for later depletion let mut reads: HashSet = HashSet::new(); @@ -389,7 +478,7 @@ pub fn parse_kraken_files( } } - log::info!("Parsed Kraken output files; a total of {} reads will be depleted", reads.len()); + log::info!("Parsed classification file; {} matching reads were found", reads.len()); Ok(reads) @@ -414,7 +503,7 @@ fn get_tax_level(record: &KrakenReportRecord) -> TaxonomicLevel { /* ============== -Record Structs +Kraken Records ============== */ @@ -561,17 +650,7 @@ impl ReadDepletion { ) -> Result { // Input output of read files includes compression detection - let mut reader = parse_fastx_file(input).map_err(|err| ScrubberError::DepletionFastxParser(err))?; - - let file = File::create(&output)?; - let file_handle = BufWriter::new(file); - let fmt = match self.output_format { - None => niffler::Format::from_path(&output), - Some(format) => format, - }; - - let mut writer_retained = niffler::get_writer(Box::new(file_handle), fmt, self.compression_level).map_err(|err| ScrubberError::DepletionCompressionWriter(err))?; - + let (mut reader, mut writer) = get_reader_writer(input, output, self.compression_level, self.output_format)?; let mut read_counts = ReadCounts { total: 0, @@ -588,7 +667,7 @@ impl ReadDepletion { // not depleted. We do not expect legacy format Illumina reads. 
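// Editor's illustration of the caveat above (hypothetical identifiers): with legacy headers, Kraken2 --paired reports `read_1` while the input record identifier is `read_1/1`, so the lookup against the parsed read set below misses and the read is simply retained; modern identifiers are identical in the forward and reverse files and match exactly.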
while let Some(record) = reader.next() { - let rec = record.map_err(|err| ScrubberError::DepletionFastxParser(err))?; + let rec = record.map_err(|err| ScrubberError::FastxRecordIO(err))?; let rec_id = from_utf8(rec.id()).map_err(|err| ScrubberError::DepletionRecordIdentifier(err))?.split(' ').next().unwrap_or(""); // needletail parses the entire header as identifier (including description) let to_retain: bool = match extract { @@ -597,7 +676,7 @@ impl ReadDepletion { }; if to_retain { - rec.write(&mut writer_retained, None).map_err(|err| ScrubberError::DepletionFastxParser(err))?; + rec.write(&mut writer, None).map_err(|err| ScrubberError::FastxRecordIO(err))?; read_counts.retained += 1 } else { match extract { @@ -610,3 +689,21 @@ impl ReadDepletion { Ok(read_counts) } } + + +// Utility function to get a Needletail reader and Niffler compressed/uncompressed writer +fn get_reader_writer(input: &PathBuf, output: &PathBuf, compression_level: niffler::compression::Level, output_format: Option) -> Result<(Box, Box), ScrubberError> { + // Input output of read files includes compression detection + let reader = parse_fastx_file(input).map_err(|err| ScrubberError::FastxRecordIO(err))?; + + let file = File::create(&output)?; + let file_handle = BufWriter::new(file); + let fmt = match output_format { + None => niffler::Format::from_path(&output), + Some(format) => format, + }; + + let writer = niffler::get_writer(Box::new(file_handle), fmt, compression_level).map_err(|err| ScrubberError::DepletionCompressionWriter(err))?; + + Ok((reader, writer)) +} \ No newline at end of file From 79f9ef2808d0bf9188b05471830319396548137a Mon Sep 17 00:00:00 2001 From: esteinig Date: Sat, 18 Feb 2023 09:08:49 +1100 Subject: [PATCH 9/9] rename scrub command to scrub-reads --- src/cli.rs | 10 +++++----- src/main.rs | 12 +++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 9bec2ec..8aa228c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::ffi::{OsString, OsStr}; use std::path::PathBuf; use structopt::StructOpt; @@ -15,10 +16,10 @@ pub enum CliError { CompressionLevel(String), /// Indicates a bad combination of input and output files was passed. #[error("Bad combination of input and output files: {0}")] - BadInputOutputCombination(String), + BadInputOutputCombination(String) } -/// MGP-DEPLETE command-line interface +/// Scrubby command-line application #[derive(Debug, StructOpt)] pub struct Cli { #[structopt(subcommand)] @@ -29,7 +30,7 @@ pub struct Cli { pub enum Commands { #[structopt(global_settings = &[AppSettings::ColoredHelp, AppSettings::ArgRequiredElseHelp])] /// Clean seqeunce data by removing background taxa (k-mer) or host reads (alignment) - Scrub { + ScrubReads { /// Input filepath(s) (fa, fq, gz, bz). /// /// For paired Illumina you may either pass this flag twice `-i r1.fq -i r2.fq` or give two @@ -149,7 +150,7 @@ impl Cli { /// - An unequal number of `--input` and `--output` are passed pub fn validate_input_output_combination(&self) -> Result<(), CliError> { match &self.commands { - Commands::Scrub { input, output, ..
} => { let out_len = output.len(); let in_len = input.len(); if in_len > 2 { @@ -211,4 +212,3 @@ fn parse_level(s: &str) -> Result { }; Ok(lvl) } - diff --git a/src/main.rs b/src/main.rs index 15522ef..600b952 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,8 +25,7 @@ pub enum ScrubbyError { fn main() -> Result<()> { let cli = cli::Cli::from_args(); - // Command specific checks - scrubbing - + // Command line application specific checks cli.validate_input_output_combination()?; Builder::new() @@ -65,7 +64,7 @@ fn main() -> Result<()> { match cli.commands { - cli::Commands::Scrub { + cli::Commands::ScrubReads { input, output, workdir, @@ -80,8 +79,10 @@ fn main() -> Result<()> { } => { let scrubber = scrub::Scrubber::new(workdir, output_format, compression_level)?; - + + log::info!("============================================="); log::info!("Welcome to Scrubby! You name it, we clean it."); + log::info!("============================================="); let mut read_files = input; for (db_index, db_path) in kraken_db.into_iter().enumerate() { @@ -102,8 +103,9 @@ fn main() -> Result<()> { } } - + log::info!("=============================================================="); log::info!("Thank you for using Scrubby! Your sequence data, only cleaner."); + log::info!("=============================================================="); Ok(()) }
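Editor's note on the report-parsing guards above: comparisons such as `tax_level < TaxonomicLevel::Domain` and `tax_level <= extract_taxlevel` only work if `TaxonomicLevel` implements an ordering in which the levels reported above the domain rank (unclassified, root and similar) sort below `Domain`. A minimal sketch of that assumption follows - variant names other than `None` and `Domain` are illustrative and not taken from the patches:

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
enum TaxonomicLevel {
    None,         // sentinel used when no extraction level is set
    Unclassified, // illustrative: levels reported above the domain rank (U, R, R1, ...)
    Root,         // illustrative
    Domain,
    // ... lower ranks would follow here in the real enum
}

fn main() {
    // A derived Ord follows declaration order, so every variant declared before
    // Domain compares as smaller - which is what lets the parser skip those
    // records unless they are passed with --kraken-taxa-direct.
    assert!(TaxonomicLevel::Root < TaxonomicLevel::Domain);
    assert!(TaxonomicLevel::None < TaxonomicLevel::Domain);
}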