
Commit

fix
dataroaring committed Sep 10, 2024
1 parent 4dc0817 commit 514a733
Showing 6 changed files with 330 additions and 249 deletions.
68 changes: 41 additions & 27 deletions docker/runtime/doris-compose/cluster.py
@@ -137,6 +137,7 @@ class NodeMeta(object):
def __init__(self, image):
self.image = image


class Group(object):

def __init__(self, node_type):
@@ -290,16 +291,27 @@ def docker_env(self):
enable_coverage = self.cluster.coverage_dir

envs = {
"MY_IP": self.get_ip(),
"MY_ID": self.id,
"MY_TYPE": self.node_type(),
"FE_QUERY_PORT": FE_QUERY_PORT,
"FE_EDITLOG_PORT": FE_EDITLOG_PORT,
"BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT,
"DORIS_HOME": os.path.join(self.docker_home_dir()),
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR": 1 if hasattr(self.cluster, 'sql_mode_node_mgr') and self.cluster.sql_mode_node_mgr else 0
"MY_IP":
self.get_ip(),
"MY_ID":
self.id,
"MY_TYPE":
self.node_type(),
"FE_QUERY_PORT":
FE_QUERY_PORT,
"FE_EDITLOG_PORT":
FE_EDITLOG_PORT,
"BE_HEARTBEAT_PORT":
BE_HEARTBEAT_PORT,
"DORIS_HOME":
os.path.join(self.docker_home_dir()),
"STOP_GRACE":
1 if enable_coverage else 0,
"IS_CLOUD":
1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR":
1 if hasattr(self.cluster, 'sql_mode_node_mgr')
and self.cluster.sql_mode_node_mgr else 0
}

if self.cluster.is_cloud:
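
The environment block above is what each node container receives at start-up. Below is a minimal sketch of how an entrypoint might consume it; the variable names come from this diff, while the helper itself is only an illustration, not the actual doris-compose entrypoint.

# Illustrative only: reads the env vars that docker_env() injects into a container.
import os

def read_node_env():
    # Everything arrives as a string from the environment, so convert explicitly.
    return {
        "ip": os.environ["MY_IP"],
        "node_id": int(os.environ["MY_ID"]),
        "node_type": os.environ["MY_TYPE"],
        "doris_home": os.environ["DORIS_HOME"],
        "fe_query_port": int(os.environ["FE_QUERY_PORT"]),
        "is_cloud": os.environ.get("IS_CLOUD", "0") == "1",
        "sql_mode_node_mgr": os.environ.get("SQL_MODE_NODE_MGR", "0") == "1",
    }

if __name__ == "__main__":
    print(read_node_env())
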
@@ -391,8 +403,7 @@ def get_add_init_config(self):
if self.cluster.is_cloud:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
"",
self.cluster.get_meta_server_addr()), "",
"# For regression-test",
"ignore_unsupported_properties_in_cloud_mode = true",
"merge_on_write_forced_to_false = true",
@@ -468,19 +479,19 @@ def get_add_init_config(self):
"deploy_mode = disaggregated",
]

if self.cluster.is_cloud and not self.cluster.no_be_metaservice_endpoint:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if self.cluster.is_cloud and not self.cluster.no_be_cloud_instanceid:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
if self.cluster.is_cloud and not self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
if not self.cluster.no_be_metaservice_endpoint:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if not self.cluster.no_be_cloud_instanceid:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
if not self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg

def init_cluster_name(self):
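
Taken together, the BE branch above now writes each cloud-specific conf entry independently, keyed off three cluster flags. A standalone sketch of that mapping follows; the helper and the sample argument values are illustrative, only the conf keys and flag semantics come from the diff.

# Sketch of the flag-to-be.conf mapping implied by the diff above.
def be_cloud_conf(meta_server_addr, instance_id, unique_id,
                  no_metaservice_endpoint=False,
                  no_cloud_instanceid=False,
                  sql_mode_node_mgr=False):
    cfg = ["deploy_mode = disaggregated"]
    if not no_metaservice_endpoint:
        cfg.append("meta_service_endpoint = {}".format(meta_server_addr))
    if not no_cloud_instanceid:
        cfg.append("cloud_instance_id = " + instance_id)
    if not sql_mode_node_mgr:
        # Under SQL-mode node management the BE is registered later via SQL,
        # so no pre-provisioned cloud_unique_id is written here.
        cfg.append("cloud_unique_id = " + unique_id)
    return cfg

# Example: SQL-mode node management, endpoint and instance id still written.
print("\n".join(be_cloud_conf("172.20.80.5:5000", "my_instance", "be-1",
                              sql_mode_node_mgr=True)))
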
@@ -678,6 +689,8 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
self.no_be_metaservice_endpoint = no_be_metaservice_endpoint
self.no_be_cloud_instanceid = no_be_cloud_instanceid

LOG.info("xxxxx cluster")

@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
recycle_config, fe_follower, be_disks, be_cluster, reg_be,
@@ -694,8 +707,9 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
cluster = Cluster(name, subnet, image, is_cloud, fe_config,
be_config, ms_config, recycle_config,
fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid)
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
be_cloud_instanceid)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
76 changes: 47 additions & 29 deletions docker/runtime/doris-compose/command.py
@@ -302,15 +302,39 @@ def add_parser(self, args_parsers):
action=self._get_parser_bool_action(True),
help="Manager fe be via sql instead of http")

parser.add_argument("--no-be-metaservice-endpoint",
default=False,
action=self._get_parser_bool_action(True),
help="Do not set BE meta service endpoint in conf. Default is False.")
if self._support_boolean_action():
parser.add_argument(
"--be-metaservice-endpoint",
default=True,
action=self._get_parser_bool_action(True),
help=
"Do not set BE meta service endpoint in conf. Default is False."
)
else:
parser.add_argument(
"--no-be-metaservice-endpoint",
dest='be-metaservice-endpoint',
default=True,
action=self._get_parser_bool_action(True),
help=
"Do not set BE meta service endpoint in conf. Default is False."
)

parser.add_argument("--no-be-cloud-instanceid",
default=True,
action=self._get_parser_bool_action(True),
help="Do not set BE cloud instance ID in conf. Default is False.")
if self._support_boolean_action():
parser.add_argument(
"--be-cloud-instanceid",
default=True,
action=self._get_parser_bool_action(True),
help=
"Do not set BE cloud instance ID in conf. Default is False.")
else:
parser.add_argument(
"--no-be-cloud-instanceid",
dest='be-cloud-instanceid',
default=True,
action=self._get_parser_bool_action(True),
help=
"Do not set BE cloud instance ID in conf. Default is False.")

parser.add_argument(
"--fdb-version",
@@ -499,7 +523,8 @@ def do_add_node(node_type, add_num, add_ids):
"Not up cluster cause specific --no-start, related node num {}"
.format(related_node_num)))
else:
LOG.info("Using SQL mode for node management ? {}".format(args.sql_mode_node_mgr));
LOG.info("Using SQL mode for node management ? {}".format(
args.sql_mode_node_mgr))

# Wait for FE master to be elected
LOG.info("Waiting for FE master to be elected...")
@@ -509,13 +534,14 @@ def do_add_node(node_type, add_num, add_ids):
for id in add_fe_ids:
fe_state = db_mgr.get_fe(id)
if fe_state is not None and fe_state.alive:
break;
break
LOG.info("there is no fe ready")
time.sleep(5)

if cluster.is_cloud and args.sql_mode_node_mgr:
db_mgr = database.get_db_mgr(args.NAME, False)
master_fe_endpoint = CLUSTER.get_master_fe_endpoint(cluster.name)
master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
cluster.name)
# Add FEs except master_fe
for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE):
fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}"
@@ -524,7 +550,8 @@ def do_add_node(node_type, add_num, add_ids):
db_mgr.add_fe(fe_endpoint)
LOG.info(f"Added FE {fe_endpoint} successfully.")
except Exception as e:
LOG.error(f"Failed to add FE {fe_endpoint}: {str(e)}")
LOG.error(
f"Failed to add FE {fe_endpoint}: {str(e)}")

# Add BEs
for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE):
@@ -566,7 +593,6 @@ def do_add_node(node_type, add_num, add_ids):
raise Exception(err)
time.sleep(1)


LOG.info(
utils.render_green(
"Up cluster {} succ, related node num {}".format(
@@ -789,11 +815,7 @@ def info(self, detail):
else:
pass
result += [
query_port,
http_port,
node_path,
edit_log_port,
heartbeat_port
query_port, http_port, node_path, edit_log_port, heartbeat_port
]
return result

@@ -917,6 +939,7 @@ def run(self, args):
CLUSTER.Node.TYPE_FE, 1).cloud_unique_id()))
print("\nWrite succ: " + regression_conf_custom)


class ListCommand(Command):

def add_parser(self, args_parsers):
@@ -972,8 +995,7 @@ def parse_cluster_compose_file(cluster_name):
if services is None:
return COMPOSE_BAD, {}
return COMPOSE_GOOD, {
service:
ComposeService(
service: ComposeService(
service,
list(service_conf["networks"].values())[0]
["ipv4_address"], service_conf["image"])
@@ -1126,22 +1148,19 @@ def get_node_seq(node):

return self._handle_data(header, rows)


class GetCloudIniCommand(Command):

def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"get-cloud-ini", help="Get cloud.init")
parser = args_parsers.add_parser("get-cloud-ini",
help="Get cloud.init")
parser.add_argument(
"NAME",
nargs="*",
help=
"Specify multiple clusters, if specific, show all their containers."
)
self._add_parser_output_json(parser)
parser.add_argument("--detail",
default=False,
action=self._get_parser_bool_action(True),
help="Print more detail fields.")

def _handle_data(self, header, datas):
if utils.is_enable_log():
@@ -1157,9 +1176,7 @@ def _handle_data(self, header, datas):

def run(self, args):

header = [
"key", "value"
]
header = ["key", "value"]

rows = []

@@ -1172,6 +1189,7 @@ def run(self, args):

return self._handle_data(header, rows)


ALL_COMMANDS = [
UpCommand("up"),
DownCommand("down"),
12 changes: 7 additions & 5 deletions docker/runtime/doris-compose/database.py
@@ -73,7 +73,7 @@ def load_states(self, query_ports):
self._load_fe_states(query_ports)
self._load_be_states()

def add_fe(self, fe_endpoint):
def add_fe(self, fe_endpoint):
try:
sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
self._exec_query(sql)
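
A short usage sketch for the method above. add_fe() and the ALTER SYSTEM ADD FOLLOWER statement are confirmed by this diff; the BE counterpart and the endpoint formats (FE edit-log port, BE heartbeat port) are assumptions drawn from command.py in the same commit.

import database  # doris-compose module shown in this diff

db_mgr = database.get_db_mgr("my-cluster", False)
db_mgr.add_fe("172.20.80.2:9010")    # FE "ip:editlog_port" -> ALTER SYSTEM ADD FOLLOWER
# db_mgr.add_be("172.20.80.3:9050")  # assumed BE counterpart (ALTER SYSTEM ADD BACKEND)
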
@@ -162,8 +162,8 @@ def decommission_be(self, be_endpoint):
time.sleep(5)

def create_default_storage_vault(self, cloud_store_config):
try:
# Create storage vault
try:
# Create storage vault
create_vault_sql = f"""
CREATE STORAGE VAULT IF NOT EXISTS default_vault
PROPERTIES (
@@ -195,7 +195,7 @@ def _load_fe_states(self, query_ports):
for record in self._exec_query('''
show frontends '''):
# Unpack the record into individual columns
name, ip , edit_log_port, _, query_port, _, _, role, is_master, cluster_id, _, alive, _, _, last_heartbeat, _, err_msg, _, _ = record
name, ip, edit_log_port, _, query_port, _, _, role, is_master, cluster_id, _, alive, _, _, last_heartbeat, _, err_msg, _, _ = record
is_master = utils.is_true(is_master)
alive = utils.is_true(alive)
id = CLUSTER.Node.get_id_from_ip(ip)
@@ -206,7 +206,9 @@ def _load_fe_states(self, query_ports):
fe_states[id] = fe
if is_master and alive and query_port:
alive_master_fe_port = query_port
LOG.info("record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}".format(name, ip, alive, is_master, role))
LOG.info(
"record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}"
.format(name, ip, alive, is_master, role))

self.fe_states = fe_states
if alive_master_fe_port and alive_master_fe_port != self.query_port:
@@ -564,7 +564,8 @@ private static void checkDdlStmtSupported(DdlStmt ddlStmt) throws DdlException {
|| ddlStmt instanceof AdminRebalanceDiskStmt
|| ddlStmt instanceof AdminCancelRebalanceDiskStmt
|| ddlStmt instanceof AlterResourceStmt
|| ddlStmt instanceof AlterPolicyStmt) {
|| ddlStmt instanceof AlterPolicyStmt
|| ddlStmt instanceof CancelAlterSystemStmt) {
LOG.info("stmt={}, not supported in cloud mode", ddlStmt.toString());
throw new DdlException("Unsupported operation");
}
@@ -38,8 +38,8 @@ class ClusterOptions {
int beNum = 3

Boolean sqlModeNodeMgr = false
Boolean noBeMetaServiceEndpoint = false
Boolean noBeCloudInstanceId = false
Boolean beMetaServiceEndpoint = false
Boolean beCloudInstanceId = false

int waitTimeout = 180

@@ -315,16 +315,15 @@ class SuiteCluster {
cmd += ['--fe-follower']
}

cmd += ['--wait-timeout', String.valueOf(180)]
if (options.sqlModeNodeMgr) {
cmd += ['--sql-mode-node-mgr']
}
if (options.noBeMetaServiceEndpoint) {
cmd += ['--no-be-metaservice-endpoint']
if (options.beMetaServiceEndpoint) {
cmd += ['--be-metaservice-endpoint']
}
if (options.noBeCloudInstanceId) {
if (options.beCloudInstanceId) {

cmd += ['--no-be-cloud-instanceid']
cmd += ['--be-cloud-instanceid']
}

cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]