diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 0fe050d02dbd3d..e5c31785c1eb1c 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -136,7 +136,7 @@ class CloudTabletMgr::TabletMap { CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) : _engine(engine), _tablet_map(std::make_unique()), - _cache(std::make_unique( + _cache(std::make_unique( CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index 4ea2699bdd9174..63a21bc0714beb 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -33,8 +33,8 @@ namespace doris { CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, - size_in_bytes, LRUCacheType::SIZE, 86400, 4), + : LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, size_in_bytes, + LRUCacheType::SIZE, 86400, 4), _stop_latch(1) {} CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() { diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index db5f8867263168..91a0531c60ae04 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -30,7 +30,7 @@ namespace doris { // Record transaction related delete bitmaps using a lru cache. -class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual { +class CloudTxnDeleteBitmapCache : public LRUCachePolicy { public: CloudTxnDeleteBitmapCache(size_t size_in_bytes); diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index ded72d9f28f7df..2ece1e3fdcd20a 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -142,7 +142,7 @@ void display_tablets_callback(const WebPageHandler::ArgumentMap& args, EasyJson* // Registered to handle "/mem_tracker", and prints out memory tracker information. void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstream* output) { (*output) << "

Memory usage by subsystem

\n"; - std::vector snapshots; + std::vector snapshots; auto iter = args.find("type"); if (iter != args.end()) { if (iter->second == "global") { @@ -159,7 +159,7 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr } else if (iter->second == "other") { MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER); } else if (iter->second == "reserved_memory") { - GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots); + MemTrackerLimiter::make_all_reserved_trackers_snapshots(&snapshots); } else if (iter->second == "all") { MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots); } @@ -191,7 +191,6 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr (*output) << "" "Type" "Label" - "Parent Label" "Limit" "Current Consumption(Bytes)" @@ -207,8 +206,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr string peak_consumption_normalize = AccurateItoaKMGT(item.peak_consumption); (*output) << strings::Substitute( "$0$1$2$3$4$5$6$7\n", - item.type, item.label, item.parent_label, limit_str, item.cur_consumption, + "td>\n", + item.type, item.label, limit_str, item.cur_consumption, current_consumption_normalize, item.peak_consumption, peak_consumption_normalize); } (*output) << "\n"; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 671e07d7556b99..878cea8fb53c29 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -161,9 +161,8 @@ MemTable::~MemTable() { std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete()); _insert_mem_tracker->release(_mem_usage); _flush_mem_tracker->set_consumption(0); - DCHECK_EQ(_insert_mem_tracker->consumption(), 0) - << std::endl - << MemTracker::log_usage(_insert_mem_tracker->make_snapshot()); + DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl + << _insert_mem_tracker->log_usage(); DCHECK_EQ(_flush_mem_tracker->consumption(), 0); _arena.reset(); _agg_buffer_pool.clear(); diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 23b760284b8985..ea045b1e53e30a 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -62,10 +62,7 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) { _load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100; g_load_hard_mem_limit.set_value(_load_hard_mem_limit); g_load_soft_mem_limit.set_value(_load_soft_mem_limit); - _memtable_tracker_set = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "MemTableTrackerSet"); - _mem_tracker = std::make_unique("AllMemTableMemory", - ExecEnv::GetInstance()->details_mem_tracker_set()); + _mem_tracker = std::make_unique("AllMemTableMemory"); REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption, [this]() { return _mem_tracker->consumption(); }); _log_timer.start(); diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index 2e8271bab35c15..66f5fb2a8d0c20 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -20,7 +20,7 @@ #include #include "common/status.h" -#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/memory/mem_tracker.h" #include "util/countdown_latch.h" #include "util/stopwatch.hpp" @@ -45,7 +45,6 @@ class MemTableMemoryLimiter { void refresh_mem_tracker(); - MemTrackerLimiter* memtable_tracker_set() { return _memtable_tracker_set.get(); } MemTracker* mem_tracker() { return _mem_tracker.get(); } int64_t mem_usage() const { return _mem_usage; } @@ -68,8 +67,6 @@ class MemTableMemoryLimiter { int64_t _write_mem_usage = 0; int64_t _active_mem_usage = 0; - // mem tracker collection of all mem tables. - std::shared_ptr _memtable_tracker_set; // sum of all mem table memory. std::unique_ptr _mem_tracker; int64_t _load_hard_mem_limit = -1; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 114a7841b92204..59916d5f1cc57b 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -187,25 +187,12 @@ Status MemTableWriter::wait_flush() { } void MemTableWriter::_reset_mem_table() { -#ifndef BE_TEST - auto mem_table_insert_tracker = std::make_shared( - fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num, - UniqueId(_req.load_id).to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set()); - auto mem_table_flush_tracker = std::make_shared( - fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num++, - UniqueId(_req.load_id).to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set()); -#else auto mem_table_insert_tracker = std::make_shared(fmt::format( "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string())); auto mem_table_flush_tracker = std::make_shared(fmt::format( "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num++, UniqueId(_req.load_id).to_string())); -#endif { std::lock_guard l(_mem_table_tracker_lock); _mem_table_insert_trackers.push_back(mem_table_insert_tracker); diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 09fc689959ce4c..32b6683e7823b0 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -92,28 +92,28 @@ class StoragePageCache { } }; - class DataPageCache : public LRUCachePolicyTrackingAllocator { + class DataPageCache : public LRUCachePolicy { public: DataPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, LRUCacheType::SIZE, - config::data_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, + LRUCacheType::SIZE, config::data_page_cache_stale_sweep_time_sec, + num_shards) {} }; - class IndexPageCache : public LRUCachePolicyTrackingAllocator { + class IndexPageCache : public LRUCachePolicy { public: IndexPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, LRUCacheType::SIZE, - config::index_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, + LRUCacheType::SIZE, config::index_page_cache_stale_sweep_time_sec, + num_shards) {} }; - class PKIndexPageCache : public LRUCachePolicyTrackingAllocator { + class PKIndexPageCache : public LRUCachePolicy { public: PKIndexPageCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingAllocator( - CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, LRUCacheType::SIZE, - config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, + LRUCacheType::SIZE, + config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {} }; static constexpr uint32_t kDefaultNumShards = 16; @@ -164,7 +164,7 @@ class StoragePageCache { // delete bitmap in unique key with mow std::unique_ptr _pk_index_page_cache; - LRUCachePolicyTrackingAllocator* _get_page_cache(segment_v2::PageTypePB page_type) { + LRUCachePolicy* _get_page_cache(segment_v2::PageTypePB page_type) { switch (page_type) { case segment_v2::DATA_PAGE: { return _data_page_cache.get(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index b2930d2867b05f..e42c02860f5d00 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -135,9 +135,9 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptrgetSizeInBytes(), - bitmap->getSizeInBytes(), CachePriority::NORMAL); + auto* lru_handle = LRUCachePolicy::insert(key.encode(), (void*)cache_value_ptr.release(), + bitmap->getSizeInBytes(), bitmap->getSizeInBytes(), + CachePriority::NORMAL); *handle = InvertedIndexQueryCacheHandle(this, lru_handle); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index 5423ea044a2e58..b80f2c01027b6e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -99,23 +99,23 @@ class InvertedIndexSearcherCache { private: InvertedIndexSearcherCache() = default; - class InvertedIndexSearcherCachePolicy : public LRUCachePolicyTrackingManual { + class InvertedIndexSearcherCachePolicy : public LRUCachePolicy { public: InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, uint32_t element_count_capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, - capacity, LRUCacheType::SIZE, - config::inverted_index_cache_stale_sweep_time_sec, - num_shards, element_count_capacity, true) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, true) {} InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards, uint32_t element_count_capacity, CacheValueTimeExtractor cache_value_time_extractor, bool cache_value_check_timestamp) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, - LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec, - num_shards, element_count_capacity, cache_value_time_extractor, - cache_value_check_timestamp, true) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity, + LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards, + element_count_capacity, cache_value_time_extractor, + cache_value_check_timestamp, true) {} }; // Insert a cache entry by key. // And the cache entry will be returned in handle. @@ -179,9 +179,9 @@ class InvertedIndexCacheHandle { class InvertedIndexQueryCacheHandle; -class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual { +class InvertedIndexQueryCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // cache key struct CacheKey { @@ -227,10 +227,9 @@ class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual { InvertedIndexQueryCache() = delete; InvertedIndexQueryCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, - capacity, LRUCacheType::SIZE, - config::inverted_index_cache_stale_sweep_time_sec, - num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity, + LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec, + num_shards) {} bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle); diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index 7bb18a59c349a0..68cd809ed226f4 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -44,7 +44,7 @@ using SegmentIteratorUPtr = std::unique_ptr; // eliminating the need for frequent allocation and deallocation during usage. // This caching mechanism proves immensely advantageous, particularly in scenarios // with high concurrency, where queries are executed simultaneously. -class SchemaCache : public LRUCachePolicyTrackingManual { +class SchemaCache : public LRUCachePolicy { public: static SchemaCache* instance(); @@ -86,9 +86,8 @@ class SchemaCache : public LRUCachePolicyTrackingManual { }; SchemaCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::SCHEMA_CACHE, capacity, - LRUCacheType::NUMBER, - config::schema_cache_sweep_time_sec) {} + : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, LRUCacheType::NUMBER, + config::schema_cache_sweep_time_sec) {} private: static constexpr char SCHEMA_DELIMITER = '-'; diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 12ab89af0be283..fd7e3f476ad082 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -40,9 +40,9 @@ bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value, SegmentCacheHandle* handle) { - auto* lru_handle = LRUCachePolicyTrackingManual::insert( - key.encode(), &value, value.segment->meta_mem_usage(), value.segment->meta_mem_usage(), - CachePriority::NORMAL); + auto* lru_handle = + LRUCachePolicy::insert(key.encode(), &value, value.segment->meta_mem_usage(), + value.segment->meta_mem_usage(), CachePriority::NORMAL); handle->push_segment(this, lru_handle); } diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index 5bb8fae3c41877..d177024242db33 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -55,9 +55,9 @@ class BetaRowset; // Make sure that cache_handle is valid during the segment usage period. using BetaRowsetSharedPtr = std::shared_ptr; -class SegmentCache : public LRUCachePolicyTrackingManual { +class SegmentCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // The cache key or segment lru cache struct CacheKey { CacheKey(RowsetId rowset_id_, int64_t segment_id_) @@ -81,10 +81,9 @@ class SegmentCache : public LRUCachePolicyTrackingManual { }; SegmentCache(size_t memory_bytes_limit, size_t segment_num_limit) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::SEGMENT_CACHE, - memory_bytes_limit, LRUCacheType::SIZE, - config::tablet_rowset_stale_sweep_time_sec, - DEFAULT_LRU_CACHE_NUM_SHARDS * 2, segment_num_limit) {} + : LRUCachePolicy(CachePolicy::CacheType::SEGMENT_CACHE, memory_bytes_limit, + LRUCacheType::SIZE, config::tablet_rowset_stale_sweep_time_sec, + DEFAULT_LRU_CACHE_NUM_SHARDS * 2, segment_num_limit) {} // Lookup the given segment in the cache. // If the segment is found, the cache entry will be written into handle. diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index d7ccd4597d6ef3..b2a313adcdbb7e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -540,7 +540,7 @@ class StorageEngine final : public BaseStorageEngine { // lru cache for create tabelt round robin in disks // key: partitionId_medium // value: index -class CreateTabletIdxCache : public LRUCachePolicyTrackingManual { +class CreateTabletIdxCache : public LRUCachePolicy { public: // get key, delimiter with DELIMITER '-' static std::string get_key(int64_t partition_id, TStorageMedium::type medium) { @@ -558,9 +558,9 @@ class CreateTabletIdxCache : public LRUCachePolicyTrackingManual { }; CreateTabletIdxCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, - capacity, LRUCacheType::NUMBER, - /*stale_sweep_time_s*/ 30 * 60) {} + : LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity, + LRUCacheType::NUMBER, + /*stale_sweep_time_s*/ 30 * 60) {} }; struct DirInfo { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 801a288fde8bfe..e7679da060361a 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -90,8 +90,7 @@ bvar::Adder g_tablet_meta_schema_columns_count("tablet_meta_schema_colu TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size) : _engine(engine), - _tablet_meta_mem_tracker(std::make_shared( - "TabletMeta(experimental)", ExecEnv::GetInstance()->details_mem_tracker_set())), + _tablet_meta_mem_tracker(std::make_shared("TabletMeta(experimental)")), _tablets_shards_size(tablet_map_lock_shard_size), _tablets_shards_mask(tablet_map_lock_shard_size - 1) { CHECK_GT(_tablets_shards_size, 0); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 74ab71d0586fa0..f754f885abe639 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -520,13 +520,12 @@ class DeleteBitmap { void remove_sentinel_marks(); - class AggCachePolicy : public LRUCachePolicyTrackingManual { + class AggCachePolicy : public LRUCachePolicy { public: AggCachePolicy(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, - capacity, LRUCacheType::SIZE, - config::delete_bitmap_agg_cache_stale_sweep_time_sec, - 256) {} + : LRUCachePolicy(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, capacity, + LRUCacheType::SIZE, + config::delete_bitmap_agg_cache_stale_sweep_time_sec, 256) {} }; class AggCache { diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index 51618f590a7dd2..e339c947bb97a4 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -40,8 +40,8 @@ std::pair TabletSchemaCache::insert(const std: pb.ParseFromString(key); tablet_schema_ptr->init_from_pb(pb); value->tablet_schema = tablet_schema_ptr; - lru_handle = LRUCachePolicyTrackingManual::insert( - key, value, tablet_schema_ptr->num_columns(), 0, CachePriority::NORMAL); + lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(), 0, + CachePriority::NORMAL); g_tablet_schema_cache_count << 1; g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns(); } diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h index 10462804ed2012..e18892a3ca5f06 100644 --- a/be/src/olap/tablet_schema_cache.h +++ b/be/src/olap/tablet_schema_cache.h @@ -23,14 +23,13 @@ namespace doris { -class TabletSchemaCache : public LRUCachePolicyTrackingManual { +class TabletSchemaCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; TabletSchemaCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity, - LRUCacheType::NUMBER, - config::tablet_schema_cache_recycle_interval) {} + : LRUCachePolicy(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity, + LRUCacheType::NUMBER, config::tablet_schema_cache_recycle_interval) {} static TabletSchemaCache* create_global_schema_cache(size_t capacity) { auto* res = new TabletSchemaCache(capacity); diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 5944bbf0fc3136..88ee97c5f6a3b9 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -282,13 +282,12 @@ class TxnManager { void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); - class TabletVersionCache : public LRUCachePolicyTrackingManual { + class TabletVersionCache : public LRUCachePolicy { public: TabletVersionCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_VERSION_CACHE, - capacity, LRUCacheType::NUMBER, -1, - DEFAULT_LRU_CACHE_NUM_SHARDS, - DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity, + LRUCacheType::NUMBER, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} }; private: diff --git a/be/src/pipeline/query_cache/query_cache.cpp b/be/src/pipeline/query_cache/query_cache.cpp index e6d41ecaba5893..20e342e140f156 100644 --- a/be/src/pipeline/query_cache/query_cache.cpp +++ b/be/src/pipeline/query_cache/query_cache.cpp @@ -49,9 +49,8 @@ void QueryCache::insert(const CacheKey& key, int64_t version, CacheResult& res, auto cache_value_ptr = std::make_unique(version, std::move(cache_result), slot_orders); - QueryCacheHandle(this, LRUCachePolicyTrackingManual::insert( - key, (void*)cache_value_ptr.release(), cache_size, cache_size, - CachePriority::NORMAL)); + QueryCacheHandle(this, LRUCachePolicy::insert(key, (void*)cache_value_ptr.release(), cache_size, + cache_size, CachePriority::NORMAL)); } bool QueryCache::lookup(const CacheKey& key, int64_t version, doris::QueryCacheHandle* handle) { diff --git a/be/src/pipeline/query_cache/query_cache.h b/be/src/pipeline/query_cache/query_cache.h index 6ec00b91f7816c..a905831b530578 100644 --- a/be/src/pipeline/query_cache/query_cache.h +++ b/be/src/pipeline/query_cache/query_cache.h @@ -86,9 +86,9 @@ class QueryCacheHandle { DISALLOW_COPY_AND_ASSIGN(QueryCacheHandle); }; -class QueryCache : public LRUCachePolicyTrackingManual { +class QueryCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; struct CacheValue : public LRUCacheValueBase { int64_t version; @@ -140,8 +140,8 @@ class QueryCache : public LRUCachePolicyTrackingManual { QueryCache() = delete; QueryCache(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::QUERY_CACHE, capacity, - LRUCacheType::SIZE, 3600 * 24, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::QUERY_CACHE, capacity, LRUCacheType::SIZE, + 3600 * 24, num_shards) {} bool lookup(const CacheKey& key, int64_t version, QueryCacheHandle* handle); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 38fcaceb47953a..61cebad10b9e78 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -78,6 +78,7 @@ class LoadPathMgr; class NewLoadStreamMgr; class MemTrackerLimiter; class MemTracker; +struct TrackerLimiterGroup; class BaseStorageEngine; class ResultBufferMgr; class ResultQueueMgr; @@ -174,9 +175,10 @@ class ExecEnv { std::vector mem_tracker_limiter_pool; void init_mem_tracker(); std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } - MemTrackerLimiter* details_mem_tracker_set() { return _details_mem_tracker_set.get(); } std::shared_ptr page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; } - MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); } + std::shared_ptr brpc_iobuf_block_memory_tracker() { + return _brpc_iobuf_block_memory_tracker; + } std::shared_ptr segcompaction_mem_tracker() { return _segcompaction_mem_tracker; } @@ -359,10 +361,9 @@ class ExecEnv { // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; - std::shared_ptr _details_mem_tracker_set; // page size not in cache, data page/index page/etc. std::shared_ptr _page_no_cache_mem_tracker; - std::shared_ptr _brpc_iobuf_block_memory_tracker; + std::shared_ptr _brpc_iobuf_block_memory_tracker; // Count the memory consumption of segment compaction tasks. std::shared_ptr _segcompaction_mem_tracker; std::shared_ptr _stream_load_pipe_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a69709d24d2466..758a2f3760c7a7 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -591,19 +591,16 @@ void ExecEnv::init_mem_tracker() { _s_tracking_memory = true; _orphan_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); - _details_mem_tracker_set = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "DetailsTrackerSet"); - _page_no_cache_mem_tracker = - std::make_shared("PageNoCache", _details_mem_tracker_set.get()); + _page_no_cache_mem_tracker = std::make_shared("PageNoCache"); _brpc_iobuf_block_memory_tracker = - std::make_shared("IOBufBlockMemory", _details_mem_tracker_set.get()); + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "IOBufBlockMemory"); _segcompaction_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction"); _point_query_executor_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); _query_cache_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "QueryCache"); - _block_compression_mem_tracker = _block_compression_mem_tracker = + _block_compression_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression"); _rowid_storage_reader_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 6d17b6f275f21d..ec841047c951cc 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -82,13 +82,12 @@ class LoadChannelMgr { Status _start_bg_worker(); - class LastSuccessChannelCache : public LRUCachePolicyTrackingManual { + class LastSuccessChannelCache : public LRUCachePolicy { public: LastSuccessChannelCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, - capacity, LRUCacheType::SIZE, -1, - DEFAULT_LRU_CACHE_NUM_SHARDS, - DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} + : LRUCachePolicy(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, capacity, + LRUCacheType::SIZE, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, + DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} }; protected: diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index 82b69ca02ef9f3..45d7781786f2d7 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -23,9 +23,6 @@ namespace doris { -std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock; -std::unordered_map GlobalMemoryArbitrator::_reserved_trackers; - bvar::PassiveStatus g_vm_rss_sub_allocator_cache( "meminfo_vm_rss_sub_allocator_cache", [](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr); @@ -62,28 +59,11 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { } } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem, std::memory_order_relaxed)); - { - std::lock_guard l(_reserved_trackers_lock); - _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes); - } return true; } void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) { _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed); - { - std::lock_guard l(_reserved_trackers_lock); - auto label = doris::thread_context()->thread_mem_tracker()->label(); - auto it = _reserved_trackers.find(label); - if (it == _reserved_trackers.end()) { - DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes; - return; - } - _reserved_trackers[label].sub(bytes); - if (_reserved_trackers[label].current_value() == 0) { - _reserved_trackers.erase(it); - } - } } int64_t GlobalMemoryArbitrator::sub_thread_reserve_memory(int64_t bytes) { diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index f804452956786d..1859f45391fca3 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -17,7 +17,7 @@ #pragma once -#include "runtime/memory/mem_tracker.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" namespace doris { @@ -106,20 +106,6 @@ class GlobalMemoryArbitrator { static bool try_reserve_process_memory(int64_t bytes); static void release_process_reserved_memory(int64_t bytes); - static inline void make_reserved_memory_snapshots( - std::vector* snapshots) { - std::lock_guard l(_reserved_trackers_lock); - for (const auto& pair : _reserved_trackers) { - MemTracker::Snapshot snapshot; - snapshot.type = "reserved_memory"; - snapshot.label = pair.first; - snapshot.limit = -1; - snapshot.cur_consumption = pair.second.current_value(); - snapshot.peak_consumption = pair.second.peak_value(); - (*snapshots).emplace_back(snapshot); - } - } - static inline int64_t process_reserved_memory() { return _s_process_reserved_memory.load(std::memory_order_relaxed); } @@ -207,9 +193,6 @@ class GlobalMemoryArbitrator { private: static std::atomic _s_process_reserved_memory; - - static std::mutex _reserved_trackers_lock; - static std::unordered_map _reserved_trackers; }; } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index 419825c85c4538..7b5a8ab9fec6d9 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -47,6 +47,7 @@ class LRUCachePolicy : public CachePolicy { CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); } + _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type, @@ -65,6 +66,7 @@ class LRUCachePolicy : public CachePolicy { CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); } + _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } void reset_cache() { _cache.reset(); } @@ -92,11 +94,33 @@ class LRUCachePolicy : public CachePolicy { } } - virtual int64_t mem_consumption() = 0; + std::shared_ptr mem_tracker() const { + DCHECK(_mem_tracker != nullptr); + return _mem_tracker; + } - virtual Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, - size_t tracking_bytes, - CachePriority priority = CachePriority::NORMAL) = 0; + int64_t mem_consumption() { + DCHECK(_mem_tracker != nullptr); + return _mem_tracker->consumption(); + } + + // Insert will consume tracking_bytes to _mem_tracker and cache value destroy will release tracking_bytes. + // If LRUCacheType::SIZE, tracking_bytes usually equal to charge. + // If LRUCacheType::NUMBER, tracking_bytes usually not equal to charge, at this time charge is an weight. + // If LRUCacheType::SIZE and tracking_bytes equals 0, memory must be tracked in Doris Allocator, + // cache value is allocated using Alloctor. + // If LRUCacheType::NUMBER and tracking_bytes equals 0, usually currently cannot accurately tracking memory size, + // only tracking handle_size(106). + Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, + CachePriority priority = CachePriority::NORMAL) { + size_t tracking_bytes_with_handle = sizeof(LRUHandle) - 1 + key.size() + tracking_bytes; + if (value != nullptr) { + mem_tracker()->consume(tracking_bytes_with_handle); + ((LRUCacheValueBase*)value) + ->set_tracking_bytes(tracking_bytes_with_handle, _mem_tracker); + } + return _cache->insert(key, value, charge, priority); + } Cache::Handle* lookup(const CacheKey& key) { return _cache->lookup(key); } @@ -238,128 +262,19 @@ class LRUCachePolicy : public CachePolicy { } protected: + void _init_mem_tracker(const std::string& type_name) { + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::GLOBAL, + fmt::format("{}[{}]", type_string(_type), type_name)); + } + // if check_capacity failed, will return dummy lru cache, // compatible with ShardedLRUCache usage, but will not actually cache. std::shared_ptr _cache; std::mutex _lock; LRUCacheType _lru_cache_type; -}; - -class LRUCachePolicyTrackingAllocator : public LRUCachePolicy { -public: - LRUCachePolicyTrackingAllocator( - CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS, - uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, - bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - LRUCachePolicyTrackingAllocator(CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards, - uint32_t element_count_capacity, - CacheValueTimeExtractor cache_value_time_extractor, - bool cache_value_check_timestamp, bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, cache_value_time_extractor, - cache_value_check_timestamp, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - ~LRUCachePolicyTrackingAllocator() override { reset_cache(); } - - std::shared_ptr mem_tracker() const { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker; - } - - int64_t mem_consumption() override { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker->consumption(); - } - - Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, - CachePriority priority = CachePriority::NORMAL) override { - return _cache->insert(key, value, charge, priority); - } - -protected: - void _init_mem_tracker(const std::string& type_name) { - _mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::GLOBAL, - fmt::format("{}[{}](AllocByAllocator)", type_string(_type), type_name)); - } std::shared_ptr _mem_tracker; }; -class LRUCachePolicyTrackingManual : public LRUCachePolicy { -public: - LRUCachePolicyTrackingManual( - CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS, - uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, - bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - LRUCachePolicyTrackingManual(CacheType type, size_t capacity, LRUCacheType lru_cache_type, - uint32_t stale_sweep_time_s, uint32_t num_shards, - uint32_t element_count_capacity, - CacheValueTimeExtractor cache_value_time_extractor, - bool cache_value_check_timestamp, bool enable_prune = true) - : LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards, - element_count_capacity, cache_value_time_extractor, - cache_value_check_timestamp, enable_prune) { - _init_mem_tracker(lru_cache_type_string(lru_cache_type)); - } - - ~LRUCachePolicyTrackingManual() override { reset_cache(); } - - MemTracker* mem_tracker() { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker.get(); - } - - int64_t mem_consumption() override { - DCHECK(_mem_tracker != nullptr); - return _mem_tracker->consumption(); - } - - // Insert and cache value destroy will be manually consume tracking_bytes to mem tracker. - // If lru cache is LRUCacheType::SIZE, tracking_bytes usually equal to charge. - Cache::Handle* insert(const CacheKey& key, void* value, size_t charge, size_t tracking_bytes, - CachePriority priority = CachePriority::NORMAL) override { - size_t bytes_with_handle = _get_bytes_with_handle(key, charge, tracking_bytes); - if (value != nullptr) { // if tracking_bytes = 0, only tracking handle size. - mem_tracker()->consume(bytes_with_handle); - ((LRUCacheValueBase*)value)->set_tracking_bytes(bytes_with_handle, mem_tracker()); - } - return _cache->insert(key, value, charge, priority); - } - -private: - void _init_mem_tracker(const std::string& type_name) { - _mem_tracker = - std::make_unique(fmt::format("{}[{}]", type_string(_type), type_name), - ExecEnv::GetInstance()->details_mem_tracker_set()); - } - - // LRUCacheType::SIZE equal to total_size. - size_t _get_bytes_with_handle(const CacheKey& key, size_t charge, size_t bytes) { - size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); - DCHECK(_lru_cache_type == LRUCacheType::SIZE || bytes != -1) - << " _type " << type_string(_type); - // if LRUCacheType::NUMBER and bytes equals 0, such as some caches cannot accurately track memory size. - // cache mem tracker value and _usage divided by handle_size(106) will get the number of cache entries. - return _lru_cache_type == LRUCacheType::SIZE ? handle_size + charge : handle_size + bytes; - } - - std::unique_ptr _mem_tracker; -}; - } // namespace doris diff --git a/be/src/runtime/memory/lru_cache_value_base.h b/be/src/runtime/memory/lru_cache_value_base.h index 6d4b2991a023a6..f9e534e6600df8 100644 --- a/be/src/runtime/memory/lru_cache_value_base.h +++ b/be/src/runtime/memory/lru_cache_value_base.h @@ -27,18 +27,19 @@ class LRUCacheValueBase { public: virtual ~LRUCacheValueBase() { if (_tracking_bytes > 0) { - _mem_tracker->consume(-_tracking_bytes); + _mem_tracker->release(_tracking_bytes); } } - void set_tracking_bytes(size_t tracking_bytes, MemTracker* mem_tracker) { + void set_tracking_bytes(size_t tracking_bytes, + const std::shared_ptr& mem_tracker) { this->_tracking_bytes = tracking_bytes; this->_mem_tracker = mem_tracker; } protected: size_t _tracking_bytes = 0; - MemTracker* _mem_tracker = nullptr; + std::shared_ptr _mem_tracker; }; } // namespace doris diff --git a/be/src/runtime/memory/mem_counter.h b/be/src/runtime/memory/mem_counter.h new file mode 100644 index 00000000000000..8964a5dc63f732 --- /dev/null +++ b/be/src/runtime/memory/mem_counter.h @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +#pragma once + +#include +#include +#include + +#include "common/compiler_util.h" +#include "util/pretty_printer.h" + +namespace doris { + +/* + * A counter that keeps track of the current and peak memory usage seen. + * Relaxed ordering, not accurate in real time. + * + * This class is thread-safe. +*/ +class MemCounter { +public: + MemCounter() = default; + + void add(int64_t delta) { + if (UNLIKELY(delta == 0)) { + return; + } + int64_t value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta; + update_peak(value); + } + + void add_no_update_peak(int64_t delta) { // need extreme fast + _current_value.fetch_add(delta, std::memory_order_relaxed); + } + + bool try_add(int64_t delta, int64_t max) { + if (UNLIKELY(delta == 0)) { + return true; + } + int64_t cur_val = _current_value.load(std::memory_order_relaxed); + int64_t new_val = 0; + do { + new_val = cur_val + delta; + if (UNLIKELY(new_val > max)) { + return false; + } + } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val, new_val, + std::memory_order_relaxed))); + update_peak(new_val); + return true; + } + + void sub(int64_t delta) { _current_value.fetch_sub(delta, std::memory_order_relaxed); } + + void set(int64_t v) { + _current_value.store(v, std::memory_order_relaxed); + update_peak(v); + } + + void update_peak(int64_t value) { + int64_t pre_value = _peak_value.load(std::memory_order_relaxed); + while (value > pre_value && + !_peak_value.compare_exchange_weak(pre_value, value, std::memory_order_relaxed)) { + } + } + + int64_t current_value() const { return _current_value.load(std::memory_order_relaxed); } + int64_t peak_value() const { return _peak_value.load(std::memory_order_relaxed); } + + static std::string print_bytes(int64_t bytes) { + return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) + : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); + } + +private: + std::atomic _current_value {0}; + std::atomic _peak_value {0}; +}; + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index f5a3853f79f84d..796e6c166e04fe 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -15,100 +15,39 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.cpp -// and modified by Doris #include "runtime/memory/mem_tracker.h" -#include - -#include - -#include "bvar/bvar.h" -#include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/thread_context.h" +#include namespace doris { +constexpr size_t MEM_TRACKERS_GROUP_NUM = 1000; +std::atomic mem_tracker_group_counter(0); bvar::Adder g_memtracker_cnt("memtracker_cnt"); -// Save all MemTrackers in use to maintain the weak relationship between MemTracker and MemTrackerLimiter. -// When MemTrackerLimiter prints statistics, all MemTracker statistics with weak relationship will be printed together. -// Each group corresponds to several MemTrackerLimiters and has a lock. -// Multiple groups are used to reduce the impact of locks. -std::vector MemTracker::mem_tracker_pool(1000); +std::vector MemTracker::mem_tracker_pool(MEM_TRACKERS_GROUP_NUM); -MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _label(label) { - _consumption = std::make_shared(); - bind_parent(parent); -} - -void MemTracker::bind_parent(MemTrackerLimiter* parent) { - if (parent) { - _type = parent->type(); - _parent_label = parent->label(); - _parent_group_num = parent->group_num(); - } else { - _type = thread_context()->thread_mem_tracker()->type(); - _parent_label = thread_context()->thread_mem_tracker()->label(); - _parent_group_num = thread_context()->thread_mem_tracker()->group_num(); - } +MemTracker::MemTracker(const std::string& label) { + _label = label; + _group_num = mem_tracker_group_counter.fetch_add(1) % MEM_TRACKERS_GROUP_NUM; { - std::lock_guard l(mem_tracker_pool[_parent_group_num].group_lock); - _tracker_group_it = mem_tracker_pool[_parent_group_num].trackers.insert( - mem_tracker_pool[_parent_group_num].trackers.end(), this); + std::lock_guard l(mem_tracker_pool[_group_num].group_lock); + _trackers_group_it = mem_tracker_pool[_group_num].trackers.insert( + mem_tracker_pool[_group_num].trackers.end(), this); } g_memtracker_cnt << 1; } MemTracker::~MemTracker() { - if (_parent_group_num != -1) { - std::lock_guard l(mem_tracker_pool[_parent_group_num].group_lock); - if (_tracker_group_it != mem_tracker_pool[_parent_group_num].trackers.end()) { - mem_tracker_pool[_parent_group_num].trackers.erase(_tracker_group_it); - _tracker_group_it = mem_tracker_pool[_parent_group_num].trackers.end(); + if (_group_num != -1) { + std::lock_guard l(mem_tracker_pool[_group_num].group_lock); + if (_trackers_group_it != mem_tracker_pool[_group_num].trackers.end()) { + mem_tracker_pool[_group_num].trackers.erase(_trackers_group_it); + _trackers_group_it = mem_tracker_pool[_group_num].trackers.end(); } g_memtracker_cnt << -1; } } -MemTracker::Snapshot MemTracker::make_snapshot() const { - Snapshot snapshot; - snapshot.type = type_string(_type); - snapshot.label = _label; - snapshot.parent_label = _parent_label; - snapshot.limit = -1; - snapshot.cur_consumption = _consumption->current_value(); - snapshot.peak_consumption = _consumption->peak_value(); - return snapshot; -} - -void MemTracker::make_group_snapshot(std::vector* snapshots, - int64_t group_num, std::string parent_label) { - std::lock_guard l(mem_tracker_pool[group_num].group_lock); - for (auto* tracker : mem_tracker_pool[group_num].trackers) { - if (tracker->parent_label() == parent_label && tracker->peak_consumption() != 0) { - snapshots->push_back(tracker->make_snapshot()); - } - } -} - -void MemTracker::make_all_trackers_snapshots(std::vector* snapshots) { - for (auto& i : mem_tracker_pool) { - std::lock_guard l(i.group_lock); - for (auto* tracker : i.trackers) { - if (tracker->peak_consumption() != 0) { - snapshots->push_back(tracker->make_snapshot()); - } - } - } -} - -std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) { - return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", - snapshot.label, snapshot.parent_label, print_bytes(snapshot.cur_consumption), - snapshot.cur_consumption, print_bytes(snapshot.peak_consumption), - snapshot.peak_consumption); -} - } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 8a977e49388d52..9ea11fa86968ad 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -15,216 +15,59 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.h -// and modified by Doris #pragma once -#include -#include - -#include -// IWYU pragma: no_include -#include // IWYU pragma: keep -#include -#include -#include +#include #include -#include -#include "common/compiler_util.h" // IWYU pragma: keep -#include "runtime/query_statistics.h" -#include "util/pretty_printer.h" +#include "runtime/memory/mem_counter.h" namespace doris { -class MemTrackerLimiter; - -// Used to track memory usage. -// -// MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, -// which will automatically track all memory usage of the code segment where it is located. -// -// This class is thread-safe. -class MemTracker { +/* + * can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, + * which will automatically track all memory usage of the code segment where it is located. + * + * This class is thread-safe. +*/ +class MemTracker final { public: - struct Snapshot { - std::string type; - std::string label; - std::string parent_label; - int64_t limit = 0; - int64_t cur_consumption = 0; - int64_t peak_consumption = 0; - - bool operator<(const Snapshot& rhs) const { return cur_consumption < rhs.cur_consumption; } - }; - - struct TrackerGroup { - std::list trackers; - std::mutex group_lock; - }; - - enum class Type { - GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan - QUERY = 1, // Count the memory consumption of all Query tasks. - LOAD = 2, // Count the memory consumption of all Load tasks. - COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. - SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. - OTHER = 5 - }; - - static std::string type_string(Type type) { - switch (type) { - case Type::GLOBAL: - return "global"; - case Type::QUERY: - return "query"; - case Type::LOAD: - return "load"; - case Type::COMPACTION: - return "compaction"; - case Type::SCHEMA_CHANGE: - return "schema_change"; - case Type::OTHER: - return "other"; - default: - LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast(type); - } - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - } - - // A counter that keeps track of the current and peak value seen. - // Relaxed ordering, not accurate in real time. - class MemCounter { - public: - MemCounter() : _current_value(0), _peak_value(0) {} - - void add(int64_t delta) { - int64_t value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta; - update_peak(value); - } - - void add_no_update_peak(int64_t delta) { - _current_value.fetch_add(delta, std::memory_order_relaxed); - } - - bool try_add(int64_t delta, int64_t max) { - int64_t cur_val = _current_value.load(std::memory_order_relaxed); - int64_t new_val = 0; - do { - new_val = cur_val + delta; - if (UNLIKELY(new_val > max)) { - return false; - } - } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val, new_val, - std::memory_order_relaxed))); - update_peak(new_val); - return true; - } - - void sub(int64_t delta) { _current_value.fetch_sub(delta, std::memory_order_relaxed); } - - void set(int64_t v) { - _current_value.store(v, std::memory_order_relaxed); - update_peak(v); - } - - void update_peak(int64_t value) { - int64_t pre_value = _peak_value.load(std::memory_order_relaxed); - while (value > pre_value && !_peak_value.compare_exchange_weak( - pre_value, value, std::memory_order_relaxed)) { - } - } - - int64_t current_value() const { return _current_value.load(std::memory_order_relaxed); } - int64_t peak_value() const { return _peak_value.load(std::memory_order_relaxed); } - - private: - std::atomic _current_value; - std::atomic _peak_value; - }; - - // Creates and adds the tracker to the mem_tracker_pool. - MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr); + MemTracker() = default; + MemTracker(const std::string& label); + ~MemTracker(); - virtual ~MemTracker(); + void consume(int64_t bytes) { _mem_counter.add(bytes); } + void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } + void release(int64_t bytes) { _mem_counter.sub(bytes); } + void set_consumption(int64_t bytes) { _mem_counter.set(bytes); } + int64_t consumption() const { return _mem_counter.current_value(); } + int64_t peak_consumption() const { return _mem_counter.peak_value(); } - static std::string print_bytes(int64_t bytes) { - return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) - : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); - } - -public: - Type type() const { return _type; } const std::string& label() const { return _label; } - const std::string& parent_label() const { return _parent_label; } - const std::string& set_parent_label() const { return _parent_label; } - // Returns the memory consumed in bytes. - int64_t consumption() const { return _consumption->current_value(); } - int64_t peak_consumption() const { return _consumption->peak_value(); } - - void consume(int64_t bytes) { - if (UNLIKELY(bytes == 0)) { - return; - } - _consumption->add(bytes); - if (_query_statistics) { - _query_statistics->set_max_peak_memory_bytes(_consumption->peak_value()); - _query_statistics->set_current_used_memory_bytes(_consumption->current_value()); - } - } - - void consume_no_update_peak(int64_t bytes) { // need extreme fast - _consumption->add_no_update_peak(bytes); - } - - void release(int64_t bytes) { _consumption->sub(bytes); } - - void set_consumption(int64_t bytes) { _consumption->set(bytes); } - - std::shared_ptr get_query_statistics() { return _query_statistics; } - -public: - virtual Snapshot make_snapshot() const; - // Specify group_num from mem_tracker_pool to generate snapshot. - static void make_group_snapshot(std::vector* snapshots, int64_t group_num, - std::string parent_label); - static void make_all_trackers_snapshots(std::vector* snapshots); - static std::string log_usage(MemTracker::Snapshot snapshot); - - virtual std::string debug_string() { - std::stringstream msg; - msg << "label: " << _label << "; " - << "consumption: " << consumption() << "; " - << "peak_consumption: " << peak_consumption() << "; "; - return msg.str(); + std::string log_usage() const { + return fmt::format("MemTracker Lame={}, Used={}({} B), Peak={}({} B)", + MemCounter::print_bytes(consumption()), consumption(), + MemCounter::print_bytes(peak_consumption()), peak_consumption()); } -protected: - // Only used by MemTrackerLimiter - MemTracker() { _parent_group_num = -1; } - - void bind_parent(MemTrackerLimiter* parent); - - Type _type; - - // label used in the make snapshot, not guaranteed unique. - std::string _label; - - std::shared_ptr _consumption = nullptr; - - // Tracker is located in group num in mem_tracker_pool - int64_t _parent_group_num = 0; - - // Use _parent_label to correlate with parent limiter tracker. - std::string _parent_label = "-"; - - static std::vector mem_tracker_pool; +private: + MemCounter _mem_counter; + std::string _label {"None"}; + /* + * Save all MemTrackers, used by dump memory info. + */ + struct TrackersGroup { + std::list trackers; + std::mutex group_lock; + }; + // Each group corresponds to several MemCountes and has a lock. + // Multiple groups are used to reduce the impact of locks. + static std::vector mem_tracker_pool; + // Group number in mem_tracker_pool, generated by the timestamp. + int64_t _group_num {-1}; // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. - std::list::iterator _tracker_group_it; - - std::shared_ptr _query_statistics = nullptr; + std::list::iterator _trackers_group_it; }; } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index a1eb2ed67d3426..78e66b6a579b79 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -19,16 +19,13 @@ #include #include -#include #include #include #include #include -#include "bvar/bvar.h" #include "common/config.h" -#include "olap/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/memory/global_memory_arbitrator.h" @@ -37,9 +34,7 @@ #include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" -#include "util/pretty_printer.h" #include "util/runtime_profile.h" -#include "util/stack_util.h" namespace doris { @@ -54,6 +49,7 @@ static bvar::Adder memory_schema_change_trackers_sum_bytes( "memory_schema_change_trackers_sum_bytes"); static bvar::Adder memory_other_trackers_sum_bytes("memory_other_trackers_sum_bytes"); +std::atomic mem_tracker_limiter_group_counter(0); constexpr auto GC_MAX_SEEK_TRACKER = 1000; std::atomic MemTrackerLimiter::_enable_print_log_process_usage {true}; @@ -76,14 +72,14 @@ static RuntimeProfile::Counter* previously_canceling_tasks_counter = MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit) { DCHECK_GE(byte_limit, -1); - _consumption = std::make_shared(); _type = type; _label = label; _limit = byte_limit; if (_type == Type::GLOBAL) { _group_num = 0; } else { - _group_num = random() % 999 + 1; + _group_num = + mem_tracker_limiter_group_counter.fetch_add(1) % (MEM_TRACKER_GROUP_NUM - 1) + 1; } // currently only select/load need runtime query statistics @@ -132,25 +128,23 @@ MemTrackerLimiter::~MemTrackerLimiter() { "tracker web or log, this indicates that there may be a memory leak. " "4. If you need to " "transfer memory tracking value between two trackers, can use transfer_to."; - if (_consumption->current_value() != 0) { + if (consumption() != 0) { if (open_memory_tracker_inaccurate_detect()) { - std::string err_msg = - fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", - label(), _consumption->current_value(), _consumption->peak_value(), - mem_tracker_inaccurate_msg); + std::string err_msg = fmt::format( + "mem tracker label: {}, consumption: {}, peak consumption: {}, {}.", label(), + consumption(), peak_consumption(), mem_tracker_inaccurate_msg); LOG(FATAL) << err_msg << print_address_sanitizers(); } if (ExecEnv::tracking_memory()) { - ExecEnv::GetInstance()->orphan_mem_tracker()->consume(_consumption->current_value()); + ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption()); } - _consumption->set(0); - } else if (doris::config::crash_in_memory_tracker_inaccurate && !_address_sanitizers.empty() && - !is_group_commit_load) { + _mem_counter.set(0); + } else if (open_memory_tracker_inaccurate_detect() && !_address_sanitizers.empty()) { LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. " << ", mem tracker label: " << _label - << ", peak consumption: " << _consumption->peak_value() - << print_address_sanitizers(); + << ", peak consumption: " << peak_consumption() << print_address_sanitizers(); } + DCHECK(reserved_consumption() == 0); memory_memtrackerlimiter_cnt << -1; } @@ -163,9 +157,9 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, " "consumption: {}, peak consumption: {}, buf: {}, size: {}, old " "buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.", - _label, _consumption->current_value(), _consumption->peak_value(), - buf, size, it->first, it->second.size, - get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); + _label, consumption(), peak_consumption(), buf, size, it->first, + it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + it->second.stack_trace)); } // if alignment not equal to 0, maybe usable_size > size. @@ -186,8 +180,8 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { "[Address Sanitizer] free memory buf size inaccurate, mem tracker label: " "{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: " "{}, old size: {}, new stack_trace: {}, old stack_trace: {}.", - _label, _consumption->current_value(), _consumption->peak_value(), buf, - size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), + _label, consumption(), peak_consumption(), buf, size, it->first, + it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace)); } _address_sanitizers.erase(buf); @@ -195,7 +189,7 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { _error_address_sanitizers.emplace_back(fmt::format( "[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: " "{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.", - _label, _consumption->current_value(), _consumption->peak_value(), buf, size, + _label, consumption(), peak_consumption(), buf, size, get_stack_trace(1, "FULL_WITH_INLINE"))); } } @@ -209,8 +203,8 @@ std::string MemTrackerLimiter::print_address_sanitizers() { auto msg = fmt::format( "\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: " "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}", - _label, _consumption->current_value(), _consumption->peak_value(), it.first, - it.second.size, it.second.stack_trace); + _label, consumption(), peak_consumption(), it.first, it.second.size, + it.second.stack_trace); LOG(INFO) << msg; detail += msg; } @@ -222,16 +216,38 @@ std::string MemTrackerLimiter::print_address_sanitizers() { return detail; } -MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const { +MemTrackerLimiter::Snapshot MemTrackerLimiter::make_snapshot() const { Snapshot snapshot; snapshot.type = type_string(_type); snapshot.label = _label; snapshot.limit = _limit; - snapshot.cur_consumption = _consumption->current_value(); - snapshot.peak_consumption = _consumption->peak_value(); + snapshot.cur_consumption = consumption(); + snapshot.peak_consumption = peak_consumption(); return snapshot; } +MemTrackerLimiter::Snapshot MemTrackerLimiter::make_reserved_trackers_snapshot() const { + Snapshot snapshot; + snapshot.type = "reserved_memory"; + snapshot.label = _label; + snapshot.limit = -1; + snapshot.cur_consumption = reserved_consumption(); + snapshot.peak_consumption = reserved_peak_consumption(); + return snapshot; +} + +void MemTrackerLimiter::make_all_reserved_trackers_snapshots(std::vector* snapshots) { + for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { + std::lock_guard l(i.group_lock); + for (auto trackerWptr : i.trackers) { + auto tracker = trackerWptr.lock(); + if (tracker != nullptr && tracker->reserved_consumption() != 0) { + (*snapshots).emplace_back(tracker->make_reserved_trackers_snapshot()); + } + } + } +} + void MemTrackerLimiter::refresh_global_counter() { std::unordered_map type_mem_sum = { {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, @@ -248,7 +264,8 @@ void MemTrackerLimiter::refresh_global_counter() { } int64_t all_trackers_mem_sum = 0; for (auto it : type_mem_sum) { - MemTrackerLimiter::TypeMemSum[it.first]->set(it.second); + MemTrackerLimiter::TypeMemSum[it.first].set(it.second); + all_trackers_mem_sum += it.second; switch (it.first) { case Type::GLOBAL: @@ -300,18 +317,18 @@ void MemTrackerLimiter::clean_tracker_limiter_group() { #endif } -void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_process_snapshots(std::vector* snapshots) { MemTrackerLimiter::refresh_global_counter(); int64_t all_trackers_mem_sum = 0; Snapshot snapshot; - for (auto it : MemTrackerLimiter::TypeMemSum) { + for (const auto& it : MemTrackerLimiter::TypeMemSum) { snapshot.type = "overview"; snapshot.label = type_string(it.first); snapshot.limit = -1; - snapshot.cur_consumption = it.second->current_value(); - snapshot.peak_consumption = it.second->peak_value(); + snapshot.cur_consumption = it.second.current_value(); + snapshot.peak_consumption = it.second.peak_value(); (*snapshots).emplace_back(snapshot); - all_trackers_mem_sum += it.second->current_value(); + all_trackers_mem_sum += it.second.current_value(); } snapshot.type = "overview"; @@ -363,7 +380,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector (*snapshots).emplace_back(snapshot); } -void MemTrackerLimiter::make_type_snapshots(std::vector* snapshots, +void MemTrackerLimiter::make_type_snapshots(std::vector* snapshots, MemTrackerLimiter::Type type) { if (type == Type::GLOBAL) { std::lock_guard l( @@ -372,7 +389,6 @@ void MemTrackerLimiter::make_type_snapshots(std::vector* s auto tracker = trackerWptr.lock(); if (tracker != nullptr) { (*snapshots).emplace_back(tracker->make_snapshot()); - MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label()); } } } else { @@ -383,17 +399,15 @@ void MemTrackerLimiter::make_type_snapshots(std::vector* s auto tracker = trackerWptr.lock(); if (tracker != nullptr && tracker->type() == type) { (*snapshots).emplace_back(tracker->make_snapshot()); - MemTracker::make_group_snapshot(snapshots, tracker->group_num(), - tracker->label()); } } } } } -void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots, +void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots, int top_num) { - std::priority_queue max_pq; + std::priority_queue max_pq; // not include global type. for (unsigned i = 1; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { std::lock_guard l( @@ -413,7 +427,7 @@ void MemTrackerLimiter::make_top_consumption_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_all_trackers_snapshots(std::vector* snapshots) { for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { std::lock_guard l(i.group_lock); for (auto trackerWptr : i.trackers) { @@ -425,25 +439,25 @@ void MemTrackerLimiter::make_all_trackers_snapshots(std::vector* snapshots) { +void MemTrackerLimiter::make_all_memory_state_snapshots(std::vector* snapshots) { make_process_snapshots(snapshots); make_all_trackers_snapshots(snapshots); - MemTracker::make_all_trackers_snapshots(snapshots); + make_all_reserved_trackers_snapshots(snapshots); } -std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) { - return fmt::format( - "MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", - snapshot.label, snapshot.type, print_bytes(snapshot.limit), snapshot.limit, - print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, - print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); +std::string MemTrackerLimiter::log_usage(Snapshot snapshot) { + return fmt::format("MemTracker Label={}, Type={}, Limit={}({} B), Used={}({} B), Peak={}({} B)", + snapshot.label, snapshot.type, MemCounter::print_bytes(snapshot.limit), + snapshot.limit, MemCounter::print_bytes(snapshot.cur_consumption), + snapshot.cur_consumption, MemCounter::print_bytes(snapshot.peak_consumption), + snapshot.peak_consumption); } -std::string MemTrackerLimiter::type_log_usage(MemTracker::Snapshot snapshot) { +std::string MemTrackerLimiter::type_log_usage(Snapshot snapshot) { return fmt::format("Type={}, Used={}({} B), Peak={}({} B)", snapshot.type, - print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, - print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); + MemCounter::print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, + MemCounter::print_bytes(snapshot.peak_consumption), + snapshot.peak_consumption); } std::string MemTrackerLimiter::type_detail_usage(const std::string& msg, Type type) { @@ -467,16 +481,6 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { std::string detail = msg; detail += "\nProcess Memory Summary:\n " + GlobalMemoryArbitrator::process_mem_log_str(); detail += "\nMemory Tracker Summary: " + log_usage(); - std::string child_trackers_usage; - std::vector snapshots; - MemTracker::make_group_snapshot(&snapshots, _group_num, _label); - for (const auto& snapshot : snapshots) { - child_trackers_usage += "\n " + MemTracker::log_usage(snapshot); - } - if (!child_trackers_usage.empty()) { - detail += child_trackers_usage; - } - LOG(WARNING) << detail; } } @@ -484,25 +488,24 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) { std::string MemTrackerLimiter::log_process_usage_str() { std::string detail; detail += "\nProcess Memory Summary:\n " + GlobalMemoryArbitrator::process_mem_log_str(); - std::vector snapshots; + std::vector snapshots; MemTrackerLimiter::make_process_snapshots(&snapshots); MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); MemTrackerLimiter::make_top_consumption_snapshots(&snapshots, 15); - - // Add additional tracker printed when memory exceeds limit. - snapshots.emplace_back( - ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->make_snapshot()); + MemTrackerLimiter::make_all_reserved_trackers_snapshots(&snapshots); detail += "\nMemory Tracker Summary:"; for (const auto& snapshot : snapshots) { - if (snapshot.label.empty() && snapshot.parent_label.empty()) { + if (snapshot.label.empty()) { detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); - } else if (snapshot.parent_label.empty()) { - detail += "\n " + MemTrackerLimiter::log_usage(snapshot); } else { - detail += "\n " + MemTracker::log_usage(snapshot); + detail += "\n " + MemTrackerLimiter::log_usage(snapshot); } } + + // Add additional tracker printed when memory exceeds limit. + detail += "\n " + + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->log_usage(); return detail; } @@ -518,8 +521,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { std::string err_msg = fmt::format( "memory tracker limit exceeded, tracker label:{}, type:{}, limit " "{}, peak used {}, current used {}. backend {}, {}.", - label(), type_string(_type), print_bytes(limit()), - print_bytes(_consumption->peak_value()), print_bytes(_consumption->current_value()), + label(), type_string(_type), MemCounter::print_bytes(limit()), + MemCounter::print_bytes(peak_consumption()), MemCounter::print_bytes(consumption()), BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str()); if (_type == Type::QUERY || _type == Type::LOAD) { err_msg += fmt::format( @@ -544,7 +547,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, "Process memory not enough, cancel top memory used {}: " "<{}> consumption {}, backend {}, {}. Execute again " "after enough memory, details see be.INFO.", - type_string(type), label, print_bytes(mem_consumption), + type_string(type), label, MemCounter::print_bytes(mem_consumption), BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); @@ -665,7 +668,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, "Process memory not enough, cancel top memory overcommit {}: " "<{}> consumption {}, backend {}, {}. Execute again " "after enough memory, details see be.INFO.", - type_string(type), label, print_bytes(mem_consumption), + type_string(type), label, MemCounter::print_bytes(mem_consumption), BackendOptions::get_localhost(), cancel_reason); }, profile, GCType::PROCESS); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index c8a8c845793087..faf354cca4cbf3 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -20,28 +20,29 @@ #include #include #include -#include #include +#include // IWYU pragma: no_include #include // IWYU pragma: keep #include #include #include -#include #include #include #include #include "common/config.h" #include "common/status.h" -#include "runtime/memory/mem_tracker.h" +#include "runtime/memory/mem_counter.h" +#include "runtime/query_statistics.h" #include "util/string_util.h" #include "util/uid_util.h" namespace doris { class RuntimeProfile; +class MemTrackerLimiter; constexpr size_t MEM_TRACKER_GROUP_NUM = 1000; @@ -58,78 +59,115 @@ struct TrackerLimiterGroup { std::mutex group_lock; }; -// Track and limit the memory usage of process and query. -// Contains an limit, arranged into a tree structure. -// -// Automatically track every once malloc/free of the system memory allocator (Currently, based on TCMlloc hook). -// Put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,all memory used by this thread -// will be recorded on this Query, otherwise it will be recorded in Orphan Tracker by default. -class MemTrackerLimiter final : public MemTracker { +/* + * Track and limit the memory usage of process and query. + * + * Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts, + * all memory used by this thread will be recorded on this Query. + * + * This class is thread-safe. +*/ +class MemTrackerLimiter final { public: + /* + * Part 1, Type definition + */ + // TODO There are more and more GC codes and there should be a separate manager class. enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 }; - inline static std::unordered_map> TypeMemSum = { - {Type::GLOBAL, std::make_shared()}, - {Type::QUERY, std::make_shared()}, - {Type::LOAD, std::make_shared()}, - {Type::COMPACTION, std::make_shared()}, - {Type::SCHEMA_CHANGE, std::make_shared()}, - {Type::OTHER, std::make_shared()}}; + enum class Type { + GLOBAL = 0, // Life cycle is the same as the process, e.g. Cache and default Orphan + QUERY = 1, // Count the memory consumption of all Query tasks. + LOAD = 2, // Count the memory consumption of all Load tasks. + COMPACTION = 3, // Count the memory consumption of all Base and Cumulative tasks. + SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks. + OTHER = 5, + }; + + struct Snapshot { + std::string type; + std::string label; + int64_t limit = 0; + int64_t cur_consumption = 0; + int64_t peak_consumption = 0; + + bool operator<(const Snapshot& rhs) const { return cur_consumption < rhs.cur_consumption; } + }; + + // Corresponding to MemTrackerLimiter::Type. + // MemCounter contains atomic variables, which are not allowed to be copied or moved. + inline static std::unordered_map TypeMemSum; + + /* + * Part 2, Constructors and property methods + */ -public: static std::shared_ptr create_shared( MemTrackerLimiter::Type type, const std::string& label = std::string(), int64_t byte_limit = -1); // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit); - ~MemTrackerLimiter() override; + ~MemTrackerLimiter(); - static std::string gc_type_string(GCType type) { - switch (type) { - case GCType::PROCESS: - return "process"; - case GCType::WORK_LOAD_GROUP: - return "work load group"; - default: - LOG(FATAL) << "not match gc type:" << static_cast(type); - } - LOG(FATAL) << "__builtin_unreachable"; - __builtin_unreachable(); - } - - void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; } + Type type() const { return _type; } + const std::string& label() const { return _label; } + std::shared_ptr get_query_statistics() { return _query_statistics; } int64_t group_num() const { return _group_num; } bool has_limit() const { return _limit >= 0; } int64_t limit() const { return _limit; } bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); } + Status check_limit(int64_t bytes = 0); + bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; } + bool is_query_cancelled() { return _is_query_cancelled; } + void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } + + // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. + std::list>::iterator wg_tracker_limiter_group_it; + + /* + * Part 3, Memory tracking method (use carefully!) + * + * Note: Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release(). + * Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or + * SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER. + */ + + int64_t consumption() const { return _mem_counter.current_value(); } + int64_t peak_consumption() const { return _mem_counter.peak_value(); } + + void consume(int64_t bytes) { + _mem_counter.add(bytes); + if (_query_statistics) { + _query_statistics->set_max_peak_memory_bytes(peak_consumption()); + _query_statistics->set_current_used_memory_bytes(consumption()); + } + } + + void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); } + + void release(int64_t bytes) { _mem_counter.sub(bytes); } - bool try_consume(int64_t bytes) const { + bool try_consume(int64_t bytes) { if (UNLIKELY(bytes == 0)) { return true; } - bool st = true; + bool rt = true; if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) { - st = _consumption->try_add(bytes, _limit); + rt = _mem_counter.try_add(bytes, _limit); } else { - _consumption->add(bytes); + _mem_counter.add(bytes); } - if (st && _query_statistics) { - _query_statistics->set_max_peak_memory_bytes(_consumption->peak_value()); - _query_statistics->set_current_used_memory_bytes(_consumption->current_value()); + if (rt && _query_statistics) { + _query_statistics->set_max_peak_memory_bytes(peak_consumption()); + _query_statistics->set_current_used_memory_bytes(consumption()); } - return st; + return rt; } - Status check_limit(int64_t bytes = 0); - bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; } - - bool is_query_cancelled() { return _is_query_cancelled; } + void set_consumption(int64_t bytes) { _mem_counter.set(bytes); } - void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } - -public: // Transfer 'bytes' of consumption from this tracker to 'dst'. void transfer_to(int64_t size, MemTrackerLimiter* dst) { if (label() == dst->label()) { @@ -139,21 +177,50 @@ class MemTrackerLimiter final : public MemTracker { dst->cache_consume(size); } + // If need to consume the tracker frequently, use it + void cache_consume(int64_t bytes); + + /* + * Part 4, Reserved memory tracking method + */ + + int64_t reserved_consumption() const { return _reserved_counter.current_value(); } + int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); } + + bool try_reserve(int64_t bytes) { + bool rt = try_consume(bytes); + if (rt) { + _reserved_counter.add(bytes); + } + return rt; + } + + void release_reserved(int64_t bytes) { + _reserved_counter.sub(bytes); + DCHECK(reserved_consumption() >= 0); + } + + Snapshot make_reserved_trackers_snapshot() const; + static void make_all_reserved_trackers_snapshots(std::vector* snapshots); + + /* + * Part 4, Memory snapshot and log method + */ + static void refresh_global_counter(); static void clean_tracker_limiter_group(); - Snapshot make_snapshot() const override; + Snapshot make_snapshot() const; // Returns a list of all the valid tracker snapshots. - static void make_process_snapshots(std::vector* snapshots); - static void make_type_snapshots(std::vector* snapshots, Type type); - static void make_all_trackers_snapshots(std::vector* snapshots); - static void make_all_memory_state_snapshots(std::vector* snapshots); - static void make_top_consumption_snapshots(std::vector* snapshots, - int top_num); - - static std::string log_usage(MemTracker::Snapshot snapshot); + static void make_process_snapshots(std::vector* snapshots); + static void make_type_snapshots(std::vector* snapshots, Type type); + static void make_all_trackers_snapshots(std::vector* snapshots); + static void make_all_memory_state_snapshots(std::vector* snapshots); + static void make_top_consumption_snapshots(std::vector* snapshots, int top_num); + + static std::string log_usage(Snapshot snapshot); std::string log_usage() const { return log_usage(make_snapshot()); } - static std::string type_log_usage(MemTracker::Snapshot snapshot); + static std::string type_log_usage(Snapshot snapshot); static std::string type_detail_usage(const std::string& msg, Type type); void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } @@ -161,6 +228,12 @@ class MemTrackerLimiter final : public MemTracker { static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; } static std::string log_process_usage_str(); static void print_log_process_usage(); + // Log the memory usage when memory limit is exceeded. + std::string tracker_limit_exceeded_str(); + + /* + * Part 5, Memory GC method + */ // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. // cancel_reason recorded when gc is triggered, for log printing. @@ -191,6 +264,53 @@ class MemTrackerLimiter final : public MemTracker { return free_top_overcommit_query(min_free_mem, cancel_reason, profile, Type::LOAD); } + /* + * Part 6, Memory debug method + */ + + void add_address_sanitizers(void* buf, size_t size); + void remove_address_sanitizers(void* buf, size_t size); + bool is_group_commit_load {false}; + +private: + /* + * Part 7, Private method + */ + + static std::string type_string(Type type) { + switch (type) { + case Type::GLOBAL: + return "global"; + case Type::QUERY: + return "query"; + case Type::LOAD: + return "load"; + case Type::COMPACTION: + return "compaction"; + case Type::SCHEMA_CHANGE: + return "schema_change"; + case Type::OTHER: + return "other"; + default: + LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + + static std::string gc_type_string(GCType type) { + switch (type) { + case GCType::PROCESS: + return "process"; + case GCType::WORK_LOAD_GROUP: + return "work load group"; + default: + LOG(FATAL) << "not match gc type:" << static_cast(type); + } + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { if (label.find("#Id=") == std::string::npos) { @@ -202,37 +322,23 @@ class MemTrackerLimiter final : public MemTracker { return querytid; } - // Log the memory usage when memory limit is exceeded. - std::string tracker_limit_exceeded_str(); - - void add_address_sanitizers(void* buf, size_t size); - void remove_address_sanitizers(void* buf, size_t size); - bool is_group_commit_load {false}; - - std::string debug_string() override { - std::stringstream msg; - msg << "limit: " << _limit << "; " - << "consumption: " << _consumption->current_value() << "; " - << "label: " << _label << "; " - << "type: " << type_string(_type) << "; "; - return msg.str(); - } - - // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. - std::list>::iterator wg_tracker_limiter_group_it; - -private: - friend class ThreadMemTrackerMgr; - - // If need to consume the tracker frequently, use it - void cache_consume(int64_t bytes); - // When the accumulated untracked memory value exceeds the upper limit, // the current value is returned and set to 0. // Thread safety. int64_t add_untracked_mem(int64_t bytes); -private: + /* + * Part 8, Property definition + */ + + Type _type; + + // label used in the make snapshot, not guaranteed unique. + std::string _label; + + MemCounter _mem_counter; + MemCounter _reserved_counter; + // Limit on memory consumption, in bytes. int64_t _limit; @@ -250,6 +356,8 @@ class MemTrackerLimiter final : public MemTracker { bool _enable_print_log_usage = false; static std::atomic _enable_print_log_process_usage; + std::shared_ptr _query_statistics = nullptr; + struct AddressSanitizer { size_t size; std::string stack_trace; @@ -271,7 +379,9 @@ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { } inline void MemTrackerLimiter::cache_consume(int64_t bytes) { - if (bytes == 0) return; + if (bytes == 0) { + return; + } int64_t consume_bytes = add_untracked_mem(bytes); consume(consume_bytes); } @@ -280,9 +390,10 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memory_overcommit)) { return Status::OK(); } - if (_limit > 0 && _consumption->current_value() + bytes > _limit) { - return Status::MemoryLimitExceeded(fmt::format( - "failed alloc size {}, {}", print_bytes(bytes), tracker_limit_exceeded_str())); + if (_limit > 0 && consumption() + bytes > _limit) { + return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", + MemCounter::print_bytes(bytes), + tracker_limit_exceeded_str())); } return Status::OK(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 33dd0d41822ae1..d036564528534c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -46,7 +46,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( DCHECK(mem_tracker); CHECK(init()); flush_untracked_mem(); - _reserved_mem_stack.push_back(_reserved_mem); + _last_attach_snapshots_stack.push_back({_reserved_mem, _consumer_tracker_stack}); if (_reserved_mem != 0) { // _untracked_mem temporary store bytes that not synchronized to process reserved memory, // but bytes have been subtracted from thread _reserved_mem. @@ -54,6 +54,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( _reserved_mem = 0; _untracked_mem = 0; } + _consumer_tracker_stack.clear(); _limiter_tracker = mem_tracker; } @@ -62,9 +63,10 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( CHECK(init()); flush_untracked_mem(); release_reserved(); - DCHECK(!_reserved_mem_stack.empty()); - _reserved_mem = _reserved_mem_stack.back(); - _reserved_mem_stack.pop_back(); + DCHECK(!_last_attach_snapshots_stack.empty()); + _reserved_mem = _last_attach_snapshots_stack.back().reserved_mem; + _consumer_tracker_stack = _last_attach_snapshots_stack.back().consumer_tracker_stack; + _last_attach_snapshots_stack.pop_back(); _limiter_tracker = old_mem_tracker; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index bb0091f2e6d6fb..fd14750d8b8ebc 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -106,7 +106,7 @@ class ThreadMemTrackerMgr { std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; for (const auto& v : _consumer_tracker_stack) { - fmt::format_to(consumer_tracker_buf, "{}, ", MemTracker::log_usage(v->make_snapshot())); + fmt::format_to(consumer_tracker_buf, "{}, ", v->log_usage()); } return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, " @@ -119,6 +119,11 @@ class ThreadMemTrackerMgr { int64_t reserved_mem() const { return _reserved_mem; } private: + struct LastAttachSnapshot { + int64_t reserved_mem = 0; + std::vector consumer_tracker_stack; + }; + // is false: ExecEnv::ready() = false when thread local is initialized bool _init = false; // Cache untracked mem. @@ -126,9 +131,10 @@ class ThreadMemTrackerMgr { int64_t _old_untracked_mem = 0; int64_t _reserved_mem = 0; + // SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used, // so `attach_limiter_tracker` may be nested. - std::vector _reserved_mem_stack; + std::vector _last_attach_snapshots_stack; std::string _failed_consume_msg = std::string(); // If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit. @@ -194,6 +200,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che // if _untracked_mem less than -SYNC_PROC_RESERVED_INTERVAL_BYTES, increase process reserved memory. if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem); + _limiter_tracker->release_reserved(_untracked_mem); _untracked_mem = 0; } return; @@ -205,6 +212,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che size -= _reserved_mem; doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); + _limiter_tracker->release_reserved(_reserved_mem + _untracked_mem); _reserved_mem = 0; _untracked_mem = 0; } @@ -277,7 +285,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { // if _reserved_mem not equal to 0, repeat reserve, // _untracked_mem store bytes that not synchronized to process reserved memory. flush_untracked_mem(); - if (!_limiter_tracker->try_consume(size)) { + if (!_limiter_tracker->try_reserve(size)) { auto err_msg = fmt::format( "reserve memory failed, size: {}, because memory tracker consumption: {}, limit: " "{}", @@ -289,14 +297,16 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) { auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, wg_ptr->memory_debug_string()); - _limiter_tracker->release(size); // rollback + _limiter_tracker->release(size); // rollback + _limiter_tracker->release_reserved(size); // rollback return doris::Status::MemoryLimitExceeded(err_msg); } } if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, GlobalMemoryArbitrator::process_mem_log_str()); - _limiter_tracker->release(size); // rollback + _limiter_tracker->release(size); // rollback + _limiter_tracker->release_reserved(size); // rollback if (wg_ptr) { wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback } @@ -310,6 +320,7 @@ inline void ThreadMemTrackerMgr::release_reserved() { if (_reserved_mem != 0) { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); + _limiter_tracker->release_reserved(_reserved_mem + _untracked_mem); _limiter_tracker->release(_reserved_mem); auto wg_ptr = _wg_wptr.lock(); if (wg_ptr) { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index b9430d3899b8d3..10f5ca19addefb 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -151,9 +151,9 @@ QueryContext::~QueryContext() { mem_tracker_msg = fmt::format( ", deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " "PeakUsed={}", - print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()), - MemTracker::print_bytes(query_mem_tracker->consumption()), - MemTracker::print_bytes(query_mem_tracker->peak_consumption())); + print_id(_query_id), MemCounter::print_bytes(query_mem_tracker->limit()), + MemCounter::print_bytes(query_mem_tracker->consumption()), + MemCounter::print_bytes(query_mem_tracker->peak_consumption())); } uint64_t group_id = 0; if (_workload_group) { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index b63495df837d1a..2c69b8a58704bf 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -315,7 +315,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { bool RoutineLoadTaskExecutor::_reach_memory_limit() { bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); auto current_load_mem_value = - MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value(); + MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD].current_value(); if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) { LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit << " current_load_mem_value: " << current_load_mem_value diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index d2b55d86bc6bd4..01fcf851321fc1 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -48,8 +48,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParams _state = state; _state->runtime_filter_mgr = this; _query_mem_tracker = query_mem_tracker; - _tracker = std::make_unique("RuntimeFilterMgr(experimental)", - _query_mem_tracker.get()); + _tracker = std::make_unique("RuntimeFilterMgr(experimental)"); } RuntimeFilterMgr::~RuntimeFilterMgr() { @@ -264,8 +263,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params, const TQueryOptions& query_options) { _query_id = query_id; - _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)", - ExecEnv::GetInstance()->details_mem_tracker_set()); + _mem_tracker = std::make_shared("RuntimeFilterMergeControllerEntity(experimental)"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (runtime_filter_params.__isset.rid_to_runtime_filter) { for (const auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 85f79536b74ec3..6f3b51f09fd1f2 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -210,21 +210,21 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, cancel_str = fmt::format( "MinorGC kill overcommit query, wg id:{}, name:{}, used:{}, limit:{}, " "backend:{}.", - _id, _name, MemTracker::print_bytes(used_memory), - MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + _id, _name, MemCounter::print_bytes(used_memory), + MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost()); } else { if (_enable_memory_overcommit) { cancel_str = fmt::format( "FullGC release wg overcommit mem, wg id:{}, name:{}, " "used:{},limit:{},backend:{}.", - _id, _name, MemTracker::print_bytes(used_memory), - MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + _id, _name, MemCounter::print_bytes(used_memory), + MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost()); } else { cancel_str = fmt::format( "GC wg for hard limit, wg id:{}, name:{}, used:{}, limit:{}, " "backend:{}.", - _id, _name, MemTracker::print_bytes(used_memory), - MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + _id, _name, MemCounter::print_bytes(used_memory), + MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost()); } } auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, @@ -232,14 +232,14 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, return fmt::format( "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute " "again after enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption), + cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); }; auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { return fmt::format( "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again " "after enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption), + cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); }; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 32470fed5ab929..65a8e3685c80ed 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -232,9 +232,9 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { // check whether queries need to revoke memory for task group for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) { debug_msg += fmt::format( - "\n MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, " + "\n MemTracker Label={}, Used={}, SpillThreshold={}, " "Peak={}", - query_mem_tracker->label(), query_mem_tracker->parent_label(), + query_mem_tracker->label(), PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES), PrettyPrinter::print(query_spill_threshold, TUnit::BYTES), PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES)); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 0a27c415a48c0a..9719a672b8dff4 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -191,9 +191,9 @@ LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capa } RowCache::RowCache(int64_t capacity, int num_shards) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, LRUCacheType::SIZE, - config::point_query_row_cache_stale_sweep_time_sec, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, + LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec, + num_shards) {} // Create global instance of this class RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { @@ -223,8 +223,8 @@ void RowCache::insert(const RowCacheKey& key, const Slice& value) { auto* row_cache_value = new RowCacheValue; row_cache_value->cache_value = cache_value; const std::string& encoded_key = key.encode(); - auto* handle = LRUCachePolicyTrackingManual::insert(encoded_key, row_cache_value, value.size, - value.size, CachePriority::NORMAL); + auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size, + CachePriority::NORMAL); // handle will released auto tmp = CacheHandle {this, handle}; } diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 6c6fb28f95a378..b22dc5bfd1d73f 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -126,9 +126,9 @@ class Reusable { }; // RowCache is a LRU cache for row store -class RowCache : public LRUCachePolicyTrackingManual { +class RowCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; // The cache key for row lru cache struct RowCacheKey { @@ -220,7 +220,7 @@ class RowCache : public LRUCachePolicyTrackingManual { // A cache used for prepare stmt. // One connection per stmt perf uuid -class LookupConnectionCache : public LRUCachePolicyTrackingManual { +class LookupConnectionCache : public LRUCachePolicy { public: static LookupConnectionCache* instance() { return ExecEnv::GetInstance()->get_lookup_connection_cache(); @@ -231,9 +231,9 @@ class LookupConnectionCache : public LRUCachePolicyTrackingManual { private: friend class PointQueryExecutor; LookupConnectionCache(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, - capacity, LRUCacheType::NUMBER, - config::tablet_lookup_cache_stale_sweep_time_sec) {} + : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity, + LRUCacheType::NUMBER, + config::tablet_lookup_cache_stale_sweep_time_sec) {} static std::string encode_key(__int128_t cache_id) { fmt::memory_buffer buffer; diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp index 05b8b8824b5448..600ffdb647ce44 100644 --- a/be/src/util/obj_lru_cache.cpp +++ b/be/src/util/obj_lru_cache.cpp @@ -20,9 +20,9 @@ namespace doris { ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual( - CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, LRUCacheType::NUMBER, - config::common_obj_lru_cache_stale_sweep_time_sec, num_shards) { + : LRUCachePolicy(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity, + LRUCacheType::NUMBER, config::common_obj_lru_cache_stale_sweep_time_sec, + num_shards) { _enabled = (capacity > 0); } diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index c7f805fc3a1de2..680a32e79bc991 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -25,9 +25,9 @@ namespace doris { // A common object cache depends on an Sharded LRU Cache. // It has a certain capacity, which determin how many objects it can cache. // Caller must hold a CacheHandle instance when visiting the cached object. -class ObjLRUCache : public LRUCachePolicyTrackingManual { +class ObjLRUCache : public LRUCachePolicy { public: - using LRUCachePolicyTrackingManual::insert; + using LRUCachePolicy::insert; struct ObjKey { ObjKey(const std::string& key_) : key(key_) {} @@ -94,8 +94,8 @@ class ObjLRUCache : public LRUCachePolicyTrackingManual { if (_enabled) { const std::string& encoded_key = key.key; auto* obj_value = new ObjValue(value); - auto* handle = LRUCachePolicyTrackingManual::insert(encoded_key, obj_value, 1, - sizeof(T), CachePriority::NORMAL); + auto* handle = LRUCachePolicy::insert(encoded_key, obj_value, 1, sizeof(T), + CachePriority::NORMAL); *cache_handle = CacheHandle {this, handle}; } else { cache_handle = nullptr; diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 2619c0bafffb16..19969abf6cca8c 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -229,7 +229,6 @@ void Allocator::throw_b throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } -#ifndef NDEBUG template void Allocator::add_address_sanitizers( void* buf, size_t size) const { @@ -251,7 +250,6 @@ void Allocator::remove_ #endif doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size); } -#endif template void* Allocator::alloc(size_t size, diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 0427d0c968df7e..b05128bc6933cc 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -242,10 +242,8 @@ class Allocator { void consume_memory(size_t size) const; void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; -#ifndef NDEBUG void add_address_sanitizers(void* buf, size_t size) const; void remove_address_sanitizers(void* buf, size_t size) const; -#endif void* alloc(size_t size, size_t alignment = 0); void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); @@ -289,9 +287,7 @@ class Allocator { if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } -#ifndef NDEBUG add_address_sanitizers(buf, record_size); -#endif } else { buf = nullptr; int res = MemoryAllocator::posix_memalign(&buf, alignment, size); @@ -307,9 +303,7 @@ class Allocator { if constexpr (MemoryAllocator::need_record_actual_size()) { record_size = MemoryAllocator::allocated_size(buf); } -#ifndef NDEBUG add_address_sanitizers(buf, record_size); -#endif } } if constexpr (MemoryAllocator::need_record_actual_size()) { @@ -325,9 +319,7 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); } } else { -#ifndef NDEBUG remove_address_sanitizers(buf, size); -#endif MemoryAllocator::free(buf); } release_memory(size); @@ -351,9 +343,7 @@ class Allocator { if (!use_mmap || (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)) { -#ifndef NDEBUG remove_address_sanitizers(buf, old_size); -#endif /// Resize malloc'd memory region with no special alignment requirement. void* new_buf = MemoryAllocator::realloc(buf, new_size); if (nullptr == new_buf) { @@ -361,11 +351,8 @@ class Allocator { throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); } -#ifndef NDEBUG - add_address_sanitizers( - new_buf, - new_size); // usually, buf addr = new_buf addr, asan maybe not equal. -#endif + // usually, buf addr = new_buf addr, asan maybe not equal. + add_address_sanitizers(new_buf, new_size); buf = new_buf; if constexpr (clear_memory) @@ -395,10 +382,8 @@ class Allocator { // Big allocs that requires a copy. void* new_buf = alloc(new_size, alignment); memcpy(new_buf, buf, std::min(old_size, new_size)); -#ifndef NDEBUG add_address_sanitizers(new_buf, new_size); remove_address_sanitizers(buf, old_size); -#endif free(buf, old_size); buf = new_buf; } diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index 9adb30b93054f4..1acc38f2b9e084 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -88,18 +88,18 @@ class CacheTest : public testing::Test { void* value; }; - class CacheTestSizePolicy : public LRUCachePolicyTrackingManual { + class CacheTestSizePolicy : public LRUCachePolicy { public: CacheTestSizePolicy(size_t capacity) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::FOR_UT_CACHE_SIZE, capacity, - LRUCacheType::SIZE, -1) {} + : LRUCachePolicy(CachePolicy::CacheType::FOR_UT_CACHE_SIZE, capacity, + LRUCacheType::SIZE, -1) {} }; - class CacheTestNumberPolicy : public LRUCachePolicyTrackingManual { + class CacheTestNumberPolicy : public LRUCachePolicy { public: CacheTestNumberPolicy(size_t capacity, uint32_t num_shards) - : LRUCachePolicyTrackingManual(CachePolicy::CacheType::FOR_UT_CACHE_NUMBER, - capacity, LRUCacheType::NUMBER, -1, num_shards) {} + : LRUCachePolicy(CachePolicy::CacheType::FOR_UT_CACHE_NUMBER, capacity, + LRUCacheType::NUMBER, -1, num_shards) {} }; // there is 16 shards in ShardedLRUCache diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp index d4624273b0b854..fad2116fca7630 100644 --- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp +++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -167,8 +167,8 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) { std::unique_ptr thread_context = std::make_unique(); std::shared_ptr t1 = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-MultiMemTracker1"); - std::shared_ptr t2 = std::make_shared("UT-MultiMemTracker2", t1.get()); - std::shared_ptr t3 = std::make_shared("UT-MultiMemTracker3", t1.get()); + std::shared_ptr t2 = std::make_shared("UT-MultiMemTracker2"); + std::shared_ptr t3 = std::make_shared("UT-MultiMemTracker3"); int64_t size1 = 4 * 1024; int64_t size2 = 4 * 1024 * 1024;