db memory

dagou committed Jul 5, 2024
1 parent 5bf4174 commit c3d1d80
Showing 6 changed files with 209 additions and 80 deletions.
3 changes: 3 additions & 0 deletions kr2r/src/args.rs
@@ -59,6 +59,9 @@ pub struct ClassifyArgs {
#[clap(long)]
pub chunk_dir: PathBuf,

#[clap(long, value_parser = parse_size, default_value = "4G", help = "Specifies the memory size for the database at runtime.\nIt is best to use a multiple of 4 GB\n(e.g., '5.5G', '250M', '1024K').")]
pub db_memory: usize,

/// File path for outputting normal Kraken output.
#[clap(long = "output-dir", value_parser)]
pub kraken_output_dir: Option<PathBuf>,
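The new `--db-memory` flag is parsed with `parse_size` from `kr2r::args`. For orientation, here is a minimal sketch of how such a size string could be turned into a byte count; this is an illustrative stand-in, not the actual `parse_size` implementation (which is not part of this diff and may differ):

// Illustrative only: accepts strings like "4G", "250M", "1024K", or "5.5G".
// The real parser is kr2r::args::parse_size; its exact rules may differ.
fn parse_size_sketch(s: &str) -> Result<usize, String> {
    let s = s.trim();
    let (num, mult) = match s.chars().last() {
        Some('G') | Some('g') => (&s[..s.len() - 1], 1usize << 30),
        Some('M') | Some('m') => (&s[..s.len() - 1], 1usize << 20),
        Some('K') | Some('k') => (&s[..s.len() - 1], 1usize << 10),
        _ => (s, 1),
    };
    let value: f64 = num
        .parse()
        .map_err(|e| format!("invalid size '{}': {}", s, e))?;
    Ok((value * mult as f64) as usize)
}

Under this binary reading, the default "4G" would correspond to 4 × 2^30 bytes.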
57 changes: 27 additions & 30 deletions kr2r/src/bin/annotate.rs
@@ -1,13 +1,15 @@
use clap::Parser;
use kr2r::compact_hash::{CHTable, Compact, HashConfig, Row, Slot};
use kr2r::utils::{find_and_sort_files, open_file};
use kr2r::utils::{find_and_sort_files, find_and_sort_files_by_step, open_file, read_chunk_header};
use seqkmer::buffer_read_parallel;
use std::collections::HashMap;
use std::fs::remove_file;
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Result, Write};
use std::path::Path;
use std::path::PathBuf;
use std::time::Instant;

// Number of slots processed per batch
pub const BATCH_SIZE: usize = 8 * 1024 * 1024;

@@ -34,25 +36,6 @@ pub struct Args {
pub batch_size: usize,
}

fn read_chunk_header<R: Read>(reader: &mut R) -> io::Result<(usize, usize)> {
let mut buffer = [0u8; 16]; // u64 + u64 = 8 bytes + 8 bytes

reader.read_exact(&mut buffer)?;

let index = u64::from_le_bytes(
buffer[0..8]
.try_into()
.expect("Failed to convert bytes to u64 for index"),
);
let chunk_size = u64::from_le_bytes(
buffer[8..16]
.try_into()
.expect("Failed to convert bytes to u64 for chunk size"),
);

Ok((index as usize, chunk_size as usize))
}

fn write_to_file(
file_index: u64,
bytes: &[u8],
@@ -89,7 +72,8 @@ fn process_batch<R>(
chtm: &CHTable,
chunk_dir: PathBuf,
batch_size: usize,
page_index: usize,
chunk_index: usize,
chunk_size: usize,
) -> std::io::Result<()>
where
R: Read + Send,
@@ -100,8 +84,9 @@ where

let value_mask = hash_config.get_value_mask();
let value_bits = hash_config.get_value_bits();
let idx_mask = hash_config.get_idx_mask();
let idx_bits = hash_config.get_idx_bits();
let hash_capacity = hash_config.hash_capacity;
let idx_bits = ((chunk_size as f64).log2().ceil() as usize).max(1);
let idx_mask = (1 << idx_bits) - 1;

buffer_read_parallel(
reader,
@@ -110,7 +95,16 @@
|dataset: Vec<Slot<u64>>| {
let mut results: HashMap<u64, Vec<u8>> = HashMap::new();
for slot in dataset {
let indx = slot.idx & idx_mask;
let idx = slot.idx & idx_mask;
let (indx, page_index) = if chunk_size == hash_capacity {
(idx, chunk_index)
} else {
let index = chunk_index * chunk_size + idx;
let indx = index % hash_capacity;
let page_index = index / hash_capacity;
(indx, page_index)
};

let compacted = slot.value.left(value_bits) as u32;
let taxid = chtm.get_from_page(indx, compacted, page_index);
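
To make the index arithmetic above concrete, here is a self-contained sketch; the capacities and indices are made-up values chosen so that one chunk spans four hash pages:

// Illustration only: map a slot index recorded against a chunk back to a
// (page-local offset, page index) pair when a chunk covers several hash pages.
fn main() {
    let hash_capacity: usize = 1 << 20;        // slots per hash page (assumed)
    let chunk_size: usize = 4 * hash_capacity; // one chunk covers four pages here
    let chunk_index: usize = 2;                // third chunk file

    let idx_bits = ((chunk_size as f64).log2().ceil() as usize).max(1);
    let idx_mask = (1usize << idx_bits) - 1;

    let slot_idx: usize = 0b101 << 18;          // some encoded slot index
    let idx = slot_idx & idx_mask;              // index local to the chunk

    let index = chunk_index * chunk_size + idx; // global slot index
    let indx = index % hash_capacity;           // offset within its page
    let page_index = index / hash_capacity;     // which hash page holds the slot

    println!("idx={idx} -> page {page_index}, offset {indx}");
}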

@@ -168,13 +162,12 @@ fn process_chunk_file<P: AsRef<Path>>(
let file = open_file(chunk_file)?;
let mut reader = BufReader::new(file);

let (page_index, _) = read_chunk_header(&mut reader)?;

let (page_index_start, page_index_end, chunk_size) = read_chunk_header(&mut reader)?;
let start = Instant::now();

println!("start load table...");
let config = HashConfig::from_hash_header(&args.database.join("hash_config.k2d"))?;
let chtm = CHTable::from_range(config, hash_files, page_index, page_index + 1)?;
let chtm = CHTable::from_range(config, hash_files, page_index_start - 1, page_index_end - 1)?;

// Compute the elapsed time
let duration = start.elapsed();
@@ -186,27 +179,31 @@
&chtm,
args.chunk_dir.clone(),
args.batch_size,
page_index,
page_index_start / chunk_size,
chunk_size,
)?;

Ok(())
}

pub fn run(args: Args) -> Result<()> {
let chunk_files = find_and_sort_files(&args.chunk_dir, "sample", ".k2")?;
let chunk_files = find_and_sort_files_by_step(&args.chunk_dir, "sample", ".k2")?;
let hash_files = find_and_sort_files(&args.database, "hash", ".k2d")?;

// Start the timer
let start = Instant::now();
println!("annotate start...");
for chunk_file in chunk_files {
for chunk_file in &chunk_files {
process_chunk_file(&args, chunk_file, &hash_files)?;
}
// Compute the elapsed time
let duration = start.elapsed();
// Print the runtime
println!("annotate took: {:?}", duration);

for chunk_file in &chunk_files {
remove_file(chunk_file)?;
}
Ok(())
}

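The two-field read_chunk_header that used to live in this file is gone; annotate.rs now imports read_chunk_header from kr2r::utils, which yields a start page, an end page, and a chunk size. The on-disk layout of that header is not shown in this diff; a plausible sketch, assuming three consecutive little-endian u64s, would be:

use std::io::{self, Read};

// Assumed layout: three little-endian u64s -> (page_index_start, page_index_end, chunk_size).
// The real kr2r::utils::read_chunk_header may differ; this sketch is for orientation only.
fn read_chunk_header_sketch<R: Read>(reader: &mut R) -> io::Result<(usize, usize, usize)> {
    let mut buffer = [0u8; 24]; // 3 * 8 bytes
    reader.read_exact(&mut buffer)?;

    let start = u64::from_le_bytes(buffer[0..8].try_into().expect("bad start bytes"));
    let end = u64::from_le_bytes(buffer[8..16].try_into().expect("bad end bytes"));
    let chunk_size = u64::from_le_bytes(buffer[16..24].try_into().expect("bad size bytes"));

    Ok((start as usize, end as usize, chunk_size as usize))
}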
1 change: 1 addition & 0 deletions kr2r/src/bin/kun.rs
@@ -65,6 +65,7 @@ impl From<ClassifyArgs> for splitr::Args {
minimum_quality_score: item.minimum_quality_score,
num_threads: item.num_threads,
chunk_dir: item.chunk_dir,
db_memory: item.db_memory,
input_files: item.input_files,
}
}
7 changes: 7 additions & 0 deletions kr2r/src/bin/resolve.rs
@@ -295,6 +295,13 @@ pub fn run(args: Args) -> Result<()> {
// Print the runtime
println!("resolve took: {:?}", duration);

for sample_file in &sample_files {
std::fs::remove_file(sample_file)?;
}
for sample_id_file in &sample_id_files {
std::fs::remove_file(sample_id_file)?;
}

Ok(())
}

83 changes: 34 additions & 49 deletions kr2r/src/bin/splitr.rs
@@ -1,8 +1,9 @@
use clap::Parser;
use kr2r::args::parse_size;
use kr2r::compact_hash::{HashConfig, Slot};
use kr2r::utils::{
create_partition_files, create_partition_writers, create_sample_file, get_file_limit,
get_lastest_file_index, set_fd_limit,
create_partition_writers_by_step, create_sample_file, get_file_limit, get_lastest_file_index,
set_fd_limit,
};
use kr2r::IndexOptions;
use seqkmer::{read_parallel, FastxReader, Meros, MinimizerIterator, OptionPair, Reader};
@@ -27,9 +28,13 @@ pub struct Args {
#[arg(long = "db", required = true)]
pub database: PathBuf,

// /// The file path for the Kraken 2 options.
// #[clap(short = 'o', long = "options-filename", value_parser, required = true)]
// options_filename: String,
/// chunk directory
#[clap(long)]
pub chunk_dir: PathBuf,

#[clap(long, value_parser = parse_size, default_value = "4G", help = "Specifies the memory size for the database at runtime.\nIt is best to use a multiple of 4 GB\n(e.g., '5.5G', '250M', '1024K').")]
pub db_memory: usize,

/// Enable paired-end processing.
#[clap(short = 'P', long = "paired-end-processing", action)]
pub paired_end_processing: bool,
@@ -51,50 +56,12 @@ pub struct Args {
#[clap(short = 'p', long = "num-threads", value_parser, default_value_t = num_cpus::get())]
pub num_threads: usize,

/// chunk directory
#[clap(long)]
pub chunk_dir: PathBuf,

/// A list of input file paths (FASTA/FASTQ) to be processed by the classify program.
/// Supports fasta or fastq format files (e.g., .fasta, .fastq) and gzip compressed files (e.g., .fasta.gz, .fastq.gz).
// #[clap(short = 'F', long = "files")]
pub input_files: Vec<String>,
}

fn init_chunk_writers(
args: &Args,
partition: usize,
chunk_size: usize,
) -> Vec<BufWriter<fs::File>> {
let chunk_files = create_partition_files(partition, &args.chunk_dir, "sample");

let mut writers = create_partition_writers(&chunk_files);

writers.iter_mut().enumerate().for_each(|(index, writer)| {
// Get the size of the corresponding file
let file_size = writer
.get_ref()
.metadata()
.expect("Failed to get file metadata")
.len();

if file_size == 0 {
writer
.write_all(&index.to_le_bytes())
.expect("Failed to write partition");

let chunk_size_bytes = chunk_size.to_le_bytes();
writer
.write_all(&chunk_size_bytes)
.expect("Failed to write chunk size");

writer.flush().expect("Failed to flush writer");
}
});

writers
}

/// Process a record
fn process_record(
k2_slot_list: &mut Vec<(usize, Slot<u64>)>,
Expand All @@ -120,10 +87,11 @@ fn write_data_to_file(
k2_slot_list: Vec<(usize, Slot<u64>)>,
writers: &mut Vec<BufWriter<fs::File>>,
slot_size: usize,
set_size: usize,
sample_writer: &mut BufWriter<fs::File>,
) {
for slot in k2_slot_list {
let partition_index = slot.0;
let partition_index = slot.0 / set_size;
if let Some(writer) = writers.get_mut(partition_index) {
writer.write_all(slot.1.as_slice(slot_size)).unwrap();
}
@@ -137,14 +105,15 @@ fn process_fastx_file<R>(
meros: Meros,
hash_config: HashConfig,
file_index: usize,
set_size: usize,
chunk_size: usize,
reader: &mut R,
writers: &mut Vec<BufWriter<fs::File>>,
sample_writer: &mut BufWriter<fs::File>,
) -> Result<()>
where
R: Reader,
{
let chunk_size = hash_config.hash_capacity;
let idx_bits = ((chunk_size as f64).log2().ceil() as usize).max(1);
let slot_size = std::mem::size_of::<Slot<u64>>();

@@ -184,7 +153,14 @@ where
},
|dataset| {
while let Some(Some((buffer, k2_slot_list))) = dataset.next() {
write_data_to_file(buffer, k2_slot_list, writers, slot_size, sample_writer);
write_data_to_file(
buffer,
k2_slot_list,
writers,
slot_size,
set_size,
sample_writer,
);
}
},
)
@@ -259,15 +235,22 @@ pub fn run(args: Args) -> Result<()> {
if hash_config.partition >= file_num_limit {
set_fd_limit(hash_config.partition as u64 + 1)
.expect("Failed to set file descriptor limit");
// panic!("Exceeds File Number Limit");
}

let meros = idx_opts.as_meros();
let start = Instant::now();
let partition = hash_config.partition;
let mut writers: Vec<BufWriter<fs::File>> =
init_chunk_writers(&args, partition, hash_config.hash_capacity);
let set_size = args.db_memory / 4 / hash_config.hash_capacity;

let chunk_size = hash_config.hash_capacity * set_size;

let mut writers = create_partition_writers_by_step(
partition,
set_size,
&args.chunk_dir,
"sample",
chunk_size,
);
process_files(&args, hash_config, |file_index, path_pair| {
let mut sample_writer =
create_sample_file(args.chunk_dir.join(format!("sample_id_{}.map", file_index)));
Expand All @@ -279,6 +262,8 @@ pub fn run(args: Args) -> Result<()> {
meros,
hash_config,
file_index,
set_size,
chunk_size,
&mut reader,
&mut writers,
&mut sample_writer,
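To make the memory-driven chunking in run() concrete, here is a small worked example. Every figure in it is an assumption picked for illustration, and reading the division by 4 as "about 4 bytes of runtime memory per hash cell" is an inference, not something stated in the diff:

fn main() {
    // Assumed figures, for illustration only.
    let db_memory: usize = 4 * 1024 * 1024 * 1024; // --db-memory 4G
    let hash_capacity: usize = 1 << 28;            // slots per hash page (assumed)
    let partition: usize = 16;                     // total hash pages (assumed)

    // Mirrors run(): db_memory / 4 slots fit in memory at once (inference: ~4 bytes per cell),
    // so set_size consecutive hash pages are grouped into one chunk file.
    let set_size = db_memory / 4 / hash_capacity;
    let chunk_size = hash_capacity * set_size;     // slots covered by one chunk file

    // write_data_to_file sends a slot for page p to writer p / set_size,
    // so the partition pages collapse into ceil(partition / set_size) chunk files.
    let num_chunks = (partition + set_size - 1) / set_size;
    let page = 9;
    println!(
        "set_size={set_size}, chunk_size={chunk_size}, chunk files={num_chunks}, page {page} -> chunk {}",
        page / set_size
    );
}

With these numbers: set_size = 4, chunk_size = 2^30 slots, and page 9 lands in chunk file 2.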