Skip to content

Commit

Permalink
🐛 Fixed incorrect segment download range.
Browse files Browse the repository at this point in the history
  • Loading branch information
langyo committed Aug 28, 2024
1 parent 051d6d2 commit 26edd1e
Showing 1 changed file with 39 additions and 16 deletions.
55 changes: 39 additions & 16 deletions packages/database_driver_native/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use std::{
collections::HashMap,
fs::File,
io::{Read, Seek, SeekFrom},
ops::RangeInclusive,
os::windows::fs::MetadataExt,
path::PathBuf,
Expand All @@ -16,7 +18,7 @@ use tairitsu_database_types::providers::bucket::*;
#[derive(Clone)]
pub struct ProxyBucket {
path: PathBuf,
cache: Cache<String, Bytes>,
cache: Cache<(String, (usize, usize)), Bytes>,
multipart_cache: Arc<Mutex<HashMap<String, Vec<Bytes>>>>,
}

Expand All @@ -34,7 +36,6 @@ impl BucketStore for ProxyBucket {
value.as_ref(),
)
.map_err(|err| anyhow!("Failed to write to file '{}': {}", key, err))?;
self.cache.insert(key, value).await;

Ok(())
}
Expand All @@ -46,26 +47,41 @@ impl BucketStore for ProxyBucket {
) -> Result<Option<Bytes>> {
check_key(&key)?;

let data = if let Some(data) = self.cache.get(&key).await {
data
if let Some(range) = range {
let (start, end) = (*range.start(), *range.end());

if let Some(data) = self.cache.get(&(key.clone(), (start, end))).await {
return Ok(Some(data));
} else {
let mut file = File::open({
let mut path = self.path.to_path_buf();
path.push(key.clone());
path
})
.map_err(|err| anyhow!("Failed to open file '{}': {}", key, err))?;
file.seek(SeekFrom::Start(start as u64))
.map_err(|err| anyhow!("Failed to seek in file '{}': {}", key, err))?;

let mut data = vec![0; (end - start) as usize];
file.read_exact(&mut data)
.map_err(|err| anyhow!("Failed to read from file '{}': {}", key, err))?;

let data = Bytes::from(data);
self.cache
.insert((key.clone(), (start, end)), data.clone())
.await;
return Ok(Some(data));
}
} else {
let data = std::fs::read({
let mut path = self.path.to_path_buf();
path.push(key.clone());
path
})
.map_err(|err| anyhow!("Failed to read from file '{}': {}", key, err))?;
let data = Bytes::from(data);
self.cache.insert(key.clone(), data.clone()).await;
.map_err(|err| anyhow!("Failed to read file '{}': {}", key, err))?;

data
};

Ok(Some(if let Some(range) = range {
data.slice(range.clone())
} else {
data.clone()
}))
Ok(Some(Bytes::from(data)))
}
}

async fn get_metadata(&self, key: String) -> Result<BucketItemMetadata> {
Expand Down Expand Up @@ -96,7 +112,14 @@ impl BucketStore for ProxyBucket {
async fn delete(&self, key: String) -> Result<()> {
check_key(&key)?;

self.cache.remove(&key).await;
for (item_key_raw, _) in self.cache.iter() {
let item_key_raw = (*item_key_raw).clone();
let item_key = item_key_raw.0.clone();

if item_key == key {
self.cache.remove(&item_key_raw).await;
}
}

if let Err(err) = std::fs::remove_file({
let mut path = self.path.to_path_buf();
Expand Down

0 comments on commit 26edd1e

Please sign in to comment.