Skip to content

Commit

Permalink
Better
Browse files Browse the repository at this point in the history
  • Loading branch information
kssenii committed Nov 8, 2023
1 parent dbea507 commit 93e22e8
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 41 deletions.
188 changes: 148 additions & 40 deletions src/Interpreters/Cache/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), range.left, range.size(), FileSegment::State::DETACHED);
return { file_segment };
return {file_segment};
}

if (locked_key.empty())
Expand Down Expand Up @@ -245,11 +245,34 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
return result;
}

std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size)
{
assert(size > 0);
std::vector<FileSegment::Range> ranges;

size_t current_pos = offset;
size_t end_pos_non_included = offset + size;
size_t remaining_size = size;

FileSegments file_segments;
while (current_pos < end_pos_non_included)
{
auto current_file_segment_size = std::min(remaining_size, max_file_segment_size);
ranges.emplace_back(current_pos, current_pos + current_file_segment_size - 1);

remaining_size -= current_file_segment_size;
current_pos += current_file_segment_size;
}

return ranges;
}

FileSegments FileCache::splitRangeIntoFileSegments(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & settings)
{
assert(size > 0);
Expand All @@ -261,7 +284,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
size_t remaining_size = size;

FileSegments file_segments;
while (current_pos < end_pos_non_included)
while (current_pos < end_pos_non_included && (!file_segments_limit || file_segments.size() < file_segments_limit))
{
current_file_segment_size = std::min(remaining_size, max_file_segment_size);
remaining_size -= current_file_segment_size;
Expand All @@ -273,7 +296,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
current_pos += current_file_segment_size;
}

assert(file_segments.empty() || offset + size - 1 == file_segments.back()->range().right);
assert(file_segments.empty() || file_segments_limit > 0 || offset + size - 1 == file_segments.back()->range().right);
return file_segments;
}

Expand All @@ -298,6 +321,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
assert(!file_segments.empty());

auto it = file_segments.begin();
size_t added = 0;
auto segment_range = (*it)->range();

size_t current_pos;
Expand All @@ -310,18 +334,20 @@ void FileCache::fillHolesWithEmptyFileSegments(

current_pos = segment_range.right + 1;
++it;
++added;
}
else
current_pos = range.left;

while (current_pos <= range.right && it != file_segments.end())
while (current_pos <= range.right && it != file_segments.end() && (!file_segments_limit || added < file_segments_limit))
{
segment_range = (*it)->range();

if (current_pos == segment_range.left)
{
current_pos = segment_range.right + 1;
++it;
++added;
continue;
}

Expand All @@ -338,17 +364,38 @@ void FileCache::fillHolesWithEmptyFileSegments(
}
else
{
auto split = splitRangeIntoFileSegments(
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
file_segments.splice(it, std::move(split));
auto ranges = splitRange(current_pos, hole_size);
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++added;

if (file_segments_limit && added == file_segments_limit)
{
file_segments.splice(it, std::move(hole));
file_segments.erase(it, file_segments.end());
return;
}
}
file_segments.splice(it, std::move(hole));
}

current_pos = segment_range.right + 1;
++it;
++added;
}

if (file_segments_limit && file_segments.size() >= file_segments_limit)
if (file_segments_limit && added == file_segments_limit)
{
chassert(file_segments.size() >= file_segments_limit);
file_segments.erase(it, file_segments.end());
chassert(file_segments.size() == file_segments_limit);
return;
}

chassert(!file_segments_limit || file_segments.size() < file_segments_limit);

if (current_pos <= range.right)
{
Expand All @@ -368,9 +415,22 @@ void FileCache::fillHolesWithEmptyFileSegments(
}
else
{
auto split = splitRangeIntoFileSegments(
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
file_segments.splice(file_segments.end(), std::move(split));
auto ranges = splitRange(current_pos, hole_size);
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++added;

if (file_segments_limit && added == file_segments_limit)
{
file_segments.splice(it, std::move(hole));
file_segments.erase(it, file_segments.end());
return;
}
}
file_segments.splice(it, std::move(hole));
}
}
}
Expand Down Expand Up @@ -400,7 +460,7 @@ FileSegmentsHolderPtr FileCache::set(
else
{
file_segments = splitRangeIntoFileSegments(
*locked_key, offset, size, FileSegment::State::EMPTY, settings);
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, settings);
}

return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
Expand All @@ -419,77 +479,125 @@ FileCache::getOrSet(

assertInitialized();

const auto end_offset = offset + size - 1;
const auto aligned_offset = roundDownToMultiple(offset, boundary_alignment);
const auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1;
chassert(aligned_offset <= offset);
FileSegment::Range range(offset, offset + size - 1);

auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY);
const auto aligned_offset = roundDownToMultiple(range.left, boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1;

chassert(aligned_offset <= range.left);
chassert(aligned_end_offset >= range.right);

auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY);
/// Get all segments which intersect with the given range.
FileSegment::Range range(offset, end_offset);
auto file_segments = getImpl(*locked_key, range, file_segments_limit);

if (aligned_offset < offset && (file_segments.empty() || offset < file_segments.front()->range().left))
if (file_segments_limit)
{
chassert(file_segments.size() <= file_segments_limit);
if (file_segments.size() == file_segments_limit)
range.right = aligned_end_offset = file_segments.back()->range().right;
}

/// Check case if we have uncovered prefix, e.g.
///
/// [_______________]
/// ^ ^
/// range.left range.right
/// [___] [__________] <-- current cache (example)
/// [ ]
/// ^----^
/// uncovered prefix.
const bool has_uncovered_prefix = file_segments.empty() || range.left < file_segments.front()->range().left;

if (aligned_offset < range.left && has_uncovered_prefix)
{
auto prefix_range = FileSegment::Range(aligned_offset, file_segments.empty() ? offset - 1 : file_segments.front()->range().left - 1);
auto prefix_range = FileSegment::Range(aligned_offset, file_segments.empty() ? range.left - 1 : file_segments.front()->range().left - 1);
auto prefix_file_segments = getImpl(*locked_key, prefix_range, /* file_segments_limit */0);

if (prefix_file_segments.empty())
{
/// [____________________][_______________]
/// ^ ^ ^
/// aligned_offset range.left range.right
/// [___] [__________] <-- current cache (example)
range.left = aligned_offset;
}
else
{
size_t last_right_offset = prefix_file_segments.back()->range().right;
/// [____________________][_______________]
/// ^ ^ ^
/// aligned_offset range.left range.right
/// ____] [____] [___] [__________] <-- current cache (example)
/// ^
/// prefix_file_segments.back().right

while (!prefix_file_segments.empty() && prefix_file_segments.front()->range().right < offset)
prefix_file_segments.pop_front();
chassert(prefix_file_segments.back()->range().right < range.left);
chassert(prefix_file_segments.back()->range().right >= aligned_offset);

if (prefix_file_segments.empty())
{
range.left = last_right_offset + 1;
}
else
{
file_segments.splice(file_segments.begin(), prefix_file_segments);
range.left = file_segments.front()->range().left;
}
range.left = prefix_file_segments.back()->range().right + 1;
}
}

if (end_offset < aligned_end_offset && (file_segments.empty() || file_segments.back()->range().right < end_offset))
/// Check case if we have uncovered suffix.
///
/// [___________________]
/// ^ ^
/// range.left range.right
/// [___] [___] <-- current cache (example)
/// [___]
/// ^---^
/// uncovered_suffix
const bool has_uncovered_suffix = file_segments.empty() || file_segments.back()->range().right < range.right;

if (range.right < aligned_end_offset && has_uncovered_suffix)
{
auto suffix_range = FileSegment::Range(end_offset, aligned_end_offset);
/// Get only 1 file segment.
auto suffix_range = FileSegment::Range(range.right, aligned_end_offset);
/// We need to get 1 file segment, so file_segments_limit = 1 here.
auto suffix_file_segments = getImpl(*locked_key, suffix_range, /* file_segments_limit */1);

if (suffix_file_segments.empty())
{
/// [__________________][ ]
/// ^ ^ ^
/// range.left range.right aligned_end_offset
/// [___] [___] <-- current cache (example)

range.right = aligned_end_offset;
}
else
{
/// [__________________][ ]
/// ^ ^ ^
/// range.left range.right aligned_end_offset
/// [___] [___] [_________] <-- current cache (example)
/// ^
/// suffix_file_segments.front().left
range.right = suffix_file_segments.front()->range().left - 1;
}
}

if (file_segments.empty())
{
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, settings);
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, settings);
}
else
{
chassert(file_segments.front()->range().right >= offset);
chassert(file_segments.back()->range().left <= end_offset);
chassert(file_segments.front()->range().right >= range.left);
chassert(file_segments.back()->range().left <= range.right);

fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, settings);

chassert(!file_segments_limit || file_segments.size() <= file_segments_limit);

if (!file_segments.front()->range().contains(offset))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} to include {} "
"(end offset: {}, aligned offset: {}, aligned end offset: {})",
file_segments.front()->range().toString(), offset, end_offset, aligned_offset, aligned_end_offset);
file_segments.front()->range().toString(), offset, range.right, aligned_offset, aligned_end_offset);
}

chassert(file_segments_limit ? file_segments.back()->range().left <= end_offset : file_segments.back()->range().contains(end_offset));
chassert(file_segments_limit ? file_segments.back()->range().left <= range.right : file_segments.back()->range().contains(range.right));
}

while (file_segments_limit && file_segments.size() > file_segments_limit)
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/Cache/FileCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,17 @@ class FileCache : private boost::noncopyable
void loadMetadataImpl();
void loadMetadataForKeys(const std::filesystem::path & keys_dir);

/// bool - if `file_segments_limit` reached or not.
FileSegments getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const;

std::vector<FileSegment::Range> splitRange(size_t offset, size_t size);

FileSegments splitRangeIntoFileSegments(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & create_settings);

void fillHolesWithEmptyFileSegments(
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/Cache/FileSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ void FileSegment::use()

FileSegmentsHolder::FileSegmentsHolder(FileSegments && file_segments_)
: file_segments(std::move(file_segments_))
{
CurrentMetrics::add(CurrentMetrics::FilesystemCacheHoldFileSegments, file_segments.size());
ProfileEvents::increment(ProfileEvents::FilesystemCacheHoldFileSegments, file_segments.size());
}
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/tests/gtest_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ TEST_F(FileCacheTest, get)
settings.max_elements = 5;
settings.boundary_alignment = 1;

const size_t file_size = -1; // the value doesn't really matter because boundary_alignment == 1.
const size_t file_size = INT_MAX; // the value doesn't really matter because boundary_alignment == 1.

{
std::cerr << "Step 1\n";
Expand Down

0 comments on commit 93e22e8

Please sign in to comment.