Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Sep 8, 2024
1 parent 74bb6df commit 7cd8aaa
Show file tree
Hide file tree
Showing 21 changed files with 544 additions and 81 deletions.
28 changes: 22 additions & 6 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,32 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty()) {
if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
return Status::InvalidArgument("fe and be do not work in same mode, fe dissagregated: {},"
" be dissagregated: {}", master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
!master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << st;
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " " << st;
}

if (master_info.__isset.cloud_instance_id && config::cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
LOG(INFO) << "set config cloud_instance_id " << st;
config::set_cloud_unique_id();
if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument("cloud_instance_id in fe.conf and be.conf are not same, "
"fe: {}, be: {}", master_info.cloud_instance_id,
config::cloud_instance_id);
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id,
true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " " << st;
}
}

return Status::OK();
Expand Down
19 changes: 1 addition & 18 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,7 @@ struct RefreshFSVaultVisitor {
};

Status CloudStorageEngine::open() {
cloud::StorageVaultInfos vault_infos;
do {
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (st.ok()) {
break;
}

LOG(WARNING) << "failed to get vault info, retry after 5s, err=" << st;
std::this_thread::sleep_for(5s);
} while (vault_infos.empty());

for (auto& [id, vault_info, path_format] : vault_infos) {
if (auto st = std::visit(VaultCreateFSVisitor {id, path_format}, vault_info); !st.ok())
[[unlikely]] {
return vault_process_error(id, vault_info, std::move(st));
}
}
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
sync_storage_vault();

// TODO(plat1ko): DeleteBitmapTxnManager

Expand Down
21 changes: 17 additions & 4 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,27 @@ class CloudStorageEngine final : public BaseStorageEngine {
}
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) const {
std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
if (vault_id.empty()) {
if (latest_fs() == nullptr) {
LOG(INFO) << "there is not latest fs";
return std::nullopt;
}
return StorageResource {latest_fs()};
}

if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
bool synced = false;
do {
if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
if (synced) {
break;
}
sync_storage_vault();
synced = true;
} while (true);

return std::nullopt;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

namespace doris::config {

DEFINE_String(deploy_mode, "");
DEFINE_mString(cloud_instance_id, "");
DEFINE_mString(cloud_unique_id, "");
DEFINE_mString(meta_service_endpoint, "");
Expand Down
13 changes: 6 additions & 7 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@

namespace doris::config {

DECLARE_String(deploy_mode);
// deprecated do not configure directly
DECLARE_mString(cloud_instance_id);

DECLARE_mString(cloud_unique_id);

static inline bool is_cloud_mode() {
return !cloud_unique_id.empty() || !cloud_instance_id.empty();
return deploy_mode == "disaggregated" || !cloud_unique_id.empty();
}

static inline void set_cloud_unique_id() {
if (cloud_unique_id.empty()) {
if (!cloud_instance_id.empty()) {
cloud_unique_id = "1:" + cloud_instance_id + ":compute";
}
static inline void set_cloud_unique_id(std::string instance_id) {
if (cloud_unique_id.empty() && !instance_id.empty()) {
static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute"));
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

set_cloud_unique_id();
set_cloud_unique_id(cloud_instance_id);

return true;
}
Expand Down
28 changes: 19 additions & 9 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,15 @@ def get_add_init_config(self):
"# For regression-test",
"ignore_unsupported_properties_in_cloud_mode = true",
"merge_on_write_forced_to_false = true",
"deploy_mode = disaggregated"
]

if self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
else:
cfg += [
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg
Expand Down Expand Up @@ -460,20 +461,24 @@ def get_add_init_config(self):
cfg += self.cluster.be_config
if self.cluster.is_cloud:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
'enable_file_cache = true',
'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]'
.format(self.docker_home_dir()),
"deploy_mode = disaggregated",
]

if self.cluster.sql_mode_node_mgr:
if self.cluster.is_cloud and not self.cluster.no_be_metaservice_endpoint:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if self.cluster.is_cloud and not self.cluster.no_be_cloud_instanceid:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
else:
cfg += [
if self.cluster.is_cloud and not self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg
Expand Down Expand Up @@ -649,7 +654,8 @@ class Cluster(object):

def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
ms_config, recycle_config, fe_follower, be_disks, be_cluster,
reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr):
reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr,
no_be_metaservice_endpoint, no_be_cloud_instanceid):
self.name = name
self.subnet = subnet
self.image = image
Expand All @@ -669,11 +675,14 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
for node_type in Node.TYPE_ALL
}
self.sql_mode_node_mgr = sql_mode_node_mgr
self.no_be_metaservice_endpoint = no_be_metaservice_endpoint
self.no_be_cloud_instanceid = no_be_cloud_instanceid

@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
recycle_config, fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config, sql_mode_node_mgr):
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
Expand All @@ -685,7 +694,8 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
cluster = Cluster(name, subnet, image, is_cloud, fe_config,
be_config, ms_config, recycle_config,
fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config, sql_mode_node_mgr)
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
Expand Down
Loading

0 comments on commit 7cd8aaa

Please sign in to comment.