diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 88ca3697026f28..32604a65e58dae 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1011,7 +1011,7 @@ DEFINE_Bool(enable_file_cache_query_limit, "false"); DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90"); DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80"); DEFINE_mBool(enable_read_cache_file_directly, "false"); -DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false"); +DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true"); DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not checking // If true, evict the ttl cache using LRU when full. // Otherwise, only expiration can evict ttl and new data won't add to cache when full. @@ -1292,7 +1292,7 @@ DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16"); // The max thread num for S3FileUploadThreadPool DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64"); // The max ratio for ttl cache's size -DEFINE_mInt64(max_ttl_cache_ratio, "90"); +DEFINE_mInt64(max_ttl_cache_ratio, "50"); // The maximum jvm heap usage ratio for hdfs write workload DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5"); // The sleep milliseconds duration when hdfs write exceeds the maximum usage diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index f2f1f22365297b..4fb3f3e02cb58c 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -86,6 +86,94 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _total_evict_size_metrics = std::make_shared>( _cache_base_path.c_str(), "file_cache_total_evict_size"); + _evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_disposable_to_normal"); + 
_evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_disposable_to_index"); + _evict_by_heat_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_disposable_to_ttl"); + _evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_normal_to_disposable"); + _evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_normal_to_index"); + _evict_by_heat_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_normal_to_ttl"); + _evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_index_to_disposable"); + _evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_index_to_normal"); + _evict_by_heat_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_index_to_ttl"); + _evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_ttl_to_disposable"); + _evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_ttl_to_normal"); + _evict_by_heat_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_heat_ttl_to_index"); + + _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] = + 
std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_self_lru_disposable"); + _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_self_lru_normal"); + _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared>( + _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index"); + _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared>( + _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl"); + + _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_disposable_to_normal"); + _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_disposable_to_index"); + _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_disposable_to_ttl"); + _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_normal_to_disposable"); + _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_normal_to_index"); + _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_normal_to_ttl"); + _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_index_to_disposable"); + _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_index_to_normal"); + 
_evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_index_to_ttl"); + _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_ttl_to_disposable"); + _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_ttl_to_normal"); + _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] = + std::make_shared>(_cache_base_path.c_str(), + "file_cache_evict_by_size_ttl_to_index"); + + _evict_by_try_release = std::make_shared>( + _cache_base_path.c_str(), "file_cache_evict_by_try_release"); + _num_read_blocks = std::make_shared>(_cache_base_path.c_str(), "file_cache_num_read_blocks"); _num_hit_blocks = std::make_shared>(_cache_base_path.c_str(), @@ -109,6 +197,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, "file_cache_hit_ratio_5m", 0.0); _hit_ratio_1h = std::make_shared>(_cache_base_path.c_str(), "file_cache_hit_ratio_1h", 0.0); + _disk_limit_mode_metrics = + std::make_shared>(_cache_base_path.c_str(), "disk_limit_mode", 0); _disposable_queue = LRUQueue(cache_settings.disposable_queue_size, cache_settings.disposable_queue_elements, 60 * 60); @@ -116,7 +206,7 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, 7 * 24 * 60 * 60); _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements, 24 * 60 * 60); - _ttl_queue = LRUQueue(std::numeric_limits::max(), std::numeric_limits::max(), + _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, std::numeric_limits::max()); _recycle_keys = std::make_shared>( @@ -317,14 +407,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte if (st.ok()) { auto& queue = get_queue(origin_type); 
queue.remove(cell.queue_iterator.value(), cache_lock); - if (config::enable_ttl_cache_evict_using_lru) { - auto& ttl_queue = get_queue(FileCacheType::TTL); - cell.queue_iterator = ttl_queue.add( - cell.file_block->get_hash_value(), cell.file_block->offset(), - cell.file_block->range().size(), cache_lock); - } else { - cell.queue_iterator.reset(); - } + auto& ttl_queue = get_queue(FileCacheType::TTL); + cell.queue_iterator = + ttl_queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(), + cell.file_block->range().size(), cache_lock); } else { LOG_WARNING("Failed to change key meta").error(st); } @@ -734,11 +820,10 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha << " cache_type=" << cache_type_to_string(context.cache_type) << " error=" << st.msg(); } - if (cell.file_block->cache_type() != FileCacheType::TTL || - config::enable_ttl_cache_evict_using_lru) { - auto& queue = get_queue(cell.file_block->cache_type()); - cell.queue_iterator = queue.add(hash, offset, size, cache_lock); - } + + auto& queue = get_queue(cell.file_block->cache_type()); + cell.queue_iterator = queue.add(hash, offset, size, cache_lock); + if (cell.file_block->cache_type() == FileCacheType::TTL) { if (_key_to_time.find(hash) == _key_to_time.end()) { _key_to_time[hash] = context.expiration_time; @@ -761,11 +846,14 @@ size_t BlockFileCache::try_release() { } } } + size_t remove_size = 0; for (auto& cell : trash) { FileBlockSPtr file_block = cell->file_block; std::lock_guard lc(cell->file_block->_mutex); + remove_size += file_block->range().size(); remove(file_block, cache_lock, lc); } + *_evict_by_try_release << remove_size; LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path; return trash.size(); } @@ -856,9 +944,10 @@ void BlockFileCache::remove_file_blocks_and_clean_time_maps( void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size, size_t& removed_size, std::vector& 
to_evict, - std::lock_guard& cache_lock, bool is_ttl) { + std::lock_guard& cache_lock, + size_t& cur_removed_size) { for (const auto& [entry_key, entry_offset, entry_size] : queue) { - if (!is_overflow(removed_size, size, cur_cache_size, is_ttl)) { + if (!is_overflow(removed_size, size, cur_cache_size)) { break; } auto* cell = get_cell(entry_key, entry_offset, cache_lock); @@ -876,6 +965,7 @@ void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); to_evict.push_back(cell); removed_size += cell_size; + cur_removed_size += cell_size; } } } @@ -901,8 +991,9 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size, } std::vector to_evict; auto collect_eliminate_fragments = [&](LRUQueue& queue) { + size_t cur_removed_size = 0; find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, - false); + cur_removed_size); }; if (disposable_queue_size != 0) { collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE)); @@ -929,8 +1020,9 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard to_evict; + size_t cur_removed_size = 0; find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, - true); + cur_removed_size); remove_file_blocks_and_clean_time_maps(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); @@ -963,10 +1055,6 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& size = 5 * size; } - if (context.cache_type == FileCacheType::TTL) { - return try_reserve_for_ttl(size, cache_lock); - } - auto query_context = config::enable_file_cache_query_limit && (context.query_id.hi != 0 || context.query_id.lo != 0) ? 
get_query_context(context.query_id, cache_lock) @@ -1144,12 +1232,33 @@ void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) { } } -std::vector BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) { +std::vector BlockFileCache::get_other_cache_type_without_ttl( + FileCacheType cur_cache_type) { switch (cur_cache_type) { + case FileCacheType::TTL: + return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX}; case FileCacheType::INDEX: return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL}; case FileCacheType::NORMAL: return {FileCacheType::DISPOSABLE, FileCacheType::INDEX}; + case FileCacheType::DISPOSABLE: + return {FileCacheType::NORMAL, FileCacheType::INDEX}; + default: + return {}; + } + return {}; +} + +std::vector BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) { + switch (cur_cache_type) { + case FileCacheType::TTL: + return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX}; + case FileCacheType::INDEX: + return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL}; + case FileCacheType::NORMAL: + return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL}; + case FileCacheType::DISPOSABLE: + return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL}; default: return {}; } @@ -1175,13 +1284,14 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size } bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval( - std::vector other_cache_types, size_t size, int64_t cur_time, - std::lock_guard& cache_lock) { + FileCacheType cur_type, std::vector other_cache_types, size_t size, + int64_t cur_time, std::lock_guard& cache_lock) { size_t removed_size = 0; size_t cur_cache_size = _cur_cache_size; std::vector to_evict; for (FileCacheType cache_type : other_cache_types) { auto& queue = get_queue(cache_type); + size_t remove_size_per_type = 0; for (const auto& [entry_key, entry_offset, 
entry_size] : queue) { if (!is_overflow(removed_size, size, cur_cache_size)) { break; @@ -1203,39 +1313,48 @@ bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval( DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); to_evict.push_back(cell); removed_size += cell_size; + remove_size_per_type += cell_size; } } + *(_evict_by_heat_metrics_matrix[cache_type][cur_type]) << remove_size_per_type; } remove_file_blocks(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); } -bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size, - bool is_ttl) const { +bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, + size_t cur_cache_size) const { bool ret = false; if (_disk_resource_limit_mode) { ret = (removed_size < need_size); } else { ret = (cur_cache_size + need_size - removed_size > _capacity); } - if (is_ttl) { - size_t ttl_threshold = config::max_ttl_cache_ratio * _capacity / 100; - return (ret || ((cur_cache_size + need_size - removed_size) > ttl_threshold)); - } return ret; } bool BlockFileCache::try_reserve_from_other_queue_by_size( - std::vector other_cache_types, size_t size, + FileCacheType cur_type, std::vector other_cache_types, size_t size, std::lock_guard& cache_lock) { size_t removed_size = 0; size_t cur_cache_size = _cur_cache_size; std::vector to_evict; + // evict from the other queues in the order given by get_other_cache_type for (FileCacheType cache_type : other_cache_types) { auto& queue = get_queue(cache_type); + + // we will not drain each of them to the bottom -- i.e., we only + // evict what they have stolen. 
+ size_t cur_queue_size = queue.get_capacity(cache_lock); + size_t cur_queue_max_size = queue.get_max_size(); + if (cur_queue_size <= cur_queue_max_size) { + continue; + } + size_t cur_removed_size = 0; find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, - false); + cur_removed_size); + *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size; } remove_file_blocks(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); @@ -1244,16 +1363,15 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size( bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size, int64_t cur_time, std::lock_guard& cache_lock) { - // disposable queue cannot reserve other queues - if (cur_cache_type == FileCacheType::DISPOSABLE) { - return false; - } - auto other_cache_types = get_other_cache_type(cur_cache_type); - bool reserve_success = try_reserve_from_other_queue_by_hot_interval(other_cache_types, size, - cur_time, cache_lock); + // currently, TTL cache is not considered as a candidate + auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type); + bool reserve_success = try_reserve_from_other_queue_by_hot_interval( + cur_cache_type, other_cache_types, size, cur_time, cache_lock); if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) { return reserve_success; } + + other_cache_types = get_other_cache_type(cur_cache_type); auto& cur_queue = get_queue(cur_cache_type); size_t cur_queue_size = cur_queue.get_capacity(cache_lock); size_t cur_queue_max_size = cur_queue.get_max_size(); @@ -1261,7 +1379,8 @@ bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) { return false; } - return try_reserve_from_other_queue_by_size(other_cache_types, size, cache_lock); + return try_reserve_from_other_queue_by_size(cur_cache_type, 
other_cache_types, size, + cache_lock); } bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, @@ -1277,9 +1396,11 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, size_t cur_cache_size = _cur_cache_size; std::vector to_evict; + size_t cur_removed_size = 0; find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, - false); + cur_removed_size); remove_file_blocks(to_evict, cache_lock); + *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size; if (is_overflow(removed_size, size, cur_cache_size)) { return false; @@ -1579,6 +1700,7 @@ std::string BlockFileCache::reset_capacity(size_t new_capacity) { ss << " ttl_queue released " << queue_released; } _disk_resource_limit_mode = true; + _disk_limit_mode_metrics->set_value(1); _async_clear_file_cache = true; ss << " total_space_released=" << space_released; } @@ -1600,6 +1722,7 @@ void BlockFileCache::check_disk_resource_limit() { } if (_capacity > _cur_cache_size) { _disk_resource_limit_mode = false; + _disk_limit_mode_metrics->set_value(0); } std::pair percent; int ret = disk_used_percentage(_cache_base_path, &percent); @@ -1625,10 +1748,12 @@ void BlockFileCache::check_disk_resource_limit() { if (capacity_percentage >= config::file_cache_enter_disk_resource_limit_mode_percent || inode_is_insufficient(inode_percentage)) { _disk_resource_limit_mode = true; + _disk_limit_mode_metrics->set_value(1); } else if (_disk_resource_limit_mode && (capacity_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) && (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) { _disk_resource_limit_mode = false; + _disk_limit_mode_metrics->set_value(0); } if (_disk_resource_limit_mode) { // log per mins @@ -1744,14 +1869,9 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, if (st.ok()) { auto& queue = get_queue(origin_type); queue.remove(cell.queue_iterator.value(), cache_lock); - if 
(config::enable_ttl_cache_evict_using_lru) { - auto& ttl_queue = get_queue(FileCacheType::TTL); - cell.queue_iterator = - ttl_queue.add(hash, cell.file_block->offset(), - cell.file_block->range().size(), cache_lock); - } else { - cell.queue_iterator.reset(); - } + auto& ttl_queue = get_queue(FileCacheType::TTL); + cell.queue_iterator = ttl_queue.add(hash, cell.file_block->offset(), + cell.file_block->range().size(), cache_lock); } if (!st.ok()) { LOG_WARNING("").error(st); @@ -1909,6 +2029,12 @@ std::map BlockFileCache::get_stats() { stats["index_queue_curr_elements"] = (double)_cur_index_queue_element_count_metrics->get_value(); + stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size(); + stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value(); + stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size(); + stats["ttl_queue_curr_elements"] = + (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value(); + stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size(); stats["normal_queue_curr_size"] = (double)_cur_normal_queue_element_count_metrics->get_value(); stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size(); @@ -1925,6 +2051,36 @@ std::map BlockFileCache::get_stats() { return stats; } +// for be UTs +std::map BlockFileCache::get_stats_unsafe() { + std::map stats; + stats["hits_ratio"] = (double)_hit_ratio->get_value(); + stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value(); + stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value(); + + stats["index_queue_max_size"] = (double)_index_queue.get_max_size(); + stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe(); + stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size(); + stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe(); + + stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size(); + 
stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe(); + stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size(); + stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe(); + + stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size(); + stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe(); + stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size(); + stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe(); + + stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size(); + stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe(); + stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size(); + stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe(); + + return stats; +} + template void BlockFileCache::remove(FileBlockSPtr file_block, std::lock_guard& cache_lock, std::lock_guard& block_lock, bool sync); diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 4bedc725692653..0de33dadc8249d 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -183,6 +183,9 @@ class BlockFileCache { std::map get_stats(); + // for be UTs + std::map get_stats_unsafe(); + class LRUQueue { public: LRUQueue() = default; @@ -217,6 +220,10 @@ class BlockFileCache { return cache_size; } + size_t get_capacity_unsafe() const { return cache_size; } + + size_t get_elements_num_unsafe() const { return queue.size(); } + size_t get_elements_num(std::lock_guard& /* cache_lock */) const { return queue.size(); } @@ -383,6 +390,7 @@ class BlockFileCache { bool try_reserve_during_async_load(size_t size, std::lock_guard& cache_lock); std::vector get_other_cache_type(FileCacheType cur_cache_type); + std::vector get_other_cache_type_without_ttl(FileCacheType 
cur_cache_type); bool try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t offset, int64_t cur_time, std::lock_guard& cache_lock); @@ -428,15 +436,16 @@ class BlockFileCache { void recycle_deleted_blocks(); - bool try_reserve_from_other_queue_by_hot_interval(std::vector other_cache_types, + bool try_reserve_from_other_queue_by_hot_interval(FileCacheType cur_type, + std::vector other_cache_types, size_t size, int64_t cur_time, std::lock_guard& cache_lock); - bool try_reserve_from_other_queue_by_size(std::vector other_cache_types, + bool try_reserve_from_other_queue_by_size(FileCacheType cur_type, + std::vector other_cache_types, size_t size, std::lock_guard& cache_lock); - bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size, - bool is_ttl = false) const; + bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const; void remove_file_blocks(std::vector&, std::lock_guard&); @@ -447,7 +456,7 @@ class BlockFileCache { void find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size, size_t& removed_size, std::vector& to_evict, - std::lock_guard& cache_lock, bool is_ttl); + std::lock_guard& cache_lock, size_t& cur_removed_size); void recycle_stale_rowset_async_bottom_half(); @@ -506,6 +515,10 @@ class BlockFileCache { std::shared_ptr> _cur_disposable_queue_cache_size_metrics; std::array>, 4> _queue_evict_size_metrics; std::shared_ptr> _total_evict_size_metrics; + std::shared_ptr> _evict_by_heat_metrics_matrix[4][4]; + std::shared_ptr> _evict_by_size_metrics_matrix[4][4]; + std::shared_ptr> _evict_by_self_lru_metrics_matrix[4]; + std::shared_ptr> _evict_by_try_release; std::shared_ptr>> _num_hit_blocks_5m; std::shared_ptr>> _num_read_blocks_5m; @@ -519,6 +532,7 @@ class BlockFileCache { std::shared_ptr> _hit_ratio; std::shared_ptr> _hit_ratio_5m; std::shared_ptr> _hit_ratio_1h; + std::shared_ptr> _disk_limit_mode_metrics; }; } // namespace doris::io diff --git 
a/be/src/io/cache/file_cache_common.cpp b/be/src/io/cache/file_cache_common.cpp index c569ace0011866..674879300452df 100644 --- a/be/src/io/cache/file_cache_common.cpp +++ b/be/src/io/cache/file_cache_common.cpp @@ -34,6 +34,7 @@ std::string FileCacheSettings::to_string() const { << ", disposable_queue_elements: " << disposable_queue_elements << ", index_queue_size: " << index_queue_size << ", index_queue_elements: " << index_queue_elements + << ", ttl_queue_size: " << ttl_queue_size << ", ttl_queue_elements: " << ttl_queue_elements << ", query_queue_size: " << query_queue_size << ", query_queue_elements: " << query_queue_elements << ", storage: " << storage; return ss.str(); @@ -58,6 +59,10 @@ FileCacheSettings get_file_cache_settings(size_t capacity, size_t max_query_cach std::max(settings.index_queue_size / settings.max_file_block_size, REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS); + settings.ttl_queue_size = per_size * config::max_ttl_cache_ratio; + settings.ttl_queue_elements = std::max(settings.ttl_queue_size / settings.max_file_block_size, + REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS); + settings.query_queue_size = settings.capacity - settings.disposable_queue_size - settings.index_queue_size; settings.query_queue_elements = diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 21309831a8284c..30579ba7851b28 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -26,17 +26,17 @@ namespace doris::io { inline static constexpr size_t REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS = 100 * 1024; inline static constexpr size_t FILE_CACHE_MAX_FILE_BLOCK_SIZE = 1 * 1024 * 1024; -inline static constexpr size_t DEFAULT_NORMAL_PERCENT = 85; -inline static constexpr size_t DEFAULT_DISPOSABLE_PERCENT = 10; +inline static constexpr size_t DEFAULT_NORMAL_PERCENT = 40; +inline static constexpr size_t DEFAULT_DISPOSABLE_PERCENT = 5; inline static constexpr size_t DEFAULT_INDEX_PERCENT = 5; using uint128_t = 
vectorized::UInt128; -enum class FileCacheType { - INDEX, - NORMAL, - DISPOSABLE, - TTL, +enum FileCacheType { + INDEX = 2, + NORMAL = 1, + DISPOSABLE = 0, + TTL = 3, }; struct UInt128Wrapper { @@ -93,6 +93,8 @@ struct FileCacheSettings { size_t index_queue_elements {0}; size_t query_queue_size {0}; size_t query_queue_elements {0}; + size_t ttl_queue_size {0}; + size_t ttl_queue_elements {0}; size_t max_file_block_size {0}; size_t max_query_cache_size {0}; std::string storage; diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index f77dc439e95594..11e99a4805286f 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -81,7 +81,7 @@ constexpr unsigned long long operator"" _kb(unsigned long long m) { void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr file_block, const io::FileBlock::Range& expected_range, io::FileBlock::State expected_state) { auto range = file_block->range(); - + std::cout << "assert_range num: " << assert_n << std::endl; ASSERT_EQ(range.left, expected_range.left); ASSERT_EQ(range.right, expected_range.right); ASSERT_EQ(file_block->state(), expected_state); @@ -139,7 +139,6 @@ class BlockFileCacheTest : public testing::Test { public: static void SetUpTestSuite() { config::file_cache_enter_disk_resource_limit_mode_percent = 99; - config::enable_ttl_cache_evict_using_lru = false; bool exists {false}; ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, &exists).ok()); if (!exists) { @@ -1110,8 +1109,10 @@ TEST_F(BlockFileCacheTest, max_ttl_size) { query_id.hi = 1; query_id.lo = 1; io::FileCacheSettings settings; - settings.query_queue_size = 100000000; - settings.query_queue_elements = 100000; + settings.query_queue_size = 50000000; + settings.query_queue_elements = 50000; + settings.ttl_queue_size = 50000000; + settings.ttl_queue_elements = 50000; settings.capacity = 100000000; settings.max_file_block_size = 100000; 
settings.max_query_cache_size = 30; @@ -1136,7 +1137,7 @@ TEST_F(BlockFileCacheTest, max_ttl_size) { auto holder = cache.get_or_set(key1, offset, 100000, context); auto blocks = fromHolder(holder); ASSERT_EQ(blocks.size(), 1); - if (offset < 90000000) { + if (offset < 50000000) { assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), io::FileBlock::State::EMPTY); ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); @@ -1145,7 +1146,79 @@ TEST_F(BlockFileCacheTest, max_ttl_size) { io::FileBlock::State::DOWNLOADED); } else { assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), - io::FileBlock::State::SKIP_CACHE); + io::FileBlock::State::EMPTY); + } + blocks.clear(); + } + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, max_ttl_size_with_other_cache_exist) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 50000000; + settings.query_queue_elements = 50000; + settings.ttl_queue_size = 50000000; + settings.ttl_queue_elements = 50000; + settings.capacity = 100000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + auto key1 = io::BlockFileCache::hash("key5"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + // populate the cache with other cache type + io::CacheContext context; + context.cache_type = io::FileCacheType::NORMAL; + context.query_id = query_id; + int64_t offset = 100000000; + for (; offset < 180000000; offset += 100000) { + auto holder = cache.get_or_set(key1, 
offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + blocks.clear(); + } + + // then get started with TTL + context.cache_type = io::FileCacheType::TTL; + context.query_id = query_id; + int64_t cur_time = UnixSeconds(); + context.expiration_time = cur_time + 120; + offset = 0; + for (; offset < 100000000; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + if (offset < 50000000) { + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + } else { + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); } blocks.clear(); } @@ -1195,7 +1268,7 @@ TEST_F(BlockFileCacheTest, max_ttl_size_memory_storage) { io::FileBlock::State::DOWNLOADED); } else { assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), - io::FileBlock::State::SKIP_CACHE); + io::FileBlock::State::EMPTY); } blocks.clear(); } @@ -2065,7 +2138,9 @@ TEST_F(BlockFileCacheTest, ttl_normal) { io::FileCacheSettings settings; settings.query_queue_size = 50; settings.query_queue_elements = 5; - settings.capacity = 50; + settings.ttl_queue_size = 50; + settings.ttl_queue_elements = 5; + settings.capacity = 100; settings.max_file_block_size = 30; settings.max_query_cache_size = 30; io::CacheContext context; @@ -2160,7 +2235,9 @@ TEST_F(BlockFileCacheTest, ttl_modify) { io::FileCacheSettings settings; settings.query_queue_size = 30; settings.query_queue_elements = 5; - settings.capacity = 30; + settings.ttl_queue_size = 30; + 
settings.ttl_queue_elements = 5; + settings.capacity = 60; settings.max_file_block_size = 30; settings.max_query_cache_size = 30; io::CacheContext context; @@ -2314,7 +2391,9 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) { io::FileCacheSettings settings; settings.query_queue_size = 30; settings.query_queue_elements = 5; - settings.capacity = 30; + settings.ttl_queue_size = 30; + settings.ttl_queue_elements = 5; + settings.capacity = 60; settings.max_file_block_size = 30; settings.max_query_cache_size = 30; io::CacheContext context; @@ -2428,7 +2507,9 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) { io::FileCacheSettings settings; settings.query_queue_size = 30; settings.query_queue_elements = 5; - settings.capacity = 30; + settings.ttl_queue_size = 30; + settings.ttl_queue_elements = 5; + settings.capacity = 60; settings.max_file_block_size = 30; settings.max_query_cache_size = 30; io::CacheContext context; @@ -2450,6 +2531,16 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) { auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range [50, 59] auto blocks = fromHolder(holder); ASSERT_EQ(blocks.size(), 1); + // std::cout << "current cache size:" << cache.get_used_cache_size() << std::endl; + std::cout << "cache capacity:" << cache.capacity() << std::endl; + auto map = cache.get_stats_unsafe(); + for (auto& [key, value] : map) { + std::cout << key << " : " << value << std::endl; + } + auto key1 = io::BlockFileCache::hash("key1"); + std::cout << cache.dump_structure(key1) << std::endl; + std::cout << cache.dump_structure(key2) << std::endl; + assert_range(1, blocks[0], io::FileBlock::Range(50, 59), io::FileBlock::State::EMPTY); ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); download(blocks[0]); @@ -2532,105 +2623,6 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time_memory_storage) { } } -TEST_F(BlockFileCacheTest, ttl_reverse) { - if (fs::exists(cache_base_path)) { - 
fs::remove_all(cache_base_path); - } - fs::create_directories(cache_base_path); - test_file_cache(io::FileCacheType::NORMAL); - TUniqueId query_id; - query_id.hi = 1; - query_id.lo = 1; - io::FileCacheSettings settings; - settings.query_queue_size = 36; - settings.query_queue_elements = 5; - settings.capacity = 36; - settings.max_file_block_size = 7; - settings.max_query_cache_size = 30; - io::CacheContext context; - context.cache_type = io::FileCacheType::TTL; - context.query_id = query_id; - int64_t cur_time = UnixSeconds(); - context.expiration_time = cur_time + 180; - auto key2 = io::BlockFileCache::hash("key2"); - io::BlockFileCache cache(cache_base_path, settings); - ASSERT_TRUE(cache.initialize()); - for (int i = 0; i < 100; i++) { - if (cache.get_async_open_success()) { - break; - }; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - ASSERT_TRUE(cache.get_async_open_success()); - for (size_t offset = 0; offset < 30; offset += 6) { - auto holder = cache.get_or_set(key2, offset, 6, context); - auto blocks = fromHolder(holder); - ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(blocks[0]); - } - { - auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range [50, 57] - auto blocks = fromHolder(holder); - assert_range(1, blocks[0], io::FileBlock::Range(50, 56), io::FileBlock::State::SKIP_CACHE); - } - { - context.cache_type = io::FileCacheType::NORMAL; - auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range [50, 57] - auto blocks = fromHolder(holder); - assert_range(1, blocks[0], io::FileBlock::Range(50, 56), io::FileBlock::State::SKIP_CACHE); - } - - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } -} - -TEST_F(BlockFileCacheTest, ttl_reverse_memory_storage) { - test_file_cache_memory_storage(io::FileCacheType::NORMAL); - TUniqueId query_id; - query_id.hi = 1; - query_id.lo = 1; - io::FileCacheSettings settings; - settings.query_queue_size = 36; - 
settings.query_queue_elements = 5; - settings.capacity = 36; - settings.max_file_block_size = 7; - settings.max_query_cache_size = 30; - settings.storage = "memory"; - io::CacheContext context; - context.cache_type = io::FileCacheType::TTL; - context.query_id = query_id; - int64_t cur_time = UnixSeconds(); - context.expiration_time = cur_time + 180; - auto key2 = io::BlockFileCache::hash("key2"); - io::BlockFileCache cache(cache_base_path, settings); - ASSERT_TRUE(cache.initialize()); - for (int i = 0; i < 100; i++) { - if (cache.get_async_open_success()) { - break; - }; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - ASSERT_TRUE(cache.get_async_open_success()); - for (size_t offset = 0; offset < 30; offset += 6) { - auto holder = cache.get_or_set(key2, offset, 6, context); - auto blocks = fromHolder(holder); - ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download_into_memory(blocks[0]); - } - { - auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range [50, 57] - auto blocks = fromHolder(holder); - assert_range(1, blocks[0], io::FileBlock::Range(50, 56), io::FileBlock::State::SKIP_CACHE); - } - { - context.cache_type = io::FileCacheType::NORMAL; - auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range [50, 57] - auto blocks = fromHolder(holder); - assert_range(1, blocks[0], io::FileBlock::Range(50, 56), io::FileBlock::State::SKIP_CACHE); - } -} - TEST_F(BlockFileCacheTest, io_error) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); @@ -2906,7 +2898,8 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) { cache.clear_file_cache_async(); while (cache._async_clear_file_cache) ; - EXPECT_EQ(cache._cur_cache_size, 5); + EXPECT_EQ(cache._cur_cache_size, 20); // 0-4 is used again, so all the cache data in DISPOSABLE + // remain unremoved if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -4838,10 +4831,7 @@ TEST_F(BlockFileCacheTest, 
recyle_unvalid_ttl_async) { } } -TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) { - config::file_cache_ttl_valid_check_interval_second = 4; - config::enable_ttl_cache_evict_using_lru = false; - +TEST_F(BlockFileCacheTest, reset_capacity) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -4854,18 +4844,26 @@ TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) { settings.query_queue_elements = 5; settings.index_queue_size = 30; settings.index_queue_elements = 5; - settings.disposable_queue_size = 0; - settings.disposable_queue_elements = 0; - settings.capacity = 60; + settings.disposable_queue_size = 30; + settings.disposable_queue_elements = 5; + settings.capacity = 90; settings.max_file_block_size = 30; settings.max_query_cache_size = 30; io::CacheContext context; context.query_id = query_id; auto key = io::BlockFileCache::hash("key1"); + auto key2 = io::BlockFileCache::hash("key2"); io::BlockFileCache cache(cache_base_path, settings); - context.cache_type = io::FileCacheType::TTL; - context.expiration_time = UnixSeconds() + 3600; - + auto sp = SyncPoint::get_instance(); + Defer defer {[sp] { + sp->clear_call_back("BlockFileCache::set_remove_batch"); + sp->clear_call_back("BlockFileCache::set_sleep_time"); + }}; + sp->set_call_back("BlockFileCache::set_sleep_time", + [](auto&& args) { *try_any_cast(args[0]) = 1; }); + sp->set_call_back("BlockFileCache::set_remove_batch", + [](auto&& args) { *try_any_cast(args[0]) = 2; }); + sp->enable_processing(); ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { if (cache.get_async_open_success()) { @@ -4873,7 +4871,8 @@ TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) { }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 100 - 5); offset += 5) { + for (int64_t offset = 0; offset < 45; offset += 5) { + context.cache_type = static_cast((offset / 5) % 3); auto holder = 
cache.get_or_set(key, offset, 5, context); auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 1); @@ -4885,50 +4884,55 @@ TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) { io::FileBlock::State::DOWNLOADED); } context.cache_type = io::FileCacheType::TTL; - context.expiration_time = UnixSeconds() + 3600; - for (int64_t offset = 60; offset < 70; offset += 5) { - auto holder = cache.get_or_set(key, offset, 5, context); + int64_t cur_time = UnixSeconds(); + context.expiration_time = cur_time + 120; + for (int64_t offset = 45; offset < 90; offset += 5) { + auto holder = cache.get_or_set(key2, offset, 5, context); auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 1); assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::SKIP_CACHE); + io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(segments[0]); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::DOWNLOADED); } - - EXPECT_EQ(cache._cur_cache_size, 50); - EXPECT_EQ(cache._ttl_queue.cache_size, 0); + std::cout << cache.reset_capacity(30) << std::endl; + while (cache._async_clear_file_cache) + ; + EXPECT_EQ(cache._cur_cache_size, 30); if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } } -TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru) { - config::file_cache_ttl_valid_check_interval_second = 4; - config::enable_ttl_cache_evict_using_lru = true; - +TEST_F(BlockFileCacheTest, change_cache_type1) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } fs::create_directories(cache_base_path); + auto sp = SyncPoint::get_instance(); + sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) { + *try_any_cast(args[0]) = Status::IOError("inject io error"); + }); + sp->enable_processing(); TUniqueId query_id; query_id.hi = 1; query_id.lo = 1; io::FileCacheSettings settings; 
settings.query_queue_size = 30; settings.query_queue_elements = 5; - settings.index_queue_size = 30; - settings.index_queue_elements = 5; - settings.disposable_queue_size = 0; - settings.disposable_queue_elements = 0; - settings.capacity = 60; + settings.capacity = 30; settings.max_file_block_size = 30; settings.max_query_cache_size = 30; io::CacheContext context; + context.cache_type = io::FileCacheType::TTL; context.query_id = query_id; - auto key = io::BlockFileCache::hash("key1"); + int64_t cur_time = UnixSeconds(); + context.expiration_time = cur_time + 120; + int64_t modify_time = cur_time + 5; + auto key1 = io::BlockFileCache::hash("key1"); io::BlockFileCache cache(cache_base_path, settings); - context.cache_type = io::FileCacheType::TTL; - context.expiration_time = UnixSeconds() + 3600; - ASSERT_TRUE(cache.initialize()); for (int i = 0; i < 100; i++) { if (cache.get_async_open_success()) { @@ -4936,241 +4940,28 @@ TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru) { }; std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 100); offset += 5) { - auto holder = cache.get_or_set(key, offset, 5, context); + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::EMPTY); + assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::EMPTY); ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::DOWNLOADED); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL); + EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time); } - context.cache_type = 
io::FileCacheType::TTL; - context.expiration_time = UnixSeconds() + 3600; - for (int64_t offset = 60; offset < 70; offset += 5) { - auto holder = cache.get_or_set(key, offset, 5, context); + context.cache_type = io::FileCacheType::NORMAL; + context.expiration_time = 0; + { + auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::EMPTY); - ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::DOWNLOADED); - } - - EXPECT_EQ(cache._cur_cache_size, 50); - EXPECT_EQ(cache._ttl_queue.cache_size, 50); - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } -} - -TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru_meet_max_ttl_cache_ratio_limit) { - config::file_cache_ttl_valid_check_interval_second = 4; - config::enable_ttl_cache_evict_using_lru = true; - int old = config::max_ttl_cache_ratio; - config::max_ttl_cache_ratio = 50; - - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } - fs::create_directories(cache_base_path); - TUniqueId query_id; - query_id.hi = 1; - query_id.lo = 1; - io::FileCacheSettings settings; - settings.query_queue_size = 30; - settings.query_queue_elements = 5; - settings.index_queue_size = 30; - settings.index_queue_elements = 5; - settings.disposable_queue_size = 0; - settings.disposable_queue_elements = 0; - settings.capacity = 60; - settings.max_file_block_size = 30; - settings.max_query_cache_size = 30; - io::CacheContext context; - context.query_id = query_id; - auto key = io::BlockFileCache::hash("key1"); - io::BlockFileCache cache(cache_base_path, settings); - context.cache_type = 
io::FileCacheType::TTL; - context.expiration_time = UnixSeconds() + 3600; - - ASSERT_TRUE(cache.initialize()); - for (int i = 0; i < 100; i++) { - if (cache.get_async_open_success()) { - break; - }; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 100); offset += 5) { - auto holder = cache.get_or_set(key, offset, 5, context); - auto segments = fromHolder(holder); - ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::EMPTY); - ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::DOWNLOADED); - } - EXPECT_EQ(cache._cur_cache_size, 30); - EXPECT_EQ(cache._ttl_queue.cache_size, 30); - context.cache_type = io::FileCacheType::TTL; - context.expiration_time = UnixSeconds() + 3600; - for (int64_t offset = 60; offset < 70; offset += 5) { - auto holder = cache.get_or_set(key, offset, 5, context); - auto segments = fromHolder(holder); - ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::EMPTY); - ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::DOWNLOADED); - } - - EXPECT_EQ(cache._cur_cache_size, 30); - EXPECT_EQ(cache._ttl_queue.cache_size, 30); - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } - config::max_ttl_cache_ratio = old; -} - -TEST_F(BlockFileCacheTest, reset_capacity) { - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } - fs::create_directories(cache_base_path); - TUniqueId query_id; - query_id.hi = 1; - query_id.lo = 1; - io::FileCacheSettings settings; - 
settings.query_queue_size = 30; - settings.query_queue_elements = 5; - settings.index_queue_size = 30; - settings.index_queue_elements = 5; - settings.disposable_queue_size = 30; - settings.disposable_queue_elements = 5; - settings.capacity = 90; - settings.max_file_block_size = 30; - settings.max_query_cache_size = 30; - io::CacheContext context; - context.query_id = query_id; - auto key = io::BlockFileCache::hash("key1"); - auto key2 = io::BlockFileCache::hash("key2"); - io::BlockFileCache cache(cache_base_path, settings); - auto sp = SyncPoint::get_instance(); - Defer defer {[sp] { - sp->clear_call_back("BlockFileCache::set_remove_batch"); - sp->clear_call_back("BlockFileCache::set_sleep_time"); - }}; - sp->set_call_back("BlockFileCache::set_sleep_time", - [](auto&& args) { *try_any_cast(args[0]) = 1; }); - sp->set_call_back("BlockFileCache::set_remove_batch", - [](auto&& args) { *try_any_cast(args[0]) = 2; }); - sp->enable_processing(); - ASSERT_TRUE(cache.initialize()); - for (int i = 0; i < 100; i++) { - if (cache.get_async_open_success()) { - break; - }; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - for (int64_t offset = 0; offset < 45; offset += 5) { - context.cache_type = static_cast((offset / 5) % 3); - auto holder = cache.get_or_set(key, offset, 5, context); - auto segments = fromHolder(holder); - ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::EMPTY); - ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::DOWNLOADED); - } - context.cache_type = io::FileCacheType::TTL; - int64_t cur_time = UnixSeconds(); - context.expiration_time = cur_time + 120; - for (int64_t offset = 45; offset < 90; offset += 5) { - auto holder = cache.get_or_set(key2, offset, 5, context); - auto segments = fromHolder(holder); - 
ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::EMPTY); - ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), - io::FileBlock::State::DOWNLOADED); - } - std::cout << cache.reset_capacity(30) << std::endl; - while (cache._async_clear_file_cache) - ; - EXPECT_EQ(cache._cur_cache_size, 30); - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } -} - -TEST_F(BlockFileCacheTest, change_cache_type1) { - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } - fs::create_directories(cache_base_path); - auto sp = SyncPoint::get_instance(); - sp->set_call_back("FileBlock::change_cache_type", [](auto&& args) { - *try_any_cast(args[0]) = Status::IOError("inject io error"); - }); - sp->enable_processing(); - TUniqueId query_id; - query_id.hi = 1; - query_id.lo = 1; - io::FileCacheSettings settings; - settings.query_queue_size = 30; - settings.query_queue_elements = 5; - settings.capacity = 30; - settings.max_file_block_size = 30; - settings.max_query_cache_size = 30; - io::CacheContext context; - context.cache_type = io::FileCacheType::TTL; - context.query_id = query_id; - int64_t cur_time = UnixSeconds(); - context.expiration_time = cur_time + 120; - int64_t modify_time = cur_time + 5; - auto key1 = io::BlockFileCache::hash("key1"); - io::BlockFileCache cache(cache_base_path, settings); - ASSERT_TRUE(cache.initialize()); - for (int i = 0; i < 100; i++) { - if (cache.get_async_open_success()) { - break; - }; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - { - auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] - auto segments = fromHolder(holder); - ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(50, 59), io::FileBlock::State::EMPTY); - 
ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); - download(segments[0]); - assert_range(1, segments[0], io::FileBlock::Range(50, 59), - io::FileBlock::State::DOWNLOADED); - EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL); - EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time); - } - context.cache_type = io::FileCacheType::NORMAL; - context.expiration_time = 0; - { - auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range [50, 59] - auto segments = fromHolder(holder); - ASSERT_EQ(segments.size(), 1); - assert_range(1, segments[0], io::FileBlock::Range(50, 59), - io::FileBlock::State::DOWNLOADED); - EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); - EXPECT_EQ(segments[0]->expiration_time(), 0); + EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL); + EXPECT_EQ(segments[0]->expiration_time(), 0); } sp->clear_call_back("FileBlock::change_cache_type"); context.cache_type = io::FileCacheType::TTL; @@ -5493,4 +5284,1388 @@ TEST_F(BlockFileCacheTest, file_cache_path_storage_parse) { } } +TEST_F(BlockFileCacheTest, populate_empty_cache_with_disposable) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + size_t limit = 1000000; + size_t cache_max = 10000000; + io::CacheContext context; + context.cache_type = io::FileCacheType::DISPOSABLE; + context.query_id = query_id; + // 
int64_t cur_time = UnixSeconds(); + // context.expiration_time = cur_time + 120; + auto key1 = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + int64_t offset = 0; + // fill the cache to its limit + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + // grab more exceed the limit to max cache capacity + for (; offset < cache_max; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], cache_max); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(), 0); + + // grab more exceed the 
cache capacity + size_t exceed = 2000000; + for (; offset < (cache_max + exceed); offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], cache_max); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(), + exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, populate_empty_cache_with_normal) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + size_t limit = 3000000; + size_t cache_max = 10000000; + io::CacheContext context; + context.cache_type = io::FileCacheType::NORMAL; + context.query_id = query_id; + // int64_t cur_time = UnixSeconds(); + // 
context.expiration_time = cur_time + 120; + auto key1 = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + int64_t offset = 0; + // fill the cache to its limit + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + // grab more exceed the limit to max cache capacity + for (; offset < cache_max; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(), 0); + + // grab more exceed the cache capacity + size_t exceed = 2000000; 
+ for (; offset < (cache_max + exceed); offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(), exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, populate_empty_cache_with_index) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + size_t limit = 1000000; + size_t cache_max = 10000000; + io::CacheContext context; + context.cache_type = io::FileCacheType::INDEX; + context.query_id = query_id; + // int64_t cur_time = UnixSeconds(); + // context.expiration_time = cur_time + 120; + auto key1 = 
io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + int64_t offset = 0; + // fill the cache to its limit + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + // grab more exceed the limit to max cache capacity + for (; offset < cache_max; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], cache_max); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(), 0); + + // grab more exceed the cache capacity + size_t exceed = 2000000; + for (; offset < (cache_max + exceed); offset += 100000) 
{ + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], cache_max); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(), exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, populate_empty_cache_with_ttl) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + size_t limit = 5000000; + size_t cache_max = 10000000; + io::CacheContext context; + context.cache_type = io::FileCacheType::TTL; + context.query_id = query_id; + int64_t cur_time = UnixSeconds(); + context.expiration_time = cur_time + 120; + auto key1 = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, 
settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + int64_t offset = 0; + // fill the cache to its limit + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + // grab more exceed the limit to max cache capacity + for (; offset < cache_max; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], cache_max); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(), 0); + + // grab more exceed the cache capacity + size_t exceed = 2000000; + for (; offset < (cache_max + exceed); offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto 
blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], cache_max); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(), exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, disposable_seize_after_normal) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + size_t limit = 1000000; + size_t cache_max = 10000000; + + io::CacheContext context1; + context1.cache_type = 
io::FileCacheType::NORMAL; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + // fill the cache + for (; offset < cache_max; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); + // our hero comes to the stage + io::CacheContext context2; + context2.cache_type = io::FileCacheType::DISPOSABLE; + context2.query_id = query_id; + auto key2 = io::BlockFileCache::hash("key2"); + offset = 0; + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], limit); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max - limit); 
+ ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] + ->get_value(), + limit); + + // grab more exceed the limit + size_t exceed = 2000000; + for (; offset < (limit + exceed); offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], limit); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max - limit); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(), + exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, seize_after_full) { + struct Args { + io::FileCacheType first_type; + io::FileCacheType second_type; + size_t second_limit; + std::string first_metrics; + std::string second_metrics; + }; + + std::vector args_vec = { + {io::FileCacheType::NORMAL, io::FileCacheType::DISPOSABLE, 1000000, + "normal_queue_curr_size", "disposable_queue_curr_size"}, + {io::FileCacheType::NORMAL, io::FileCacheType::INDEX, 1000000, "normal_queue_curr_size", + "index_queue_curr_size"}, + {io::FileCacheType::NORMAL, io::FileCacheType::TTL, 5000000, "normal_queue_curr_size", + "ttl_queue_curr_size"}, + {io::FileCacheType::DISPOSABLE, io::FileCacheType::NORMAL, 3000000, + "disposable_queue_curr_size", "normal_queue_curr_size"}, + {io::FileCacheType::DISPOSABLE, 
io::FileCacheType::INDEX, 1000000, + "disposable_queue_curr_size", "index_queue_curr_size"}, + {io::FileCacheType::DISPOSABLE, io::FileCacheType::TTL, 5000000, + "disposable_queue_curr_size", "ttl_queue_curr_size"}, + {io::FileCacheType::INDEX, io::FileCacheType::NORMAL, 3000000, "index_queue_curr_size", + "normal_queue_curr_size"}, + {io::FileCacheType::INDEX, io::FileCacheType::DISPOSABLE, 1000000, + "index_queue_curr_size", "disposable_queue_curr_size"}, + {io::FileCacheType::INDEX, io::FileCacheType::TTL, 5000000, "index_queue_curr_size", + "ttl_queue_curr_size"}, + {io::FileCacheType::TTL, io::FileCacheType::NORMAL, 3000000, "ttl_queue_curr_size", + "normal_queue_curr_size"}, + {io::FileCacheType::TTL, io::FileCacheType::DISPOSABLE, 1000000, "ttl_queue_curr_size", + "disposable_queue_curr_size"}, + {io::FileCacheType::TTL, io::FileCacheType::INDEX, 1000000, "ttl_queue_curr_size", + "index_queue_curr_size"}, + }; + + for (auto& args : args_vec) { + std::cout << "filled with " << io::BlockFileCache::cache_type_to_string(args.first_type) + << " and seize with " + << io::BlockFileCache::cache_type_to_string(args.second_type) << std::endl; + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + 
std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + size_t limit = args.second_limit; + size_t cache_max = 10000000; + + io::CacheContext context1; + context1.cache_type = args.first_type; + context1.query_id = query_id; + if (args.first_type == io::FileCacheType::TTL) { + int64_t cur_time = UnixSeconds(); + context1.expiration_time = cur_time + 120; + } + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + // fill the cache + for (; offset < cache_max; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max); + // our hero comes to the stage + io::CacheContext context2; + context2.cache_type = args.second_type; + context2.query_id = query_id; + if (context2.cache_type == io::FileCacheType::TTL) { + int64_t cur_time = UnixSeconds(); + context2.expiration_time = cur_time + 120; + } + auto key2 = io::BlockFileCache::hash("key2"); + offset = 0; + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + 
ASSERT_EQ(cache.get_stats_unsafe()[args.second_metrics], limit); + ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max - limit); + ASSERT_EQ( + cache._evict_by_size_metrics_matrix[args.first_type][args.second_type]->get_value(), + limit); + + // grab more exceed the limit + size_t exceed = 2000000; + for (; offset < (limit + exceed); offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()[args.second_metrics], limit); + ASSERT_EQ(cache.get_stats_unsafe()[args.first_metrics], cache_max - limit); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[args.second_type]->get_value(), exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + } +} + +TEST_F(BlockFileCacheTest, evict_privilege_order_for_disposable) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int 
i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + io::CacheContext context1; + context1.cache_type = io::FileCacheType::NORMAL; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + + for (; offset < 3500000; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + io::CacheContext context2; + context2.cache_type = io::FileCacheType::INDEX; + context2.query_id = query_id; + auto key2 = io::BlockFileCache::hash("key2"); + + offset = 0; + + for (; offset < 1300000; offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + io::CacheContext context3; + context3.cache_type = io::FileCacheType::TTL; + context3.query_id = query_id; + context3.expiration_time = UnixSeconds() + 120; + auto key3 = io::BlockFileCache::hash("key3"); + + offset = 0; + + for (; offset < 5200000; offset += 100000) { + auto holder = cache.get_or_set(key3, offset, 100000, context3); + auto blocks = 
fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5200000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3500000); + + // our hero comes to the stage + io::CacheContext context4; + context4.cache_type = io::FileCacheType::DISPOSABLE; + context4.query_id = query_id; + auto key4 = io::BlockFileCache::hash("key4"); + + offset = 0; + + for (; offset < 1000000; offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] + ->get_value(), + 300000); + 
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] + ->get_value(), + 200000); + + size_t exceed = 200000; + for (; offset < (1000000 + exceed); offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] + ->get_value(), + 300000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] + ->get_value(), + 200000); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE]->get_value(), + exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, evict_privilege_order_for_normal) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + 
settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + io::CacheContext context1; + context1.cache_type = io::FileCacheType::DISPOSABLE; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + + for (; offset < 1500000; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + io::CacheContext context2; + context2.cache_type = io::FileCacheType::INDEX; + context2.query_id = query_id; + auto key2 = io::BlockFileCache::hash("key2"); + + offset = 0; + + for (; offset < 1300000; offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + 
blocks.clear(); + } + io::CacheContext context3; + context3.cache_type = io::FileCacheType::TTL; + context3.query_id = query_id; + context3.expiration_time = UnixSeconds() + 120; + auto key3 = io::BlockFileCache::hash("key3"); + + offset = 0; + + for (; offset < 7200000; offset += 100000) { + auto holder = cache.get_or_set(key3, offset, 100000, context3); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 7200000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 0); + + // our hero comes to the stage + io::CacheContext context4; + context4.cache_type = io::FileCacheType::NORMAL; + context4.query_id = query_id; + auto key4 = io::BlockFileCache::hash("key4"); + + offset = 0; + + for (; offset < 3000000; offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + 
ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] + ->get_value(), + 300000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] + ->get_value(), + 2200000); + + size_t exceed = 200000; + for (; offset < (3000000 + exceed); offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] + ->get_value(), + 300000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] + ->get_value(), + 2200000); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL]->get_value(), exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, evict_privilege_order_for_index) { + if 
(fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + io::CacheContext context1; + context1.cache_type = io::FileCacheType::DISPOSABLE; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + + for (; offset < 1500000; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + io::CacheContext context2; + context2.cache_type = io::FileCacheType::NORMAL; + context2.query_id = query_id; + auto key2 = io::BlockFileCache::hash("key2"); + + offset = 0; + + for (; offset < 3300000; offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + 
ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + io::CacheContext context3; + context3.cache_type = io::FileCacheType::TTL; + context3.query_id = query_id; + context3.expiration_time = UnixSeconds() + 120; + auto key3 = io::BlockFileCache::hash("key3"); + + offset = 0; + + for (; offset < 5200000; offset += 100000) { + auto holder = cache.get_or_set(key3, offset, 100000, context3); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5200000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3300000); + + // our hero comes to the stage + io::CacheContext context4; + context4.cache_type = io::FileCacheType::INDEX; + context4.query_id = query_id; + auto key4 = io::BlockFileCache::hash("key4"); + + offset = 0; + + for (; offset < 1000000; offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] + ->get_value(), + 300000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] + ->get_value(), + 200000); + + size_t exceed = 200000; + for (; offset < (1000000 + exceed); offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] + ->get_value(), + 300000); + 
ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] + ->get_value(), + 200000); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(), exceed); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +/* Test evict_privilege_order_for_ttl: pre-fills the DISPOSABLE, INDEX and NORMAL queues until the cache holds exactly settings.capacity bytes, then inserts TTL blocks and checks the resulting queue sizes and eviction metrics. */ TEST_F(BlockFileCacheTest, evict_privilege_order_for_ttl) { + /* Start from an empty cache directory. */ if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + /* Configured queue sizes: ttl 5000000, query 3000000, index 1000000, disposable 1000000; together they add up to capacity 10000000. max_file_block_size is 100000, the same size every request below uses. */ settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + /* Poll up to 100 times, sleeping 1 ms between attempts, until get_async_open_success() returns true; the assertion after the loop fails the test otherwise. */ int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + /* Stage 1: request 15 DISPOSABLE blocks of 100000 bytes under key1 (1500000 bytes total). Each request must yield one block that is EMPTY with this caller as downloader, and DOWNLOADED after download(). */ io::CacheContext context1; + context1.cache_type = io::FileCacheType::DISPOSABLE; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + + for (; offset < 1500000; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + 
io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + /* Stage 2: same pattern for 13 INDEX blocks under key2 (1300000 bytes total). */ io::CacheContext context2; + context2.cache_type = io::FileCacheType::INDEX; + context2.query_id = query_id; + auto key2 = io::BlockFileCache::hash("key2"); + + offset = 0; + + for (; offset < 1300000; offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + /* Stage 3: same pattern for 72 NORMAL blocks under key3 (7200000 bytes total). */ io::CacheContext context3; + context3.cache_type = io::FileCacheType::NORMAL; + context3.query_id = query_id; + auto key3 = io::BlockFileCache::hash("key3"); + + offset = 0; + + for (; offset < 7200000; offset += 100000) { + auto holder = cache.get_or_set(key3, offset, 100000, context3); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + /* Every inserted byte is still present: 1500000 + 1300000 + 7200000 equals the 10000000 capacity, although each of these three queues holds more than its own configured size and the TTL queue is empty. */ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1500000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1300000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 7200000); + + /* Stage 4: TTL context under key4 with expiration_time set to UnixSeconds() + 120. */ // our hero comes to the stage + io::CacheContext context4; + context4.cache_type = io::FileCacheType::TTL; + context4.query_id = query_id; + context4.expiration_time = UnixSeconds() + 120; + auto key4 = 
io::BlockFileCache::hash("key4"); + + offset = 0; + + /* Insert 50 TTL blocks (5000000 bytes, equal to the configured ttl_queue_size) into the already full cache. */ for (; offset < 5000000; offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + /* Expected outcome: TTL holds 5000000 and every other queue is back at its configured size, with 500000, 300000 and 4200000 bytes recorded in the evict-by-size matrix entries [DISPOSABLE][TTL], [INDEX][TTL] and [NORMAL][TTL]. NOTE(review): these values suggest the first index is the queue that lost data and the second the queue that needed space - confirm against the metric definition. */ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] + ->get_value(), + 300000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] + ->get_value(), + 4200000); + + /* Stage 5: insert two more TTL blocks (exceed = 200000 bytes) beyond the configured ttl_queue_size; offset continues from 5000000. */ size_t exceed = 200000; + for (; offset < (5000000 + exceed); offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + /* Queue sizes and the three evict-by-size entries are expected to be unchanged by the extra inserts. */ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 5000000); + 
ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 1000000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 3000000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] + ->get_value(), + 500000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] + ->get_value(), + 300000); + ASSERT_EQ(cache._evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] + ->get_value(), + 4200000); + /* The extra bytes are instead expected in the self-LRU eviction metric for TTL, which must equal exceed. */ ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::TTL]->get_value(), exceed); + + /* Remove the cache directory used by this test. */ if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + } // namespace doris::io