Skip to content

Commit

Permalink
Issue #201: Improved chunk reader with hash for possible risks
Browse files Browse the repository at this point in the history
  • Loading branch information
lurenpluto committed Apr 12, 2023
1 parent a9f17d6 commit 717eabd
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 34 deletions.
60 changes: 46 additions & 14 deletions src/component/cyfs-bdt-ext/src/cache/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ impl ChunkStoreReader {
BuckyError::new(BuckyErrorCode::IoError, msg)
})?;

let actual_offset = file.seek(SeekFrom::Start(offset)).await.map_err(|e| {
// First verify the length
let chunk_len = chunk.len() as u64;

let file_meta = file.metadata().await.map_err(|e| {
let msg = format!(
"seek file to offset failed! chunk={}, offset={}, path={}, {}",
"get file metadata but failed! chunk={}, path={}, {}",
chunk,
offset,
path.display(),
e
);
Expand All @@ -76,17 +78,47 @@ impl ChunkStoreReader {
BuckyError::new(BuckyErrorCode::IoError, msg)
})?;

if actual_offset != offset {
if file_meta.len() < offset + chunk_len {
let msg = format!(
"seek file to offset but unmatch! chunk={}, path={}, except offset={}, got={}",
"read chunk from file with offset but len unmatch! chunk={}, path={}, offset={}, chunk_len={}, file_len={}",
chunk,
path.display(),
offset,
actual_offset
chunk_len,
file_meta.len(),
);
error!("{}", msg);

return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
}

// Seek to the chunk's position in the file (skipped when offset is 0)
if offset > 0 {
let actual_offset = file.seek(SeekFrom::Start(offset)).await.map_err(|e| {
let msg = format!(
"seek file to offset failed! chunk={}, offset={}, path={}, {}",
chunk,
offset,
path.display(),
e
);
error!("{}", msg);

BuckyError::new(BuckyErrorCode::IoError, msg)
})?;

if actual_offset != offset {
let msg = format!(
"seek file to offset but unmatch! chunk={}, path={}, except offset={}, got={}",
chunk,
path.display(),
offset,
actual_offset
);
error!("{}", msg);

return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
}
}

// async_std::io::Take does not support seek, so use ReaderWithLimit instead
Expand Down Expand Up @@ -136,7 +168,8 @@ impl ChunkStoreReader {
chunk, fr.path, fr.range_begin, fr.range_end
);
let fixer = ChunkTrackerPosFixer::new(self.tracker.clone(), c.pos.clone());
Self::read_chunk(chunk, Path::new(fr.path.as_str()), fr.range_begin, fixer).await
Self::read_chunk(chunk, Path::new(fr.path.as_str()), fr.range_begin, fixer)
.await
}
TrackerPostion::ChunkManager => {
info!("will read chunk from chunk manager: chunk={}", chunk);
Expand Down Expand Up @@ -319,30 +352,30 @@ impl ChunkHashErrorHandler for ChunkTrackerPosFixer {
}
}


#[cfg(test)]
mod tests {
use super::*;
use async_std::io::prelude::*;
use cyfs_base::*;
use std::io::SeekFrom;
use std::str::FromStr;
use std::path::PathBuf;
use std::str::FromStr;

async fn test_file() {
// let file = "C:\\cyfs\\data\\app\\cyfs-stack-test\\root\\test-chunk-in-bundle";
// let chunk_id = ChunkId::from_str("7C8WUcPdJGHvGxWou3HoABNe41Xhm9m3aEsSHfj1zeWG").unwrap();

let file = PathBuf::from("C:\\cyfs\\data\\test\\2KGw87zzn4.txt");
let chunk_id = ChunkId::from_str("7C8WW21osqTTTMyRLhUN8jDbYiRdBDNEMHMiHPdDEdBB").unwrap();

let _reader = ChunkStoreReader::read_chunk(&chunk_id, &file, 8388608, None).await;
//let buf = std::fs::read(file).unwrap();
//let real_id = ChunkId::calculate_sync(&buf).unwrap();
//assert_eq!(real_id, chunk_id);

let reader = async_std::fs::File::open(file).await.unwrap();
let mut reader = ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None);
let mut reader =
ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None);

let mut buf2 = vec![];
reader.read_to_end(&mut buf2).await.unwrap_err();
Expand All @@ -351,8 +384,7 @@ mod tests {
#[test]
fn test() {
async_std::task::block_on(async move {
test1().await;
// test_file().await;
test_file().await;
});
}
}
60 changes: 40 additions & 20 deletions src/component/cyfs-util/src/util/read_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub struct ChunkReaderWithHash {
reader: Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>,
hash: sha2::Sha256,
error_handler: Option<Box<dyn ChunkHashErrorHandler>>,
seeked: bool,
hashed_len: usize,
}

impl ChunkReaderWithHash {
Expand All @@ -118,6 +120,8 @@ impl ChunkReaderWithHash {
reader,
hash: sha2::Sha256::new(),
error_handler,
seeked: false,
hashed_len: 0,
}
}
}
Expand All @@ -133,31 +137,45 @@ impl async_std::io::Read for ChunkReaderWithHash {
Poll::Ready(ret) => match ret {
Ok(size) => {
if size > 0 {
self.hashed_len += size;
self.hash.input(&buf[0..size]);
Poll::Ready(Ok(size))
} else {
let hash_value = self.hash.clone().result().into();
let actual_id = ChunkId::new(&hash_value, self.chunk_id.len() as u32);

if actual_id.eq(&self.chunk_id) {
debug!(
"read chunk from file complete! chunk={}, file={}",
self.chunk_id, self.path
if self.seeked {
warn!(
"read chunk with hash but seeked already! chunk={}",
self.chunk_id
);
Poll::Ready(Ok(0))
} else if self.hashed_len != self.chunk_id.len() {
error!("read chunk with hash but ended with unmatched length! chunk={}, len={}, read len={}",
self.chunk_id, self.chunk_id.len(), self.hashed_len,);
// FIXME what should we do?
Poll::Ready(Ok(0))
} else {
let msg = format!(
"content in file not match chunk id: chunk={}, file={}, expect hash={}, got={}",
self.chunk_id, self.path, self.chunk_id, actual_id
);
error!("{}", msg);

if let Some(error_handler) = self.error_handler.take() {
error_handler.on_hash_error(&self.chunk_id, &self.path);
let hash_value = self.hash.clone().result().into();
let actual_id = ChunkId::new(&hash_value, self.chunk_id.len() as u32);

if actual_id.eq(&self.chunk_id) {
debug!(
"read chunk from file complete! chunk={}, file={}",
self.chunk_id, self.path
);
Poll::Ready(Ok(0))
} else {
let msg = format!(
"content in file not match chunk id: chunk={}, file={}, expect hash={}, got={}",
self.chunk_id, self.path, self.chunk_id, actual_id
);
error!("{}", msg);

if let Some(error_handler) = self.error_handler.take() {
error_handler.on_hash_error(&self.chunk_id, &self.path);
}

let err = BuckyError::new(BuckyErrorCode::InvalidData, msg);
Poll::Ready(Err(err.into()))
}

let err = BuckyError::new(BuckyErrorCode::InvalidData, msg);
Poll::Ready(Err(err.into()))
}
}
}
Expand All @@ -174,6 +192,7 @@ impl async_std::io::Seek for ChunkReaderWithHash {
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
self.seeked = true;
Pin::new(self.reader.as_mut()).poll_seek(cx, pos)
}
}
Expand All @@ -194,13 +213,14 @@ mod tests {

let file = "C:\\cyfs\\data\\test\\2JtHrtiW4J";
let chunk_id = ChunkId::from_str("7C8WXUGiYVyag6WXdsFz6B8JgpedMMgkng3MRM4XoPrX").unwrap();

//let buf = std::fs::read(file).unwrap();
//let real_id = ChunkId::calculate_sync(&buf).unwrap();
//assert_eq!(real_id, chunk_id);

let reader = async_std::fs::File::open(file).await.unwrap();
let mut reader = ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None);
let mut reader =
ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None);

let mut buf2 = vec![];
reader.read_to_end(&mut buf2).await.unwrap_err();
Expand Down

0 comments on commit 717eabd

Please sign in to comment.