Skip to content

Commit

Permalink
perf: more optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
appaquet committed Jul 10, 2024
1 parent 2b543ab commit 06619be
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
1 change: 1 addition & 0 deletions benches/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn bench_random_access(c: &mut Criterion) {

let builder = Builder::new(index_file).with_extsort_segment_size(200_000);
builder
.with_log_base(5)
.build(create_unknown_size_entries(1_000_000))
.unwrap();

Expand Down
20 changes: 10 additions & 10 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ where
let checkpoint_interval = levels.last().unwrap().expected_items;

let mut entries_since_last_checkpoint = 0;
let mut last_entry_position: u64 = 0;
let mut last_entry_position: usize = 0;
for entry in iter {
let entry = entry?;

last_entry_position = counted_output.written_count();
last_entry_position = counted_output.written_count() as usize;
self.write_entry(&mut counted_output, &entry)?;
entries_since_last_checkpoint += 1;

Expand All @@ -170,7 +170,7 @@ where
level.current_items += entries_since_last_checkpoint;
}

let current_position = counted_output.written_count();
let current_position = counted_output.written_count() as usize;
self.write_checkpoint(
&mut counted_output,
current_position,
Expand All @@ -184,7 +184,7 @@ where

if entries_since_last_checkpoint > 0 {
// write one last checkpoint
let current_position = counted_output.written_count();
let current_position = counted_output.written_count() as usize;
self.write_checkpoint(
&mut counted_output,
current_position,
Expand Down Expand Up @@ -220,13 +220,13 @@ where
fn write_checkpoint<W: Write>(
&self,
output: &mut W,
current_position: u64,
entry_position: u64,
current_position: usize,
entry_position: usize,
levels: &mut [Level],
force_all_levels: bool,
) -> Result<(), BuilderError> {
let seri_levels = levels.iter().map(|level| data::CheckpointLevel {
next_position: level.last_item.unwrap_or(0),
next_position: level.last_item_position.unwrap_or(0),
});
data::Checkpoint::write(output, entry_position, seri_levels)?;

Expand All @@ -236,7 +236,7 @@ where
|| (level.expected_items - level.current_items)
<= CHECKPOINT_WRITE_UPCOMING_WITHIN_DISTANCE
{
level.last_item = Some(current_position);
level.last_item_position = Some(current_position);
level.current_items = 0;
}
}
Expand Down Expand Up @@ -272,7 +272,7 @@ fn levels_for_items_count(nb_items: u64, log_base: f64) -> Vec<Level> {
levels.push(Level {
expected_items: max_items,
current_items: 0,
last_item: None,
last_item_position: None,
});
max_items /= log_base_u64;
}
Expand All @@ -285,7 +285,7 @@ fn levels_for_items_count(nb_items: u64, log_base: f64) -> Vec<Level> {
struct Level {
expected_items: u64,
current_items: u64,
last_item: Option<u64>,
last_item_position: Option<usize>,
}

/// Index building related errors.
Expand Down
22 changes: 9 additions & 13 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,30 +129,30 @@ where
/// There is always at least 1 level, and the last one is the most granular. The
/// higher the level, the bigger the jumps are in the index.
pub struct Checkpoint<'d> {
entry_position: u64,
entry_position: usize,
data: &'d [u8],
}

#[derive(Clone, Copy)]
pub struct CheckpointLevel {
pub next_position: u64,
pub next_position: usize,
}

impl<'d> Checkpoint<'d> {
pub fn write<W, L>(
output: &mut W,
entry_position: u64,
entry_position: usize,
levels: L,
) -> Result<(), SerializationError>
where
W: Write,
L: IntoIterator<Item = CheckpointLevel>,
{
output.write_u8(OBJECT_ID_CHECKPOINT)?;
output.write_u64::<LittleEndian>(entry_position)?;
output.write_u64::<LittleEndian>(entry_position as u64)?;

for level in levels {
output.write_u64::<LittleEndian>(level.next_position)?;
output.write_u64::<LittleEndian>(level.next_position as u64)?;
}

Ok(())
Expand All @@ -163,13 +163,9 @@ impl<'d> Checkpoint<'d> {
nb_levels: usize,
) -> Result<(Checkpoint, usize), SerializationError> {
let mut checkpoint_cursor = Cursor::new(data);
let item_id = checkpoint_cursor.read_u8()?;
if item_id != OBJECT_ID_CHECKPOINT {
return Err(SerializationError::InvalidObjectType);
}

let entry_position = checkpoint_cursor.read_u64::<LittleEndian>()?;
checkpoint_cursor.set_position(1); // object id

let entry_position = checkpoint_cursor.read_u64::<LittleEndian>()? as usize;
let next_checkpoints_offset = checkpoint_cursor.position() as usize;

Ok((
Expand All @@ -187,7 +183,7 @@ impl<'d> Checkpoint<'d> {
(nb_levels * 8)
}

pub fn entry_position(&self) -> u64 {
pub fn entry_position(&self) -> usize {
self.entry_position
}

Expand All @@ -197,7 +193,7 @@ impl<'d> Checkpoint<'d> {
/// and we are seeking backward.
pub fn get_next_checkpoint(&self, level: usize) -> Result<CheckpointLevel, SerializationError> {
let level_offset = level * 8;
let next_position = (&self.data[level_offset..]).read_u64::<LittleEndian>()?;
let next_position = (&self.data[level_offset..]).read_u64::<LittleEndian>()? as usize;

Ok(CheckpointLevel { next_position })
}
Expand Down
19 changes: 11 additions & 8 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,12 @@ where
let (serialized_checkpoint, _read_size) =
data::Checkpoint::read_slice(&self.data[checkpoint_position..], self.nb_levels)?;

let entry_file_position = serialized_checkpoint.entry_position() as usize;
let entry_file_position = serialized_checkpoint.entry_position();
let (entry_key, _entry_size) =
data::Entry::<K, V, KS, VS>::read_key(&self.data[entry_file_position..])?;

Ok(Checkpoint {
entry_key,
entry_file_position,
serialized_checkpoint,
})
}
Expand Down Expand Up @@ -250,7 +249,7 @@ where
if needle_cmp == Ordering::Equal && !find_first_match {
// we found a match, and we don't care if it's the first one or not
return Ok(Some(
self.read_entry(current.checkpoint.entry_file_position)?,
self.read_entry(current.checkpoint.get_entry_position())?,
));
} else if needle_cmp == Ordering::Equal || needle_cmp == Ordering::Less {
// we found a match, but we want to make sure it's the first one,
Expand All @@ -277,7 +276,7 @@ where
if current.level == self.nb_levels - 1 {
// we reached the last level, so we need to iterate to the entry
return Ok(self.sequential_find_entry(
Some(current.checkpoint.entry_file_position),
Some(current.checkpoint.get_entry_position()),
needle,
));
} else if let Some(previous_find) = stack.front_mut() {
Expand Down Expand Up @@ -453,7 +452,7 @@ where
.reader
.iterate_entries_from_position(Some(prev_checkpoint))
{
if entry.position > next_checkpoint.entry_file_position {
if entry.position > next_checkpoint.get_entry_position() {
break;
}

Expand Down Expand Up @@ -538,14 +537,18 @@ where
K: Ord + Serializable,
{
entry_key: K,
entry_file_position: usize,
serialized_checkpoint: data::Checkpoint<'d>,
}

impl<'d, K> Checkpoint<'d, K>
where
K: Ord + Serializable,
{
/// Returns the entry position recorded in the underlying serialized checkpoint.
///
/// This forwards to `data::Checkpoint::entry_position` rather than reading a
/// separately stored copy of the value.
#[inline]
fn get_entry_position(&self) -> usize {
self.serialized_checkpoint.entry_position()
}

/// Gets the position of the next checkpoint at the given level.
///
/// The position of that checkpoint will be lower since the index is built from left to right
Expand All @@ -554,7 +557,7 @@ where
Ok(self
.serialized_checkpoint
.get_next_checkpoint(level)?
.next_position as usize)
.next_position)
}

/// Gets the position of the last checkpoint, given the number of levels.
Expand All @@ -564,7 +567,7 @@ where
Ok(self
.serialized_checkpoint
.get_next_checkpoint(nb_levels - 1)?
.next_position as usize)
.next_position)
}
}

Expand Down

0 comments on commit 06619be

Please sign in to comment.