Skip to content

Commit

Permalink
Revert "[improvement](create tablet) backend create tablet round robi…
Browse files Browse the repository at this point in the history
…n among disks (apache#23218)"

This reverts commit df5b5ae.
  • Loading branch information
dataroaring committed Dec 31, 2023
1 parent ac636a2 commit ef85c8a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 103 deletions.
10 changes: 1 addition & 9 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,15 +864,7 @@ void DataDir::update_remote_data_size(int64_t size) {
disks_remote_used_capacity->set_value(size);
}

size_t DataDir::disk_capacity() const {
return _disk_capacity_bytes;
}

size_t DataDir::disk_available() const {
return _available_bytes;
}

size_t DataDir::tablet_num() const {
size_t DataDir::tablet_size() const {
std::lock_guard<std::mutex> l(_mutex);
return _tablet_set.size();
}
Expand Down
6 changes: 1 addition & 5 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,7 @@ class DataDir {

void update_remote_data_size(int64_t size);

size_t disk_capacity() const;

size_t disk_available() const;

size_t tablet_num() const;
size_t tablet_size() const;

void disks_compaction_score_increment(int64_t delta);

Expand Down
104 changes: 17 additions & 87 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,103 +450,33 @@ Status StorageEngine::set_cluster_id(int32_t cluster_id) {

std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(
TStorageMedium::type storage_medium) {
struct DirInfo {
DataDir* data_dir;

size_t disk_available;
//if disk_available is high, then available_level is small
int available_level;

int tablet_num;

bool operator<(const DirInfo& other) const {
if (available_level != other.available_level) {
return available_level < other.available_level;
}
if (tablet_num != other.tablet_num) {
return tablet_num < other.tablet_num;
}
return data_dir->path_hash() < other.data_dir->path_hash();
}
};
std::map<size_t, int> available_levels;
std::vector<DirInfo> dir_infos;
int next_index = 0;
size_t max_disk_capacity = 0;
std::vector<DataDir*> stores;
{
std::lock_guard<std::mutex> l(_store_lock);
next_index = _store_next_index[storage_medium]++;
if (next_index < 0) {
next_index = 0;
_store_next_index[storage_medium] = next_index + 1;
}
for (auto& it : _store_map) {
DataDir* data_dir = it.second;
if (data_dir->is_used()) {
if (it.second->is_used()) {
if (_available_storage_medium_type_count == 1 ||
data_dir->storage_medium() == storage_medium) {
size_t disk_available = data_dir->disk_available();
DirInfo dir_info;
dir_info.data_dir = data_dir;
dir_info.available_level = disk_available;
dir_infos.push_back(dir_info);
available_levels[disk_available] = 0;
size_t disk_capacity = data_dir->disk_capacity();
if (max_disk_capacity < disk_capacity) {
max_disk_capacity = disk_capacity;
}
it.second->storage_medium() == storage_medium) {
stores.push_back(it.second);
}
}
}
}

std::vector<DataDir*> stores;
if (dir_infos.empty()) {
return stores;
}

// if two disk available diff not exceeds 20% capacity, then they are the same available level.
size_t same_level_available_diff = std::max<size_t>(max_disk_capacity / 5, 1);
int level = 0;
size_t level_start_available = available_levels.rbegin()->first;
for (auto rit = available_levels.rbegin(); rit != available_levels.rend(); rit++) {
if (level_start_available - rit->first >= same_level_available_diff) {
level_start_available = rit->first;
level++;
}
rit->second = level;
}

for (auto& dir_info : dir_infos) {
dir_info.tablet_num = dir_info.data_dir->tablet_num();
dir_info.available_level = available_levels[dir_info.disk_available];
}

std::sort(dir_infos.begin(), dir_infos.end());

// Suppose there are five data dirs (D1, D2, D3, D4, D5).
// D1/D2/D3 contain 1 tablet, D4/D5 contain 2 tablets.
// If three creating tablets threads simultaneously invoke this function to get stores,
// then the return stores will be as below:
// thread 1: (D1, D2, D3, D4, D5)
// thread 2: (D2, D3, D1, D5, D4)
// thread 3: (D3, D1, D2, D4, D5)
stores.reserve(dir_infos.size());
for (size_t i = 0; i < dir_infos.size();) {
size_t end = i + 1;
while (end < dir_infos.size() && dir_infos[i].tablet_num == dir_infos[end].tablet_num &&
dir_infos[i].available_level == dir_infos[end].available_level) {
end++;
}
// data dirs [i, end) have the same tablet size, round robin range [i, end)
size_t count = end - i;
for (size_t k = 0; k < count; k++) {
size_t index = i + (k + next_index) % count;
stores.push_back(dir_infos[index].data_dir);
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(stores.begin(), stores.end(), g);
// Two random choices
for (int i = 0; i < stores.size(); i++) {
int j = i + 1;
if (j < stores.size()) {
if (stores[i]->tablet_size() > stores[j]->tablet_size()) {
std::swap(stores[i], stores[j]);
}
std::shuffle(stores.begin() + j, stores.end(), g);
} else {
break;
}
i = end;
}

return stores;
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,6 @@ class StorageEngine {
bool _clear_segment_cache = false;

std::atomic<bool> _need_clean_trash {false};
// next index for create tablet
std::map<TStorageMedium::type, int> _store_next_index;

DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};
Expand Down

0 comments on commit ef85c8a

Please sign in to comment.