Skip to content

Commit

Permalink
use cluster_id instead of cloud_instance_id
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Sep 12, 2024
1 parent c71514d commit 1c7bbef
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 75 deletions.
20 changes: 6 additions & 14 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,12 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
<< st;
}

if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument(
"cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}",
master_info.cloud_instance_id, config::cloud_instance_id);
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " "
<< st;
}
if (config::cluster_id == -1 && master_info.cluster_id != -1) {
auto st = config::set_config("cluster_id", std::to_string(master_info.cluster_id),
true);
config::set_cloud_unique_id(std::to_string(master_info.cluster_id));
LOG(INFO) << "set config cluster_id " << master_info.cluster_id << " "
<< st;
}

return Status::OK();
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
namespace doris::config {

DEFINE_String(deploy_mode, "");
DEFINE_mString(cloud_instance_id, "");
DEFINE_mString(cloud_unique_id, "");
DEFINE_mString(meta_service_endpoint, "");
DEFINE_Bool(meta_service_use_load_balancer, "false");
Expand Down Expand Up @@ -69,7 +68,7 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "false");
DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

void set_cloud_unique_id(std::string instance_id) {
if (cloud_unique_id.empty() && !instance_id.empty()) {
if (cloud_unique_id.empty() && !instance_id.empty() && instance_id != "-1") {
static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true));
}
}
Expand Down
1 change: 0 additions & 1 deletion be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace doris::config {

DECLARE_String(deploy_mode);
// Deprecated: do not configure directly.
DECLARE_mString(cloud_instance_id);
DECLARE_mString(cloud_unique_id);

static inline bool is_cloud_mode() {
Expand Down
3 changes: 2 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,8 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

set_cloud_unique_id(cloud_instance_id);
std::string cluster_id_str = std::to_string(cluster_id);
set_cloud_unique_id(cluster_id_str);

return true;
}
Expand Down
22 changes: 11 additions & 11 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def get_add_init_config(self):

if self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
"cluster_id = " + self.cluster_id(),
]
else:
cfg += [
Expand All @@ -439,8 +439,8 @@ def docker_env(self):
def cloud_unique_id(self):
return "sql_server_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"
def cluster_id(self):
    """Return the cluster id that is written into this node's config.

    The value is returned as a string because the config line is built by
    string concatenation ("cluster_id = " + self.cluster_id()); returning
    an int there raises TypeError.
    """
    # NOTE(review): the other cluster_id() definition in this file returns
    # a different value (12345678) -- confirm the two ids are meant to differ.
    return "123456"

def entrypoint(self):
return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")]
Expand Down Expand Up @@ -484,9 +484,9 @@ def get_add_init_config(self):
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if self.cluster.be_cloud_instanceid:
if self.cluster.be_cluster_id:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
"cluster_id = " + self.cluster_id(),
]
if not self.cluster.sql_mode_node_mgr:
cfg += [
Expand Down Expand Up @@ -553,8 +553,8 @@ def docker_env(self):
def cloud_unique_id(self):
return "compute_node_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"
def cluster_id(self):
    """Return the cluster id that is written into this node's config.

    The value is returned as a string because the config line is built by
    string concatenation ("cluster_id = " + self.cluster_id()); returning
    an int there raises TypeError.
    """
    # NOTE(review): the other cluster_id() definition in this file returns
    # a different value (123456) -- confirm the two ids are meant to differ.
    return "12345678"

def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "be")
Expand Down Expand Up @@ -666,7 +666,7 @@ class Cluster(object):
def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
ms_config, recycle_config, fe_follower, be_disks, be_cluster,
reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
be_metaservice_endpoint, be_cluster_id):
self.name = name
self.subnet = subnet
self.image = image
Expand All @@ -687,13 +687,13 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
}
self.sql_mode_node_mgr = sql_mode_node_mgr
self.be_metaservice_endpoint = be_metaservice_endpoint
self.be_cloud_instanceid = be_cloud_instanceid
self.be_cluster_id = be_cluster_id

@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
recycle_config, fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
be_metaservice_endpoint, be_cluster_id):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
Expand All @@ -707,7 +707,7 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
be_cloud_instanceid)
be_cluster_id)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
Expand Down
12 changes: 6 additions & 6 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,19 @@ def add_parser(self, args_parsers):

if self._support_boolean_action():
parser.add_argument(
"--be-cloud-instanceid",
"--be-cluster-id",
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE cloud instance ID in conf. Default is False.")
"Do not set BE cluster ID in conf. Default is False.")
else:
parser.add_argument(
"--no-be-cloud-instanceid",
dest='be_cloud_instanceid',
"--no-be-cluster-id",
dest='be_cluster_id',
default=True,
action=self._get_parser_bool_action(False),
help=
"Do not set BE cloud instance ID in conf. Default is False.")
"Do not set BE cluser ID in conf. Default is False.")

parser.add_argument(
"--fdb-version",
Expand Down Expand Up @@ -434,7 +434,7 @@ def run(self, args):
args.be_config, args.ms_config, args.recycle_config,
args.fe_follower, args.be_disks, args.be_cluster, args.reg_be,
args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
args.be_metaservice_endpoint, args.be_cloud_instanceid)
args.be_metaservice_endpoint, args.be_cluster_id)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2803,16 +2803,12 @@ public class Config extends ConfigBase {
public static String deploy_mode = "";

// Compatibility with older versions.
// cloud_unique_id is introduced before cloud_instance_id, so it has higher priority.
// cloud_unique_id has higher priority than cluster_id.
@ConfField
public static String cloud_unique_id = "";

// If cloud_unique_id is empty, cloud_instance_id works, otherwise cloud_unique_id works.
@ConfField
public static String cloud_instance_id = "";

public static boolean isCloudMode() {
return deploy_mode.equals("cloud") || !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty();
return deploy_mode.equals("cloud") || !cloud_unique_id.isEmpty();
}

public static boolean isNotCloudMode() {
Expand Down
19 changes: 15 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,17 @@ protected boolean isStartFromEmpty() {
return !roleFile.exists() && !versionFile.exists();
}

/**
 * Reads the cluster id from {@code storage} into {@code clusterId} and checks it
 * against {@code Config.cluster_id}.
 *
 * <p>A configured value of -1 skips the check; any other configured value must
 * equal the stored one.
 *
 * @param storage storage object the persisted cluster id is read from
 * @throws IOException if {@code Config.cluster_id} is not -1 and differs from the
 *         stored cluster id
 */
private void getClusterIdFromStorage(Storage storage) throws IOException {
    clusterId = storage.getClusterID();

    // Nothing to verify when no cluster_id is configured or when it matches.
    boolean notConfigured = Config.cluster_id == -1;
    if (notConfigured || Config.cluster_id == this.clusterId) {
        return;
    }

    LOG.warn("Configured cluster_id {} does not match stored cluster_id {}. "
            + "This may indicate a configuration error.",
            Config.cluster_id, this.clusterId);
    throw new IOException("Configured cluster_id does not match stored cluster_id. "
            + "Please check your configuration.");
}

protected void getClusterIdAndRole() throws IOException {
File roleFile = new File(this.imageDir, Storage.ROLE_FILE);
File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
Expand Down Expand Up @@ -1231,7 +1242,7 @@ protected void getClusterIdAndRole() throws IOException {
frontends.put(nodeName, self);
LOG.info("add self frontend: {}", self);
} else {
clusterId = storage.getClusterID();
getClusterIdFromStorage(storage);
if (storage.getToken() == null) {
token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token;
LOG.info("refresh new token");
Expand Down Expand Up @@ -1286,15 +1297,15 @@ protected void getClusterIdAndRole() throws IOException {
// NOTE: cluster_id will be init when Storage object is constructed,
// so we new one.
storage = new Storage(this.imageDir);
clusterId = storage.getClusterID();
getClusterIdFromStorage(storage);
token = storage.getToken();
if (Strings.isNullOrEmpty(token)) {
token = Config.auth_token;
}
} else {
// If the version file exist, read the cluster id and check the
// id with helper node to make sure they are identical
clusterId = storage.getClusterID();
getClusterIdFromStorage(storage);
token = storage.getToken();
try {
String url = "http://" + NetUtils
Expand Down Expand Up @@ -2060,7 +2071,7 @@ public boolean hasReplayer() {

public void loadImage(String imageDir) throws IOException, DdlException {
Storage storage = new Storage(imageDir);
clusterId = storage.getClusterID();
getClusterIdFromStorage(storage);
File curFile = storage.getCurrentImageFile();
if (!curFile.exists()) {
// image.0 may not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,20 @@ public CloudUpgradeMgr getCloudUpgradeMgr() {
return this.upgradeMgr;
}

/**
 * Returns the cloud instance id, which is the string form of
 * {@code Config.cluster_id}.
 *
 * <p>NOTE(review): the result is never null or empty (an unset cluster_id of -1
 * yields "-1"), so emptiness checks on this value cannot fire -- confirm whether
 * callers should test for -1 instead.
 *
 * @return {@code Config.cluster_id} converted with {@code String.valueOf}
 */
public String getCloudInstanceId() {
    final String instanceId = String.valueOf(Config.cluster_id);
    return instanceId;
}

@Override
public void initialize(String[] args) throws Exception {
if (Strings.isNullOrEmpty(Config.cloud_unique_id) && Config.cluster_id == -1) {
throw new UserException("cluser_id must be specified in fe.conf if deployed "
+ "in dissaggregated, because fe should known to which it belongs");
}

if (Strings.isNullOrEmpty(Config.cloud_unique_id)) {
if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
throw new UserException("cloud_instance_id must be specified if deployed in dissaggregated");
}
LOG.info("cloud_unique_id is not set, setting it using instance_id");
Config.cloud_unique_id = "1:" + Config.cloud_instance_id + ":sql_server00";
Config.cloud_unique_id = "1:" + getCloudInstanceId() + ":sqlserver";
LOG.info("cloud_unique_id is empty, setting it to: {}", Config.cloud_unique_id);
}

LOG.info("Initializing CloudEnv with cloud_unique_id: {}", Config.cloud_unique_id);
Expand Down Expand Up @@ -187,14 +193,11 @@ private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
return local.orElse(null);
}

private void tryAddMyselToMS() {
private void tryAddMyselfToMS() {
try {
try {
if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
throw new DdlException("unable to create instance due to empty cloud_instance_id");
}
getCloudSystemInfoService().tryCreateInstance(Config.cloud_instance_id,
Config.cloud_instance_id, false);
getCloudSystemInfoService().tryCreateInstance(getCloudInstanceId(),
getCloudInstanceId(), false);
} catch (Exception e) {
return;
}
Expand All @@ -219,7 +222,7 @@ protected void getClusterIdAndRole() throws IOException {
LOG.warn("failed to get local fe's type, sleep {} s, try again.",
Config.resource_not_ready_sleep_seconds);
if (isStartFromEmpty()) {
tryAddMyselToMS();
tryAddMyselfToMS();
}
try {
Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> to

private void alterBackendCluster(List<HostInfo> hostInfos, String clusterId,
Cloud.AlterClusterRequest.Operation operation) throws DdlException {
if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) {
throw new DdlException("unable to alter backends due to empty cloud_instance_id");
}
// Issue rpc to meta to alter node, then fe master would add this node to its frontends
Expand All @@ -332,7 +332,7 @@ private void alterBackendCluster(List<HostInfo> hostInfos, String clusterId,
}

Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder()
.setInstanceId(Config.cloud_instance_id)
.setInstanceId(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())
.setOp(operation)
.setCluster(clusterPB)
.build();
Expand Down Expand Up @@ -364,7 +364,7 @@ public void addBackends(List<HostInfo> hostInfos, Map<String, String> tagMap) th
throw new UserException("clusterName empty");
}

String clusterId = tryCreateCluster(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8));
String clusterId = tryCreateComputeGroup(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8));
alterBackendCluster(hostInfos, clusterId, Cloud.AlterClusterRequest.Operation.ADD_NODE);
}

Expand Down Expand Up @@ -765,7 +765,7 @@ public Map<String, String> getCloudClusterNameToId() {
// FrontendCluster = SqlServerCluster
private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort,
Cloud.AlterClusterRequest.Operation op) throws DdlException {
if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) {
throw new DdlException("unable to alter frontend due to empty cloud_instance_id");
}

Expand All @@ -787,7 +787,7 @@ private void alterFrontendCluster(FrontendNodeType role, String host, int editLo
.build();

Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder()
.setInstanceId(Config.cloud_instance_id)
.setInstanceId(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())
.setOp(op)
.setCluster(clusterPB)
.build();
Expand Down Expand Up @@ -817,8 +817,8 @@ public void dropFrontend(FrontendNodeType role, String host, int editLogPort) th
alterFrontendCluster(role, host, editLogPort, Cloud.AlterClusterRequest.Operation.DROP_NODE);
}

private String tryCreateCluster(String clusterName, String clusterId) throws UserException {
if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
private String tryCreateComputeGroup(String clusterName, String clusterId) throws UserException {
if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) {
throw new DdlException("unable to create cluster due to empty cloud_instance_id");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ public void setMaster(int clusterId, String token, long epoch) {
long flags = heartbeatFlags.getHeartbeatFlags();
tMasterInfo.setHeartbeatFlags(flags);
if (Config.isCloudMode()) {
// Set cloud_instance_id and meta_service_endpoint even if there are empty
// Be can knowns that fe is working in cloud mode.
// Set the cloud instance ID for cloud deployment identification
tMasterInfo.setCloudInstanceId(Config.cloud_instance_id);
// Set the endpoint for the metadata service in cloud mode
tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint);
}
Expand Down
1 change: 0 additions & 1 deletion gensrc/thrift/HeartbeatService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ struct TMasterInfo {
8: optional i64 backend_id
9: optional list<TFrontendInfo> frontend_infos
10: optional string meta_service_endpoint;
11: optional string cloud_instance_id;
}

struct TBackendInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ClusterOptions {

Boolean sqlModeNodeMgr = false
Boolean beMetaServiceEndpoint = true
Boolean beCloudInstanceId = false
Boolean beClusterId = false

int waitTimeout = 180

Expand Down Expand Up @@ -322,8 +322,8 @@ class SuiteCluster {
if (!options.beMetaServiceEndpoint) {
cmd += ['--no-be-metaservice-endpoint']
}
if (!options.beCloudInstanceId) {
cmd += ['--no-be-cloud-instanceid']
if (!options.beClusterId) {
cmd += ['--no-be-cluster-id']
}

cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]
Expand Down
Loading

0 comments on commit 1c7bbef

Please sign in to comment.