Commit c181485: bug fix

dagou committed Jul 3, 2024
1 parent 9d11486 commit c181485

Showing 9 changed files with 284 additions and 339 deletions.
2 changes: 1 addition & 1 deletion kr2r/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "kr2r"
version = "0.6.0"
version = "0.6.1"
edition = "2021"
authors = ["eric9n@gmail.com"]

109 changes: 41 additions & 68 deletions kr2r/src/bin/annotate.rs
@@ -1,15 +1,13 @@
use clap::Parser;
use kr2r::compact_hash::{CHTable, Compact, HashConfig, Row, Slot};
use kr2r::utils::{find_and_sort_files, open_file};
// use std::collections::HashMap;
use rayon::prelude::*;
use seqkmer::buffer_read_parallel;
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Result, Write};
use std::path::Path;
use std::path::PathBuf;
use std::time::Instant;

// Number of Slots processed per batch
pub const BATCH_SIZE: usize = 8 * 1024 * 1024;

@@ -100,9 +98,7 @@ fn process_batch<R>(
where
R: Read + Send,
{
let slot_size = std::mem::size_of::<Slot<u64>>();
let row_size = std::mem::size_of::<Row>();
let mut batch_buffer = vec![0u8; slot_size * batch_size];
let mut last_file_index: Option<u64> = None;
let mut writer: Option<BufWriter<File>> = None;

@@ -111,21 +107,13 @@ where
let idx_mask = hash_config.get_idx_mask();
let idx_bits = hash_config.get_idx_bits();

while let Ok(bytes_read) = reader.read(&mut batch_buffer) {
if bytes_read == 0 {
break;
} // end of file

// process the batch of data just read
let slots_in_batch = bytes_read / slot_size;

let slots = unsafe {
std::slice::from_raw_parts(batch_buffer.as_ptr() as *const Slot<u64>, slots_in_batch)
};

let result: HashMap<u64, Vec<u8>> = slots
.par_iter()
.filter_map(|slot| {
buffer_read_parallel(
reader,
num_cpus::get(),
batch_size,
|dataset: &[Slot<u64>]| {
let mut results: HashMap<u64, Vec<u8>> = HashMap::new();
for slot in dataset {
let indx = slot.idx & idx_mask;
let compacted = slot.value.left(value_bits) as u32;
let taxid = chtm.get_from_page(indx, compacted, page_index);
@@ -137,48 +125,37 @@ where
let left = slot.value.left(value_bits) as u32;
let high = u32::combined(left, taxid, value_bits);
let row = Row::new(high, seq_id, kmer_id as u32);
// let value = slot.to_b(high);
// let value_bytes = value.to_le_bytes(); // convert the u64 to [u8; 8]
let value_bytes = row.as_slice(row_size);
Some((file_index, value_bytes.to_vec()))
} else {
None
}
})
.fold(
|| HashMap::new(),
|mut acc: HashMap<u64, Vec<u8>>, (file_index, value_bytes)| {
acc.entry(file_index)

results
.entry(file_index)
.or_insert_with(Vec::new)
.extend(value_bytes);
acc
},
)
.reduce(
|| HashMap::new(),
|mut acc, h| {
for (k, mut v) in h {
acc.entry(k).or_insert_with(Vec::new).append(&mut v);
}
}
Some(results)
},
|result| {
while let Some(Some(res)) = result.next() {
let mut file_indices: Vec<_> = res.keys().cloned().collect();
file_indices.sort_unstable(); // sort by file_index

for file_index in file_indices {
if let Some(bytes) = res.get(&file_index) {
write_to_file(
file_index,
bytes,
&mut last_file_index,
&mut writer,
&chunk_dir,
)
.expect("write to file error");
}
acc
},
);

let mut file_indices: Vec<_> = result.keys().cloned().collect();
file_indices.sort_unstable(); // sort by file_index

for file_index in file_indices {
if let Some(bytes) = result.get(&file_index) {
write_to_file(
file_index,
bytes,
&mut last_file_index,
&mut writer,
&chunk_dir,
)?;
}
}
}
}
},
)
.expect("failed");

if let Some(w) = writer.as_mut() {
w.flush()?;
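The rewrite above drops the manual read loop, the unsafe `from_raw_parts` cast, and rayon's fold/reduce in favor of seqkmer's `buffer_read_parallel`, which (as called in this diff) takes a reader, a worker count, a batch size, a per-batch closure, and a consumer closure. As a rough illustration of that producer/consumer shape — not seqkmer's actual internals — here is a minimal self-contained sketch using std threads and a channel; all names and the `value % 2` decoding are stand-ins:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

fn main() {
    // Two stand-in "batches" of decoded slot values.
    let batches: Vec<Vec<u64>> = vec![(0..4).collect(), (4..8).collect()];
    let (tx, rx) = mpsc::channel::<HashMap<u64, Vec<u8>>>();

    // Worker side: each batch builds its own per-batch HashMap,
    // like the |dataset: &[Slot<u64>]| closure above.
    let handles: Vec<_> = batches
        .into_iter()
        .map(|batch| {
            let tx = tx.clone();
            thread::spawn(move || {
                let mut results: HashMap<u64, Vec<u8>> = HashMap::new();
                for value in batch {
                    let file_index = value % 2; // stand-in for slot.idx decoding
                    results.entry(file_index).or_insert_with(Vec::new).push(value as u8);
                }
                tx.send(results).unwrap();
            })
        })
        .collect();
    drop(tx);

    // Consumer side: a single thread drains the per-batch maps, sorts the
    // file indices, and writes each group in order (printing here).
    for results in rx {
        let mut file_indices: Vec<_> = results.keys().cloned().collect();
        file_indices.sort_unstable();
        for file_index in file_indices {
            println!("file {file_index}: {} bytes", results[&file_index].len());
        }
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
```

This keeps the per-batch map building parallel while funneling all file writes through one consumer, which is what lets the rewritten `process_batch` drop the fold/reduce merging step.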
@@ -200,16 +177,13 @@ fn process_chunk_file<P: AsRef<Path>>(
let start = Instant::now();

let config = HashConfig::from_hash_header(&args.database.join("hash_config.k2d"))?;
let parition = hash_files.len();
let chtm = if args.kraken_db_type {
CHTable::from_pair(
config,
&hash_files[page_index],
&hash_files[(page_index + 1) % parition],
)?
} else {
CHTable::from(config, &hash_files[page_index])?
};
let chtm = CHTable::from_range(
config,
hash_files,
page_index,
page_index + 1,
args.kraken_db_type,
)?;

// compute the elapsed time
let duration = start.elapsed();
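`CHTable::from_range` itself is not shown in this diff, so its exact semantics are an assumption; as a sketch of the consolidation, the per-call-site if/else over `from_pair` and `from` moves behind a single range-based constructor. The loaders below are hypothetical stand-ins, not CHTable internals:

```rust
use std::io::Result;
use std::path::PathBuf;

// Hypothetical stand-ins for the real page loaders inside CHTable.
fn load_single(_file: &PathBuf) -> Result<Vec<u32>> {
    Ok(Vec::new())
}
fn load_pair(_a: &PathBuf, _b: &PathBuf) -> Result<Vec<u32>> {
    Ok(Vec::new())
}

// Sketch of a range-based constructor: the branch on `kraken_db_type`
// moves inside, so callers no longer repeat the if/else.
fn from_range(
    files: &[PathBuf],
    start: usize,
    end: usize,
    kraken_db_type: bool,
) -> Result<Vec<u32>> {
    let partition = files.len();
    let mut pages = Vec::new();
    for page in start..end {
        let data = if kraken_db_type {
            // Kraken 2 compatible layout: each page pairs with its
            // successor, wrapping around at the last partition.
            load_pair(&files[page], &files[(page + 1) % partition])?
        } else {
            load_single(&files[page])?
        };
        pages.extend(data);
    }
    Ok(pages)
}

fn main() -> Result<()> {
    let files: Vec<PathBuf> = (0..3).map(|i| PathBuf::from(format!("hash_{i}.k2d"))).collect();
    let page_index = 1;
    // Mirrors the call in process_chunk_file: one page, [page_index, page_index + 1).
    let _pages = from_range(&files, page_index, page_index + 1, false)?;
    Ok(())
}
```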
@@ -229,7 +203,6 @@

pub fn run(args: Args) -> Result<()> {
let chunk_files = find_and_sort_files(&args.chunk_dir, "sample", ".k2")?;

let hash_files = find_and_sort_files(&args.database, "hash", ".k2d")?;

// start timing
14 changes: 9 additions & 5 deletions kr2r/src/bin/direct.rs
@@ -26,6 +26,10 @@ pub struct Args {
#[arg(long = "db", required = true)]
pub database: PathBuf,

/// File path for outputting normal Kraken output.
#[clap(long = "output-dir", value_parser)]
pub kraken_output_dir: Option<PathBuf>,

/// Enable paired-end processing.
#[clap(short = 'P', long = "paired-end-processing", action)]
pub paired_end_processing: bool,
@@ -73,9 +77,9 @@ pub struct Args {
#[clap(short = 'p', long = "num-threads", value_parser, default_value_t = num_cpus::get())]
pub num_threads: usize,

/// File path for outputting normal Kraken output.
#[clap(long = "output-dir", value_parser)]
pub kraken_output_dir: Option<PathBuf>,
/// Enables use of a Kraken 2 compatible shared database. Default is false.
#[clap(long, default_value_t = false)]
pub kraken_db_type: bool,

/// A list of input file paths (FASTA/FASTQ) to be processed by the classify program.
/// Supports fasta or fastq format files (e.g., .fasta, .fastq) and gzip compressed files (e.g., .fasta.gz, .fastq.gz).
@@ -98,7 +102,7 @@ fn process_seq(
let partition_index = idx / chunk_size;
let index = idx % chunk_size;

let taxid = chtable.get_from_page(index, compacted, partition_index + 1);
let taxid = chtable.get_from_page(index, compacted, partition_index);
if taxid > 0 {
let high = u32::combined(compacted, taxid, value_bits);
let row = Row::new(high, 0, sort as u32 + 1 + offset as u32);
Expand Down Expand Up @@ -348,7 +352,7 @@ pub fn run(args: Args) -> Result<()> {
let start = Instant::now();
let meros = idx_opts.as_meros();
let hash_files = find_and_sort_files(&args.database, "hash", ".k2d")?;
let chtable = CHTable::from_hash_files(hash_config, hash_files)?;
let chtable = CHTable::from_hash_files(hash_config, &hash_files, args.kraken_db_type)?;

process_files(args, meros, hash_config, &chtable, &taxo)?;
let duration = start.elapsed();
8 changes: 4 additions & 4 deletions kr2r/src/bin/kun.rs
@@ -11,7 +11,7 @@ mod splitr;

use kr2r::args::ClassifyArgs;
use kr2r::args::{parse_size, Build};
use kr2r::utils::find_and_sort_files;
use kr2r::utils::find_files;
// use std::io::Result;
use std::path::PathBuf;
use std::time::Instant;
@@ -180,9 +180,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let start = Instant::now();

let splitr_args = splitr::Args::from(cmd_args.clone());
let chunk_files = find_and_sort_files(&splitr_args.chunk_dir, "sample", ".k2")?;
let sample_files = find_and_sort_files(&splitr_args.chunk_dir, "sample", ".map")?;
let bin_files = find_and_sort_files(&splitr_args.chunk_dir, "sample", ".bin")?;
let chunk_files = find_files(&splitr_args.chunk_dir, "sample", ".k2");
let sample_files = find_files(&splitr_args.chunk_dir, "sample", ".map");
let bin_files = find_files(&splitr_args.chunk_dir, "sample", ".bin");
if !chunk_files.is_empty() || !sample_files.is_empty() || !bin_files.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
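`find_files` replaces `find_and_sort_files` in this pre-flight check; judging by the call sites (no `?` and a direct `.is_empty()` check), it plausibly returns a plain `Vec<PathBuf>` instead of a `Result`. The sketch below is an illustrative stand-in under that assumption, not the crate's actual implementation:

```rust
use std::fs;
use std::path::{Path, PathBuf};

// Illustrative prefix/suffix file search; the real kr2r::utils::find_files
// may differ in signature and ordering guarantees.
fn find_files(dir: &Path, prefix: &str, suffix: &str) -> Vec<PathBuf> {
    let mut files: Vec<PathBuf> = fs::read_dir(dir)
        .into_iter() // an unreadable dir simply yields no entries
        .flatten()
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| {
            path.file_name()
                .and_then(|name| name.to_str())
                .map(|name| name.starts_with(prefix) && name.ends_with(suffix))
                .unwrap_or(false)
        })
        .collect();
    files.sort();
    files
}

fn main() {
    // Leftover sample.* files from an interrupted run would show up here,
    // and the caller aborts rather than mixing stale chunks into a new run.
    let leftovers = find_files(Path::new("."), "sample", ".k2");
    if !leftovers.is_empty() {
        eprintln!("chunk dir not empty: {} stale file(s)", leftovers.len());
    }
}
```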