Skip to content

Commit

Permalink
[enhancement](baddisk) record bad disk in be_custom.conf to handle
Browse files Browse the repository at this point in the history
reboot
  • Loading branch information
dataroaring committed Sep 19, 2023
1 parent eea84ac commit c3c336a
Show file tree
Hide file tree
Showing 14 changed files with 169 additions and 53 deletions.
9 changes: 6 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ DEFINE_mInt32(tablet_lookup_cache_clean_interval, "30");
DEFINE_mInt32(disk_stat_monitor_interval, "5");
DEFINE_mInt32(unused_rowset_monitor_interval, "30");
DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
// Semicolon-separated list of storage paths that have been recorded as broken.
// Defined as a mutable string so it matches DECLARE_mString(broken_storage_path)
// in config.h and can be updated while the process is running.
DEFINE_mString(broken_storage_path, "");

// Config is used to check incompatible old format hdr_ format
// whether doris uses strict way. When config is true, process will log fatal
Expand Down Expand Up @@ -1326,9 +1327,9 @@ void Properties::set_force(const std::string& key, const std::string& val) {
}

Status Properties::dump(const std::string& conffile) {
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(conffile));
std::string conffile_tmp = conffile + ".tmp";
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(conffile, &file_writer));
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(conffile_tmp, &file_writer));
RETURN_IF_ERROR(file_writer->append("# THIS IS AN AUTO GENERATED CONFIG FILE.\n"));
RETURN_IF_ERROR(file_writer->append(
"# You can modify this file manually, and the configurations in this file\n"));
Expand All @@ -1341,7 +1342,9 @@ Status Properties::dump(const std::string& conffile) {
RETURN_IF_ERROR(file_writer->append("\n"));
}

return file_writer->close();
RETURN_IF_ERROR(file_writer->close());

return io::global_local_filesystem()->rename(conffile_tmp, conffile);
}

template <typename T>
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ DECLARE_mInt32(tablet_lookup_cache_clean_interval);
DECLARE_mInt32(disk_stat_monitor_interval);
DECLARE_mInt32(unused_rowset_monitor_interval);
DECLARE_String(storage_root_path);
DECLARE_mString(broken_storage_path);

// Config is used to check incompatible old format hdr_ format
// whether doris uses strict way. When config is true, process will log fatal
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ void DataDir::health_check() {
if (!res) {
LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path
<< ", err: " << res;
StorageEngine::instance()->add_broken_path(_path);
_is_used = !res.is<IO_ERROR>();
}
}
Expand Down
25 changes: 24 additions & 1 deletion be/src/olap/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,19 @@ Status parse_conf_store_paths(const string& config_path, std::vector<StorePath>*
// deal with the case that user add `;` to the tail
path_vec.pop_back();
}

std::set<std::string> real_paths;
for (auto& item : path_vec) {
StorePath path;
auto res = parse_root_path(item, &path);
if (res.ok()) {
paths->emplace_back(std::move(path));
auto success = real_paths.emplace(path.path).second;
if (success) {
paths->emplace_back(std::move(path));
} else {
LOG(WARNING) << "a duplicated path is found " << path.path;
return Status::Error<INVALID_ARGUMENT>("a duplicated path is found, path={}", path.path);
}
} else {
LOG(WARNING) << "failed to parse store path " << item << ", res=" << res;
}
Expand All @@ -172,6 +180,21 @@ Status parse_conf_store_paths(const string& config_path, std::vector<StorePath>*
return Status::OK();
}

void parse_conf_broken_store_paths(const string& config_path, std::set<std::string>* paths) {
std::vector<string> path_vec = strings::Split(config_path, ";", strings::SkipWhitespace());
if (path_vec.empty()) {
return;
}
if (path_vec.back().empty()) {
// deal with the case that user add `;` to the tail
path_vec.pop_back();
}
for (auto& item : path_vec) {
paths->emplace(item);
}
return;
}

/** format:
* [
* {"path": "storage1", "total_size":53687091200,"query_limit": "10737418240"},
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ Status parse_root_path(const std::string& root_path, StorePath* path);

Status parse_conf_store_paths(const std::string& config_path, std::vector<StorePath>* path);

void parse_conf_broken_store_paths(const std::string& config_path, std::set<std::string>* paths);

struct CachePath {
io::FileCacheSettings init_settings() const;
CachePath(std::string path, int64_t total_bytes, int64_t query_limit_bytes)
Expand All @@ -62,6 +64,7 @@ Status parse_conf_cache_paths(const std::string& config_path, std::vector<CacheP
struct EngineOptions {
// list paths that tablet will be put into.
std::vector<StorePath> store_paths;
std::set<std::string> broken_paths;
// BE's UUID. It will be reset every time BE restarts.
UniqueId backend_uid {0, 0};
};
Expand Down
33 changes: 33 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
// std::lock_guard<std::mutex> lock(_gc_mutex);
return _unused_rowsets.size();
});

_broken_paths = options.broken_paths;
}

StorageEngine::~StorageEngine() {
Expand Down Expand Up @@ -1379,4 +1381,35 @@ void StorageEngine::evict_querying_rowset(RowsetId rs_id) {
_querying_rowsets.erase(rs_id);
}

// Records `path` in the set of broken storage paths.
// Returns true when the path was newly added, false when it was already known.
// The updated set is handed to _persist_broken_paths() only on a new insertion;
// the Status returned by that call is not inspected here.
bool StorageEngine::add_broken_path(std::string path) {
    std::lock_guard<std::mutex> guard(_broken_paths_mutex);

    const bool newly_added = _broken_paths.emplace(path).second;
    if (!newly_added) {
        return false;
    }

    _persist_broken_paths();
    return true;
}

// Removes `path` from the set of broken storage paths.
// Returns true when the path was present and has been removed, false otherwise.
// The updated set is handed to _persist_broken_paths() only when something was
// actually removed.
bool StorageEngine::remove_broken_path(std::string path) {
    std::lock_guard<std::mutex> lock(_broken_paths_mutex);

    // std::set::erase(key) returns the number of erased elements, which for a
    // set is either 0 or 1 -- so "removed" means the count is greater than zero.
    const bool removed = _broken_paths.erase(path) > 0;
    if (removed) {
        _persist_broken_paths();
    }
    return removed;
}

// Serializes the current broken-path set as "p1;p2;...;" and stores it in the
// `broken_storage_path` config entry (the name used by DEFINE/DECLARE in
// config.cpp / config.h and read back at startup).
//
// Reads _broken_paths without locking; add_broken_path() and
// remove_broken_path() call it while holding _broken_paths_mutex.
//
// Returns the Status of config::set_config, or OK when there is nothing to write.
// NOTE(review): when the set is empty the config entry is left untouched, so a
// previously stored value is not cleared -- confirm this is intended.
// NOTE(review): whether this call also reaches be_custom.conf depends on
// set_config's remaining arguments, which are not visible here -- confirm.
Status StorageEngine::_persist_broken_paths() {
    std::string config_value;
    for (const std::string& path : _broken_paths) {
        config_value += path;
        config_value += ';';
    }

    if (config_value.empty()) {
        return Status::OK();
    }

    return config::set_config("broken_storage_path", config_value);
}

} // namespace doris
8 changes: 8 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ class StorageEngine {

void evict_querying_rowset(RowsetId rs_id);

bool add_broken_path(std::string path);
bool remove_broken_path(std::string path);

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down Expand Up @@ -336,6 +339,8 @@ class StorageEngine {

void _async_publish_callback();

Status _persist_broken_paths();

private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
Expand Down Expand Up @@ -370,6 +375,9 @@ class StorageEngine {
std::mutex _store_lock;
std::mutex _trash_sweep_lock;
std::map<std::string, DataDir*> _store_map;
std::set<std::string> _broken_paths;
std::mutex _broken_paths_mutex;

uint32_t _available_storage_medium_type_count;

int32_t _effective_cluster_id;
Expand Down
35 changes: 0 additions & 35 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,41 +590,6 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
return Status::OK();
}

// Removes every tablet listed in `tablet_info_vec` from the in-memory tablet
// maps and from its partition bookkeeping. Tablets that cannot be found are
// logged and skipped. Always returns Status::OK().
Status TabletManager::drop_tablets_on_error_root_path(
        const std::vector<TabletInfo>& tablet_info_vec) {
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
    if (tablet_info_vec.empty()) { // by far the most common case
        return Status::OK();
    }

    // Bucket the positions of the input entries by shard, so that each shard
    // lock below is acquired at most once.
    std::vector<std::set<size_t>> positions_by_shard(_tablets_shards_size);
    for (size_t pos = 0; pos < tablet_info_vec.size(); ++pos) {
        const auto shard_idx = tablet_info_vec[pos].tablet_id & _tablets_shards_mask;
        positions_by_shard[shard_idx].insert(pos);
    }

    for (int32 shard = 0; shard < _tablets_shards_size; ++shard) {
        const std::set<size_t>& positions = positions_by_shard[shard];
        if (positions.empty()) {
            continue;
        }

        std::lock_guard<std::shared_mutex> wrlock(_tablets_shards[shard].lock);
        for (size_t pos : positions) {
            const TTabletId tablet_id = tablet_info_vec[pos].tablet_id;
            VLOG_NOTICE << "drop_tablet begin. tablet_id=" << tablet_id;

            TabletSharedPtr target = _get_tablet_unlocked(tablet_id);
            if (target == nullptr) {
                LOG(WARNING) << "dropping tablet not exist, "
                             << " tablet=" << tablet_id;
                continue;
            }

            _remove_tablet_from_partition(target);
            _get_tablet_map(tablet_id).erase(tablet_id);
        }
    }
    return Status::OK();
}

TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_deleted, string* err) {
std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id));
return _get_tablet_unlocked(tablet_id, include_deleted, err);
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class TabletManager {
// If `is_drop_table_or_partition` is true, we need to remove all remote rowsets in this tablet.
Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool is_drop_table_or_partition);

Status drop_tablets_on_error_root_path(const std::vector<TabletInfo>& tablet_info_vec);

TabletSharedPtr find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score,
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ class ExecEnv {
~ExecEnv();

// Initial exec environment. must call this to init all
[[nodiscard]] static Status init(ExecEnv* env, const std::vector<StorePath>& store_paths);
[[nodiscard]] static Status init(ExecEnv* env, const std::vector<StorePath>& store_paths,
const std::set<std::string>& broken_paths);

// Stop all threads and delete resources.
void destroy();
Expand Down Expand Up @@ -268,7 +269,8 @@ class ExecEnv {
private:
ExecEnv();

[[nodiscard]] Status _init(const std::vector<StorePath>& store_paths);
[[nodiscard]] Status _init(const std::vector<StorePath>& store_paths,
const std::set<std::string>& broken_paths);
void _destroy();

Status _init_mem_env();
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces);
}

Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
return env->_init(store_paths);
Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
const std::set<std::string>& broken_paths) {
return env->_init(store_paths, broken_paths);
}

Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
Status ExecEnv::_init(const std::vector<StorePath>& store_paths, const std::set<std::string>& broken_paths) {
//Only init once before be destroyed
if (ready()) {
return Status::OK();
Expand Down Expand Up @@ -205,7 +206,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_block_spill_mgr = new BlockSpillManager(_store_paths);
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
_file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
Expand Down Expand Up @@ -247,6 +248,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
// Storage engine
doris::EngineOptions options;
options.store_paths = store_paths;
options.broken_paths = broken_paths;
options.backend_uid = doris::UniqueId::gen_uid();
_storage_engine = new StorageEngine(options);
auto st = _storage_engine->open();
Expand Down
14 changes: 12 additions & 2 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,19 @@ int main(int argc, char** argv) {
LOG(FATAL) << "parse config storage path failed, path=" << doris::config::storage_root_path;
exit(-1);
}
std::set<std::string> broken_paths;
doris::parse_conf_broken_store_paths(doris::config::broken_storage_path, &broken_paths);

auto it = paths.begin();
for (; it != paths.end();) {
if (!doris::check_datapath_rw(it->path)) {
if (broken_paths.count(it->path) > 0) {
if (doris::config::ignore_broken_disk) {
LOG(WARNING) << "ignore broken disk, path = " << it->path;
} else {
LOG(FATAL) << "a broken disk is found " << it->path;
exit(-1);
}
} else if (!doris::check_datapath_rw(it->path)) {
if (doris::config::ignore_broken_disk) {
LOG(WARNING) << "read write test file failed, path=" << it->path;
it = paths.erase(it);
Expand Down Expand Up @@ -472,7 +482,7 @@ int main(int argc, char** argv) {

// init exec env
auto exec_env(doris::ExecEnv::GetInstance());
status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths);
status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths, broken_paths);
if (status != Status::OK()) {
LOG(FATAL) << "failed to init doris storage engine, res=" << status;
exit(-1);
Expand Down
70 changes: 70 additions & 0 deletions be/test/olap/options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,74 @@ TEST_F(OptionsTest, parse_root_path) {
}
}

// Exercises parse_conf_store_paths with a single path, a trailing ';',
// a duplicated path (expected to be rejected) and two distinct paths.
TEST_F(OptionsTest, parse_conf_store_path) {
    const std::string base_dir = std::filesystem::absolute("./test_run").string();
    const std::string hdd_path = base_dir + "/palo";
    const std::string ssd_path = base_dir + "/palo.ssd";

    // Checks one parsed entry: its path, the capacity value -1 and its medium.
    auto expect_store_path = [](const StorePath& parsed, const std::string& expected_path,
                                auto expected_medium) {
        EXPECT_STREQ(parsed.path.c_str(), expected_path.c_str());
        EXPECT_EQ(parsed.capacity_bytes, -1);
        EXPECT_EQ(parsed.storage_medium, expected_medium);
    };

    // A single path.
    {
        std::vector<StorePath> parsed;
        auto status = parse_conf_store_paths(hdd_path, &parsed);
        EXPECT_EQ(Status::OK(), status);
        EXPECT_EQ(parsed.size(), 1);
        expect_store_path(parsed[0], hdd_path, TStorageMedium::HDD);
    }

    // A single path followed by a trailing ';'.
    {
        std::vector<StorePath> parsed;
        auto status = parse_conf_store_paths(hdd_path + ";", &parsed);
        EXPECT_EQ(Status::OK(), status);
        EXPECT_EQ(parsed.size(), 1);
        expect_store_path(parsed[0], hdd_path, TStorageMedium::HDD);
    }

    // The same path listed twice is reported as an error.
    {
        std::vector<StorePath> parsed;
        auto status = parse_conf_store_paths(hdd_path + ";" + hdd_path, &parsed);
        EXPECT_EQ(Status::Error<ErrorCode::INVALID_ARGUMENT>("a duplicated path is found, path={}",
                                                             hdd_path),
                  status);
    }

    // Two distinct paths, with a trailing ';'.
    {
        std::vector<StorePath> parsed;
        auto status = parse_conf_store_paths(hdd_path + ";" + ssd_path + ";", &parsed);
        EXPECT_EQ(Status::OK(), status);
        EXPECT_EQ(parsed.size(), 2);
        expect_store_path(parsed[0], hdd_path, TStorageMedium::HDD);
        expect_store_path(parsed[1], ssd_path, TStorageMedium::SSD);
    }
}

// Exercises parse_conf_broken_store_paths with one path, a repeated path and
// two distinct paths (the latter two with a trailing ';').
TEST_F(OptionsTest, parse_broken_path) {
    // Runs the parser on `conf` and hands back the resulting set.
    auto parse = [](const std::string& conf) {
        std::set<std::string> result;
        parse_conf_broken_store_paths(conf, &result);
        return result;
    };

    // A single path.
    {
        const std::set<std::string> parsed = parse("path1");
        EXPECT_EQ(parsed.size(), 1);
    }

    // The same path twice collapses into one entry.
    {
        const std::set<std::string> parsed = parse("path1;path1;");
        EXPECT_EQ(parsed.size(), 1);
        EXPECT_EQ(parsed.count("path1"), 1);
    }

    // Two distinct paths are both kept.
    {
        const std::set<std::string> parsed = parse("path1;path2;");
        EXPECT_EQ(parsed.size(), 2);
        EXPECT_EQ(parsed.count("path1"), 1);
        EXPECT_EQ(parsed.count("path2"), 1);
    }
}

} // namespace doris
Loading

0 comments on commit c3c336a

Please sign in to comment.