Skip to content

Commit

Permalink
feat: split parts
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Jun 5, 2024
1 parent 7691f76 commit 3e15ab6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
6 changes: 6 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,12 @@ impl ScanPart {
let end = current_range.1.max(part_range.1);
self.time_range = Some((start, end));
}

/// Returns true if we can split the part into multiple parts
/// while preserving order.
///
/// A part qualifies when it has no memtables and exactly one file
/// that contains more than one range.
pub(crate) fn can_split_preserve_order(&self) -> bool {
    if !self.memtables.is_empty() {
        return false;
    }
    match &self.file_ranges[..] {
        // Exactly one file: it is splittable only if it has multiple ranges.
        [only_file] => only_file.len() > 1,
        _ => false,
    }
}
}

/// A trait to collect file ranges to scan.
Expand Down
63 changes: 59 additions & 4 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,9 @@ impl SeqDistributor {

let parallelism = parallelism.max(1);
let parts = group_parts_by_range(self.parts);
let parts = maybe_merge_parts(parts, parallelism);
// TODO(yingwen): Split parts.

parts
let parts = maybe_split_parts(parts, parallelism);
// Ensures it doesn't return more parts than `parallelism`.
maybe_merge_parts(parts, parallelism)
}
}

Expand Down Expand Up @@ -420,6 +419,62 @@ fn maybe_merge_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPa
parts
}

/// Splits parts to make better use of `parallelism`.
///
/// Only parts accepted by `can_split_preserve_order()` are split, i.e. parts that
/// scan a single file and no memtable. The input is returned unchanged when it
/// already has at least `parallelism` parts or when no part can be split.
///
/// The returned vector may hold more than `parallelism` parts: every split targets
/// `num_parts_to_split + 1` pieces regardless of how many pieces earlier splits
/// already produced.
///
/// # Panics
/// Panics if `parallelism` is 0.
fn maybe_split_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart> {
    assert!(parallelism > 0);
    // Nothing to do when there are enough parts already or nothing is splittable.
    if parts.len() >= parallelism || !parts.iter().any(|part| part.can_split_preserve_order()) {
        return parts;
    }

    // Orders parts by the number of ranges in their first file, largest first.
    let first_file_len = |part: &ScanPart| part.file_ranges.first().map_or(0, |file| file.len());
    parts.sort_unstable_by(|lhs, rhs| first_file_len(rhs).cmp(&first_file_len(lhs)));

    let num_parts_to_split = parallelism - parts.len();
    // Each split tries to produce `num_parts_to_split + 1` pieces if possible.
    let target_part_num = num_parts_to_split + 1;
    let mut output_parts = Vec::with_capacity(parallelism);
    for part in parts
        .iter_mut()
        .filter(|part| part.can_split_preserve_order())
    {
        // `can_split_preserve_order()` ensures there is exactly one file and that
        // it has more than one range, so indexing is in bounds and the chunk size
        // below is positive.
        let file = &part.file_ranges[0];
        let ranges_per_part = (file.len() + target_part_num - 1) / target_part_num;
        assert!(ranges_per_part > 0);
        let time_range = part.time_range;
        output_parts.extend(file.chunks(ranges_per_part).map(|ranges| ScanPart {
            memtables: Vec::new(),
            file_ranges: smallvec![ranges.to_vec()],
            time_range,
        }));
        // The last piece takes the slot of the original part, which is moved into
        // the output together with the other remaining parts after the loop.
        *part = output_parts.pop().unwrap();
        if output_parts.len() >= num_parts_to_split {
            // Enough new parts have been produced.
            break;
        }
    }
    // Appends the remaining (and replaced) parts after the newly created pieces.
    output_parts.extend(parts);

    output_parts
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down

0 comments on commit 3e15ab6

Please sign in to comment.