Skip to content

Commit

Permalink
bug fix
Browse files — browse the repository at this point in the history
  • Loading branch information
dagou committed Jul 4, 2024
1 parent c181485 commit cca04e7
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:
env:
CARGO_TERM_COLOR: always
BINARIES_LIST: 'ncbi kun_peng'
PROJECT_PREFIX: 'kraken2-rust-'
PROJECT_PREFIX: 'Kun-peng-'

jobs:
build-and-release:
Expand Down
4 changes: 2 additions & 2 deletions kr2r/src/bin/annotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where
reader,
num_cpus::get(),
batch_size,
|dataset: &[Slot<u64>]| {
|dataset: Vec<Slot<u64>>| {
let mut results: HashMap<u64, Vec<u8>> = HashMap::new();
for slot in dataset {
let indx = slot.idx & idx_mask;
Expand Down Expand Up @@ -176,6 +176,7 @@ fn process_chunk_file<P: AsRef<Path>>(

let start = Instant::now();

println!("start load table...");
let config = HashConfig::from_hash_header(&args.database.join("hash_config.k2d"))?;
let chtm = CHTable::from_range(
config,
Expand Down Expand Up @@ -209,7 +210,6 @@ pub fn run(args: Args) -> Result<()> {
let start = Instant::now();
println!("annotate start...");
for chunk_file in chunk_files {
println!("chunk_file {:?}", chunk_file);
process_chunk_file(&args, chunk_file, &hash_files)?;
}
    // Calculate the elapsed time
Expand Down
20 changes: 8 additions & 12 deletions kr2r/src/compact_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ fn read_page_from_file<P: AsRef<Path>>(filename: P) -> Result<Page> {
let capacity = LittleEndian::read_u64(&buffer[8..16]) as usize;

    // Read the data section
let mut data = vec![0u32; capacity];
let mut data = vec![0u32; capacity + 1024 * 1024];
let data_bytes = unsafe {
std::slice::from_raw_parts_mut(
data.as_mut_ptr() as *mut u8,
Expand All @@ -299,7 +299,7 @@ fn read_first_block_from_file<P: AsRef<Path>>(filename: P) -> Result<Page> {
let capacity = LittleEndian::read_u64(&buffer[8..16]) as usize;

let mut first_zero_end = capacity;
let chunk_size = 1024; // Define the chunk size for reading
let chunk_size = 1024 * 4;
let mut found_zero = false;
let mut data = vec![0u32; capacity];
let mut read_pos = 0;
Expand Down Expand Up @@ -373,17 +373,14 @@ impl Page {
value_bits: usize,
value_mask: usize,
) -> u32 {
// let compacted_key = value.left(value_bits) as u32;
let mut idx = index;
if idx > self.size {
return u32::default();
if idx >= self.size {
return 0;
}

loop {
if let Some(cell) = self.data.get(idx) {
if cell.right(value_mask) == u32::default()
|| cell.left(value_bits) == compacted_key
{
if cell.right(value_mask) == 0 || cell.left(value_bits) == compacted_key {
return cell.right(value_mask);
}

Expand All @@ -392,11 +389,10 @@ impl Page {
break;
}
} else {
                // If get(idx) fails, return the default value
return u32::default();
return 0;
}
}
u32::default()
0
}
}

Expand Down Expand Up @@ -428,7 +424,7 @@ impl CHTable {
for i in start..end {
let mut hash_file = &hash_sorted_files[i];
let mut page = read_page_from_file(&hash_file)?;
let next_page = if page.data.last().map_or(false, |&x| x == 0) {
let next_page = if page.data.last().map_or(false, |&x| x != 0) {
if kd_type {
hash_file = &hash_sorted_files[(i + 1) % parition]
}
Expand Down
13 changes: 7 additions & 6 deletions seqkmer/src/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use scoped_threadpool::Pool;
use std::collections::HashMap;
use std::io::Result;
use std::sync::Arc;

pub struct ParallelResult<P>
where
P: Send,
Expand Down Expand Up @@ -108,16 +107,16 @@ pub fn buffer_read_parallel<R, D, W, O, F, Out>(
func: F,
) -> Result<()>
where
D: Send + Sized + Sync,
D: Send + Sized + Sync + Clone,
R: std::io::Read + Send,
O: Send,
Out: Send + Default,
W: Send + Sync + Fn(&[D]) -> Option<O>,
W: Send + Sync + Fn(Vec<D>) -> Option<O>,
F: FnOnce(&mut ParallelResult<Option<O>>) -> Out + Send,
{
assert!(n_threads > 2);
let buffer_len = n_threads + 2;
let (sender, receiver) = bounded::<&[D]>(buffer_len);
let (sender, receiver) = bounded::<Vec<D>>(buffer_len);
let (done_send, done_recv) = bounded::<Option<O>>(buffer_len);
    let receiver = Arc::new(receiver); // use Arc to share the receiver
let done_send = Arc::new(done_send);
Expand All @@ -140,7 +139,9 @@ where
let slots = unsafe {
std::slice::from_raw_parts(batch_buffer.as_ptr() as *const D, slots_in_batch)
};
sender.send(slots).expect("Failed to send sequences");
sender
.send(slots.to_vec())
.expect("Failed to send sequences");
}
});

Expand All @@ -163,7 +164,7 @@ where
let _ = func(&mut parallel_result);
});

pool_scope.join_all();
// pool_scope.join_all();
});

Ok(())
Expand Down

0 comments on commit cca04e7

Please sign in to comment.