diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp
index 1203127941394b..1f454e368e1336 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -245,16 +245,32 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
         _engine.notify_listeners();
     }
 
-    if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty()) {
+    if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
+        return Status::InvalidArgument("fe and be do not work in same mode, fe disaggregated: {},"
+                                       " be disaggregated: {}",
+                                       master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
+    }
+
+    if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
+        !master_info.meta_service_endpoint.empty()) {
         auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint, true);
-        LOG(INFO) << "set config meta_service_endpoing " << st;
+        LOG(INFO) << "set config meta_service_endpoint " << master_info.meta_service_endpoint << " " << st;
     }
 
-    if (master_info.__isset.cloud_instance_id && config::cloud_instance_id.empty()) {
-        auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
-        LOG(INFO) << "set config cloud_instance_id " << st;
-        config::set_cloud_unique_id();
+    if (master_info.__isset.cloud_instance_id) {
+        if (!config::cloud_instance_id.empty() &&
+            config::cloud_instance_id != master_info.cloud_instance_id) {
+            return Status::InvalidArgument("cloud_instance_id in fe.conf and be.conf are not the same, "
+                                           "fe: {}, be: {}",
+                                           master_info.cloud_instance_id, config::cloud_instance_id);
+        }
+
+        if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
+            auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
+            config::set_cloud_unique_id(master_info.cloud_instance_id);
+            LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " " << st;
+        }
     }
 
     return Status::OK();
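The heartbeat change above makes the deployment mode an explicit handshake: the FE only sets the meta-service fields in cloud mode, so the BE can treat "endpoint field present" as "FE is disaggregated", reject mismatches, and adopt the FE's `cloud_instance_id`. A minimal Python sketch of the BE-side decision (illustrative only, not part of the patch; all names are hypothetical):

```python
# Illustrative model of the BE-side heartbeat checks above (hypothetical names).
def handle_heartbeat(fe_sent_meta_service_endpoint: bool,
                     fe_cloud_instance_id: str,
                     be_conf: dict) -> str:
    be_is_cloud = (be_conf.get("deploy_mode") == "disaggregated"
                   or bool(be_conf.get("cloud_unique_id")))
    # 1. FE and BE must agree on the deployment mode.
    if fe_sent_meta_service_endpoint != be_is_cloud:
        return "InvalidArgument: fe and be do not work in same mode"
    # 2. A locally configured cloud_instance_id must match the FE's.
    local_id = be_conf.get("cloud_instance_id", "")
    if local_id and fe_cloud_instance_id and local_id != fe_cloud_instance_id:
        return "InvalidArgument: cloud_instance_id mismatch"
    # 3. Otherwise adopt the FE's value and derive cloud_unique_id from it.
    if not local_id and fe_cloud_instance_id:
        be_conf["cloud_instance_id"] = fe_cloud_instance_id
        be_conf.setdefault("cloud_unique_id", "1:%s:compute" % fe_cloud_instance_id)
    return "OK"
```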
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 3e56c23d1d3e79..4fdaf977d155ee 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -160,24 +160,7 @@ struct RefreshFSVaultVisitor {
 };
 
 Status CloudStorageEngine::open() {
-    cloud::StorageVaultInfos vault_infos;
-    do {
-        auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
-        if (st.ok()) {
-            break;
-        }
-
-        LOG(WARNING) << "failed to get vault info, retry after 5s, err=" << st;
-        std::this_thread::sleep_for(5s);
-    } while (vault_infos.empty());
-
-    for (auto& [id, vault_info, path_format] : vault_infos) {
-        if (auto st = std::visit(VaultCreateFSVisitor {id, path_format}, vault_info); !st.ok())
-                [[unlikely]] {
-            return vault_process_error(id, vault_info, std::move(st));
-        }
-    }
-    set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
+    sync_storage_vault();
 
     // TODO(plat1ko): DeleteBitmapTxnManager

diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index 337b3194fd22b0..d3a55c3c377276 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -74,14 +74,27 @@ class CloudStorageEngine final : public BaseStorageEngine {
     }
 
     void _check_file_cache_ttl_block_valid();
 
-    std::optional<StorageResource> get_storage_resource(const std::string& vault_id) const {
+    std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
+        LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
         if (vault_id.empty()) {
+            if (latest_fs() == nullptr) {
+                LOG(INFO) << "there is no latest fs";
+                return std::nullopt;
+            }
             return StorageResource {latest_fs()};
         }
 
-        if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
-            return storage_resource->first;
-        }
+        bool synced = false;
+        do {
+            if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
+                return storage_resource->first;
+            }
+            if (synced) {
+                break;
+            }
+            sync_storage_vault();
+            synced = true;
+        } while (true);
 
         return std::nullopt;
     }

diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index bd74d3a4f3d603..03b58772df71d7 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -19,6 +19,7 @@
 
 namespace doris::config {
 
+DEFINE_String(deploy_mode, "");
 DEFINE_mString(cloud_instance_id, "");
 DEFINE_mString(cloud_unique_id, "");
 DEFINE_mString(meta_service_endpoint, "");

diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 1855fc1bffab27..f77bef1b4301eb 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -21,19 +21,18 @@
 
 namespace doris::config {
 
+DECLARE_String(deploy_mode);
+// Deprecated; do not configure directly.
 DECLARE_mString(cloud_instance_id);
-
 DECLARE_mString(cloud_unique_id);
 
 static inline bool is_cloud_mode() {
-    return !cloud_unique_id.empty() || !cloud_instance_id.empty();
+    return deploy_mode == "disaggregated" || !cloud_unique_id.empty();
 }
 
-static inline void set_cloud_unique_id() {
-    if (cloud_unique_id.empty()) {
-        if (!cloud_instance_id.empty()) {
-            cloud_unique_id = "1:" + cloud_instance_id + ":compute";
-        }
+static inline void set_cloud_unique_id(std::string instance_id) {
+    if (cloud_unique_id.empty() && !instance_id.empty()) {
+        static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute"));
     }
 }

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9689b7de4e5939..b8deb383916e74 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1659,7 +1659,7 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
         SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
     }
 
-    set_cloud_unique_id();
+    set_cloud_unique_id(cloud_instance_id);
 
     return true;
 }
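`get_storage_resource` above switches from a fail-fast cache lookup to "look up, sync vault info from the meta service once, then retry", so a request that references a vault the BE has not seen yet no longer fails spuriously. The retry-once shape, sketched in Python with hypothetical helpers:

```python
# Retry-once lookup pattern used by get_storage_resource (hypothetical helpers).
def get_storage_resource(vault_id, lookup, sync_storage_vault):
    synced = False
    while True:
        resource = lookup(vault_id)   # local cache lookup
        if resource is not None:
            return resource
        if synced:                    # already refreshed once; give up
            return None
        sync_storage_vault()          # pull the latest vault list from the meta service
        synced = True
```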
[ + "meta_service_endpoint = {}".format( + self.cluster.get_meta_server_addr()), + ] + if self.cluster.is_cloud and not self.cluster.no_be_cloud_instanceid: cfg += [ "cloud_instance_id = " + self.cloud_instance_id(), ] - else: - cfg += [ + if self.cluster.is_cloud and not self.cluster.sql_mode_node_mgr: + cfg += [ "cloud_unique_id = " + self.cloud_unique_id(), ] return cfg @@ -649,7 +654,8 @@ class Cluster(object): def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, - reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr): + reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr, + no_be_metaservice_endpoint, no_be_cloud_instanceid): self.name = name self.subnet = subnet self.image = image @@ -669,11 +675,14 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, for node_type in Node.TYPE_ALL } self.sql_mode_node_mgr = sql_mode_node_mgr + self.no_be_metaservice_endpoint = no_be_metaservice_endpoint + self.no_be_cloud_instanceid = no_be_cloud_instanceid @staticmethod def new(name, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, reg_be, - coverage_dir, cloud_store_config, sql_mode_node_mgr): + coverage_dir, cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cloud_instanceid): if not os.path.exists(LOCAL_DORIS_PATH): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) os.chmod(LOCAL_DORIS_PATH, 0o777) @@ -685,7 +694,8 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config, cluster = Cluster(name, subnet, image, is_cloud, fe_config, be_config, ms_config, recycle_config, fe_follower, be_disks, be_cluster, reg_be, - coverage_dir, cloud_store_config, sql_mode_node_mgr) + coverage_dir, cloud_store_config, sql_mode_node_mgr, + be_metaservice_endpoint, be_cloud_instanceid) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index 89ba3071a496b8..4f7eb97e47cd26 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -302,6 +302,16 @@ def add_parser(self, args_parsers): action=self._get_parser_bool_action(True), help="Manager fe be via sql instead of http") + parser.add_argument("--no-be-metaservice-endpoint", + default=False, + action=self._get_parser_bool_action(True), + help="Do not set BE meta service endpoint in conf. Default is False.") + + parser.add_argument("--no-be-cloud-instanceid", + default=True, + action=self._get_parser_bool_action(True), + help="Do not set BE cloud instance ID in conf. Default is False.") + parser.add_argument( "--fdb-version", type=str, @@ -399,7 +409,8 @@ def run(self, args): args.NAME, args.IMAGE, args.cloud, args.fe_config, args.be_config, args.ms_config, args.recycle_config, args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, - args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr) + args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, + args.no_be_metaservice_endpoint, args.no_be_cloud_instanceid) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) @@ -488,6 +499,46 @@ def do_add_node(node_type, add_num, add_ids): "Not up cluster cause specific --no-start, related node num {}" .format(related_node_num))) else: + LOG.info("Using SQL mode for node management ? 
{}".format(args.sql_mode_node_mgr)); + + # Wait for FE master to be elected + LOG.info("Waiting for FE master to be elected...") + expire_ts = time.time() + 30 + while expire_ts > time.time(): + db_mgr = database.get_db_mgr(args.NAME, False) + for id in add_fe_ids: + fe_state = db_mgr.get_fe(id) + if fe_state is not None and fe_state.alive: + break; + LOG.info("there is no fe ready") + time.sleep(5) + + if cluster.is_cloud and args.sql_mode_node_mgr: + db_mgr = database.get_db_mgr(args.NAME, False) + master_fe_endpoint = CLUSTER.get_master_fe_endpoint(cluster.name) + # Add FEs except master_fe + for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): + fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}" + if fe_endpoint != master_fe_endpoint: + try: + db_mgr.add_fe(fe_endpoint) + LOG.info(f"Added FE {fe_endpoint} successfully.") + except Exception as e: + LOG.error(f"Failed to add FE {fe_endpoint}: {str(e)}") + + # Add BEs + for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE): + be_endpoint = f"{be.get_ip()}:{CLUSTER.BE_HEARTBEAT_PORT}" + try: + db_mgr.add_be(be_endpoint) + LOG.info(f"Added BE {be_endpoint} successfully.") + except Exception as e: + LOG.error(f"Failed to add BE {be_endpoint}: {str(e)}") + + cloud_store_config = self._get_cloud_store_config() + + db_mgr.create_default_storage_vault(cloud_store_config) + if args.wait_timeout != 0: if args.wait_timeout == -1: args.wait_timeout = 1000000000 @@ -514,6 +565,8 @@ def do_add_node(node_type, add_num, add_ids): err += "dead be: " + str(dead_backends) + ". " raise Exception(err) time.sleep(1) + + LOG.info( utils.render_green( "Up cluster {} succ, related node num {}".format( @@ -620,6 +673,8 @@ def run(self, args): args.fdb_id, ignore_not_exists=True) + LOG.info("down cluster " + args.NAME + " for all " + str(for_all)) + if for_all: if os.path.exists(cluster.get_compose_file()): try: @@ -705,6 +760,8 @@ def __init__(self): self.tablet_num = "" self.last_heartbeat = "" self.err_msg = "" + self.edit_log_port = 0 + self.heartbeat_port = 0 def info(self, detail): result = [ @@ -716,13 +773,17 @@ def info(self, detail): if detail: query_port = "" http_port = "" + heartbeat_port = "" + edit_log_port = "" node_path = CLUSTER.get_node_path(self.cluster_name, self.node_type, self.id) if self.node_type == CLUSTER.Node.TYPE_FE: query_port = CLUSTER.FE_QUERY_PORT http_port = CLUSTER.FE_HTTP_PORT + edit_log_port = CLUSTER.FE_EDITLOG_PORT elif self.node_type == CLUSTER.Node.TYPE_BE: http_port = CLUSTER.BE_WEBSVR_PORT + heartbeat_port = CLUSTER.BE_HEARTBEAT_PORT elif self.node_type == CLUSTER.Node.TYPE_MS or self.node_type == CLUSTER.Node.TYPE_RECYCLE: http_port = CLUSTER.MS_PORT else: @@ -731,6 +792,8 @@ def info(self, detail): query_port, http_port, node_path, + edit_log_port, + heartbeat_port ] return result @@ -743,6 +806,7 @@ def update_db_info(self, db_mgr): self.query_port = fe.query_port self.last_heartbeat = fe.last_heartbeat self.err_msg = fe.err_msg + self.edit_log_port = fe.edit_log_port elif self.node_type == CLUSTER.Node.TYPE_BE: self.backend_id = -1 be = db_mgr.get_be(self.id) @@ -752,6 +816,7 @@ def update_db_info(self, db_mgr): self.tablet_num = be.tablet_num self.last_heartbeat = be.last_heartbeat self.err_msg = be.err_msg + self.heartbeat_port = be.heartbeat_port class GenConfCommand(Command): @@ -852,7 +917,6 @@ def run(self, args): CLUSTER.Node.TYPE_FE, 1).cloud_unique_id())) print("\nWrite succ: " + regression_conf_custom) - class ListCommand(Command): def add_parser(self, args_parsers): @@ -985,6 +1049,8 @@ def 
parse_cluster_compose_file(cluster_name): "query_port", "http_port", "path", + "edit_log_port", + "heartbeat_port", ] rows = [] @@ -1060,6 +1126,51 @@ def get_node_seq(node): return self._handle_data(header, rows) +class GetCloudIniCommand(Command): + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser( + "get-cloud-ini", help="Get cloud.init") + parser.add_argument( + "NAME", + nargs="*", + help= + "Specify multiple clusters, if specific, show all their containers." + ) + self._add_parser_output_json(parser) + parser.add_argument("--detail", + default=False, + action=self._get_parser_bool_action(True), + help="Print more detail fields.") + + def _handle_data(self, header, datas): + if utils.is_enable_log(): + table = prettytable.PrettyTable( + [utils.render_green(field) for field in header]) + for row in datas: + table.add_row(row) + print(table) + return "" + else: + datas.insert(0, header) + return datas + + def run(self, args): + + header = [ + "key", "value" + ] + + rows = [] + + with open(CLUSTER.CLOUD_CFG_FILE, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + key, value = line.split('=', 1) + rows.append([key.strip(), value.strip()]) + + return self._handle_data(header, rows) ALL_COMMANDS = [ UpCommand("up"), @@ -1069,6 +1180,7 @@ def get_node_seq(node): SimpleCommand("restart", "Restart the doris containers. "), SimpleCommand("pause", "Pause the doris containers. "), SimpleCommand("unpause", "Unpause the doris containers. "), + GetCloudIniCommand("get-cloud-ini"), GenConfCommand("config"), ListCommand("ls"), ] diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index 10388a1326b380..38fefed0fb0fa1 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -20,6 +20,7 @@ import pymysql import time import utils +import uuid LOG = utils.get_logger() @@ -27,19 +28,20 @@ class FEState(object): def __init__(self, id, query_port, is_master, alive, last_heartbeat, - err_msg): + err_msg, edit_log_port): self.id = id self.query_port = query_port self.is_master = is_master self.alive = alive self.last_heartbeat = last_heartbeat self.err_msg = err_msg + self.edit_log_port = edit_log_port class BEState(object): def __init__(self, id, backend_id, decommissioned, alive, tablet_num, - last_heartbeat, err_msg): + last_heartbeat, err_msg, heartbeat_port): self.id = id self.backend_id = backend_id self.decommissioned = decommissioned @@ -47,6 +49,7 @@ def __init__(self, id, backend_id, decommissioned, alive, tablet_num, self.tablet_num = tablet_num self.last_heartbeat = last_heartbeat self.err_msg = err_msg + self.heartbeat_port = heartbeat_port class DBManager(object): @@ -70,6 +73,15 @@ def load_states(self, query_ports): self._load_fe_states(query_ports) self._load_be_states() + def add_fe(self, fe_endpoint): + try: + sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'" + self._exec_query(sql) + LOG.info(f"Added FE {fe_endpoint} via SQL successfully.") + except Exception as e: + LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}") + raise + def drop_fe(self, fe_endpoint): id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")]) try: @@ -85,6 +97,15 @@ def drop_fe(self, fe_endpoint): return raise e + def add_be(self, be_endpoint): + try: + sql = f"ALTER SYSTEM ADD BACKEND '{be_endpoint}'" + self._exec_query(sql) + LOG.info(f"Added BE {be_endpoint} via SQL successfully.") + except Exception as e: + LOG.error(f"Failed to add BE 
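The master-election wait above is deadline-based polling, a shape that recurs in `doris-compose` whenever an external service needs time to settle. A reusable sketch (hypothetical helper, not in the codebase):

```python
import time

# Hypothetical helper capturing the deadline-polling shape used in `up`.
def wait_until(predicate, timeout_s=30, interval_s=5, describe="condition"):
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if predicate():
            return True
        print(f"waiting for {describe} ...")
        time.sleep(interval_s)
    return False

# Usage mirroring the FE-master wait:
#   wait_until(lambda: any(fe and fe.alive for fe in map(db_mgr.get_fe, add_fe_ids)),
#              describe="an alive FE")
```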
diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
index 10388a1326b380..38fefed0fb0fa1 100644
--- a/docker/runtime/doris-compose/database.py
+++ b/docker/runtime/doris-compose/database.py
@@ -20,6 +20,7 @@
 import pymysql
 import time
 import utils
+import uuid
 
 LOG = utils.get_logger()
 
@@ -27,19 +28,20 @@
 class FEState(object):
 
     def __init__(self, id, query_port, is_master, alive, last_heartbeat,
-                 err_msg):
+                 err_msg, edit_log_port):
         self.id = id
         self.query_port = query_port
         self.is_master = is_master
         self.alive = alive
         self.last_heartbeat = last_heartbeat
         self.err_msg = err_msg
+        self.edit_log_port = edit_log_port
 
 
 class BEState(object):
 
     def __init__(self, id, backend_id, decommissioned, alive, tablet_num,
-                 last_heartbeat, err_msg):
+                 last_heartbeat, err_msg, heartbeat_port):
         self.id = id
         self.backend_id = backend_id
         self.decommissioned = decommissioned
@@ -47,6 +49,7 @@ def __init__(self, id, backend_id, decommissioned, alive, tablet_num,
         self.tablet_num = tablet_num
         self.last_heartbeat = last_heartbeat
         self.err_msg = err_msg
+        self.heartbeat_port = heartbeat_port
 
 
 class DBManager(object):
@@ -70,6 +73,15 @@ def load_states(self, query_ports):
         self._load_fe_states(query_ports)
         self._load_be_states()
 
+    def add_fe(self, fe_endpoint):
+        try:
+            sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
+            self._exec_query(sql)
+            LOG.info(f"Added FE {fe_endpoint} via SQL successfully.")
+        except Exception as e:
+            LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
+            raise
+
     def drop_fe(self, fe_endpoint):
         id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
         try:
@@ -85,6 +97,15 @@ def drop_fe(self, fe_endpoint):
                 return
             raise e
 
+    def add_be(self, be_endpoint):
+        try:
+            sql = f"ALTER SYSTEM ADD BACKEND '{be_endpoint}'"
+            self._exec_query(sql)
+            LOG.info(f"Added BE {be_endpoint} via SQL successfully.")
+        except Exception as e:
+            LOG.error(f"Failed to add BE {be_endpoint} via SQL: {str(e)}")
+            raise
+
     def drop_be(self, be_endpoint):
         id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
         try:
@@ -140,24 +161,53 @@ def decommission_be(self, be_endpoint):
 
             time.sleep(5)
 
+    def create_default_storage_vault(self, cloud_store_config):
+        try:
+            # Create the storage vault
+            create_vault_sql = f"""
+            CREATE STORAGE VAULT IF NOT EXISTS default_vault
+            PROPERTIES (
+                "type" = "S3",
+                "s3.access_key" = "{cloud_store_config['DORIS_CLOUD_AK']}",
+                "s3.secret_key" = "{cloud_store_config['DORIS_CLOUD_SK']}",
+                "s3.endpoint" = "{cloud_store_config['DORIS_CLOUD_ENDPOINT']}",
+                "s3.bucket" = "{cloud_store_config['DORIS_CLOUD_BUCKET']}",
+                "s3.region" = "{cloud_store_config['DORIS_CLOUD_REGION']}",
+                "s3.root.path" = "{str(uuid.uuid4())}",
+                "provider" = "{cloud_store_config['DORIS_CLOUD_PROVIDER']}"
+            );
+            """
+            self._exec_query(create_vault_sql)
+            LOG.info("Created storage vault 'default_vault'")
+
+            # Set it as the default storage vault
+            set_default_vault_sql = "SET default_vault AS DEFAULT STORAGE VAULT;"
+            self._exec_query(set_default_vault_sql)
+            LOG.info("Set 'default_vault' as the default storage vault")
+
+        except Exception as e:
+            LOG.error(f"Failed to create default storage vault: {str(e)}")
+            raise
+
     def _load_fe_states(self, query_ports):
         fe_states = {}
         alive_master_fe_port = None
         for record in self._exec_query('''
                 show frontends '''):
-            # Unpack the record into individual columns
             name, ip, edit_log_port, _, query_port, _, _, role, is_master, cluster_id, _, alive, _, _, last_heartbeat, _, err_msg, _, _ = record
             is_master = utils.is_true(is_master)
             alive = utils.is_true(alive)
             id = CLUSTER.Node.get_id_from_ip(ip)
             query_port = query_ports.get(id, "")
             last_heartbeat = utils.escape_null(last_heartbeat)
             fe = FEState(id, query_port, is_master, alive, last_heartbeat,
-                         err_msg)
+                         err_msg, edit_log_port)
             fe_states[id] = fe
             if is_master and alive and query_port:
                 alive_master_fe_port = query_port
+            LOG.info(
+                "record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}"
+                .format(name, ip, alive, is_master, role))
+
         self.fe_states = fe_states
         if alive_master_fe_port and alive_master_fe_port != self.query_port:
             self.query_port = alive_master_fe_port
@@ -166,17 +216,18 @@ def _load_fe_states(self, query_ports):
     def _load_be_states(self):
         be_states = {}
         for record in self._exec_query('''
-                select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg
+                select BackendId, Host, LastHeartbeat, Alive, SystemDecommissioned, TabletNum, ErrMsg, HeartbeatPort
                 from backends()'''):
-            backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg = record
+            backend_id, ip, last_heartbeat, alive, decommissioned, tablet_num, err_msg, heartbeat_port = record
             backend_id = int(backend_id)
             alive = utils.is_true(alive)
             decommissioned = utils.is_true(decommissioned)
             tablet_num = int(tablet_num)
             id = CLUSTER.Node.get_id_from_ip(ip)
             last_heartbeat = utils.escape_null(last_heartbeat)
+            heartbeat_port = utils.escape_null(heartbeat_port)
             be = BEState(id, backend_id, decommissioned, alive, tablet_num,
-                         last_heartbeat, err_msg)
+                         last_heartbeat, err_msg, heartbeat_port)
             be_states[id] = be
         self.be_states = be_states
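Taken together, `add_fe`, `add_be`, and `create_default_storage_vault` implement node management purely over SQL: the driver connects to the master FE and issues `ALTER SYSTEM` statements instead of calling HTTP endpoints. A hedged usage sketch with pymysql (the endpoints are placeholders; in doris-compose they come from the cluster metadata):

```python
import pymysql

# Placeholder host/ports for illustration only.
conn = pymysql.connect(host="172.20.80.2", port=9030, user="root")
with conn.cursor() as cur:
    cur.execute("ALTER SYSTEM ADD FOLLOWER '172.20.80.3:9010'")  # FE edit-log port
    cur.execute("ALTER SYSTEM ADD BACKEND '172.20.80.10:9050'")  # BE heartbeat port
conn.close()
```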
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh
index 166597aec3087e..d4aad29b0e5cf8 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -91,6 +91,15 @@ start_cloud_fe() {
     # Check if SQL_MODE_NODE_MGR is set to 1
     if [ "${SQL_MODE_NODE_MGR}" = "1" ]; then
         health_log "SQL_MODE_NODE_MGR is set to 1. Skipping add FE."
+
+        touch $REGISTER_FILE
+
+        fe_daemon &
+        bash $DORIS_HOME/bin/start_fe.sh --daemon
+
+        if [ "$MY_ID" == "1" ]; then
+            echo $MY_IP >$MASTER_FE_IP_FILE
+        fi
         return
     fi

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 8fdcaf425122eb..fc1144fa22268b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2799,6 +2799,9 @@ public class Config extends ConfigBase {
     @ConfField public static int warn_sys_accumulated_file_size = 2;
     @ConfField public static int audit_sys_accumulated_file_size = 4;
 
+    @ConfField
+    public static String deploy_mode = "";
+
     // compatibily with elder version.
     // cloud_unique_id is introduced before cloud_instance_id, so it has higher priority.
     @ConfField
@@ -2809,7 +2812,7 @@ public class Config extends ConfigBase {
     public static String cloud_instance_id = "";
 
     public static boolean isCloudMode() {
-        return !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty();
+        return deploy_mode.equals("disaggregated") || !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty();
     }
 
     public static boolean isNotCloudMode() {
@@ -3055,4 +3058,8 @@ public static int metaServiceRpcRetryTimes() {
     @ConfField(mutable = true, description = {"表示最大锁持有时间,超过该时间会打印告警日志,单位秒",
             "Maximum lock hold time; logs a warning if exceeded"})
     public static long max_lock_hold_threshold_seconds = 10;
+
+    @ConfField(description = {"检查资源就绪的周期,单位秒",
+            "Interval in seconds between checks that a resource is ready"})
+    public static long resource_not_ready_sleep_seconds = 5;
 }
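After this change an FE is in cloud mode if any of three signals is set, with `deploy_mode` the explicit switch and the two IDs kept for compatibility (`cloud_unique_id` predates `cloud_instance_id` and so has higher priority). A tiny illustrative truth model:

```python
# Illustrative model of Config.isCloudMode() after this patch.
def is_cloud_mode(deploy_mode="", cloud_unique_id="", cloud_instance_id=""):
    return (deploy_mode == "disaggregated"
            or bool(cloud_unique_id)
            or bool(cloud_instance_id))

assert is_cloud_mode(deploy_mode="disaggregated")
assert is_cloud_mode(cloud_unique_id="1:x:sql_server00")
assert not is_cloud_mode()  # local / shared-nothing deployment
```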
{}", ex.getMessage()); } @@ -155,7 +153,7 @@ public static void modifyTblReplicaCount(Database database, String tblName) { } } try { - Thread.sleep(5000); + Thread.sleep(Config.resource_not_ready_sleep_seconds); } catch (InterruptedException t) { // IGNORE } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 0297fc851bbec1..843f58e0818570 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -44,6 +44,7 @@ import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -92,7 +93,10 @@ public CloudUpgradeMgr getCloudUpgradeMgr() { @Override public void initialize(String[] args) throws Exception { - if (Config.cloud_unique_id == null || Config.cloud_unique_id.isEmpty()) { + if (Strings.isNullOrEmpty(Config.cloud_unique_id)) { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new UserException("cloud_instance_id must be specified if deployed in dissaggregated"); + } LOG.info("cloud_unique_id is not set, setting it using instance_id"); Config.cloud_unique_id = "1:" + Config.cloud_instance_id + ":sql_server00"; } @@ -192,16 +196,21 @@ protected void getClusterIdAndRole() throws IOException { try { nodeInfoPB = getLocalTypeFromMetaService(); } catch (Exception e) { - LOG.warn("failed to get local fe's type, sleep 5 s, try again. exception: {}", e.getMessage()); + LOG.warn("failed to get local fe's type, sleep {} s, try again. exception: {}", + e.getMessage(), Config.resource_not_ready_sleep_seconds); } if (nodeInfoPB == null) { - LOG.warn("failed to get local fe's type, sleep 5 s, try again."); + LOG.warn("failed to get local fe's type, sleep {} s, try again.", + Config.resource_not_ready_sleep_seconds); try { try { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new DdlException("unable to create instance due to empty cloud_instance_id"); + } getCloudSystemInfoService().tryCreateInstance(Config.cloud_instance_id, Config.cloud_instance_id, false); } catch (Exception e) { - Thread.sleep(5000); + Thread.sleep(Config.resource_not_ready_sleep_seconds); throw e; } addFrontend(FrontendNodeType.MASTER, selfNode.getHost(), selfNode.getPort()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 47e9a283af3b65..095fde288fb9b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -311,6 +311,9 @@ public synchronized void updateFrontends(List toAdd, List to private void alterBackendCluster(List hostInfos, String clusterName, Cloud.AlterClusterRequest.Operation operation) throws DdlException { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new DdlException("unable to alter backends due to empty cloud_instance_id"); + } // Issue rpc to meta to alter node, then fe master would add this node to its frontends Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() .setClusterName(clusterName) @@ -336,6 +339,7 @@ private void alterBackendCluster(List hostInfos, String clusterName, Cloud.AlterClusterResponse response; try 
{ response = MetaServiceProxy.getInstance().alterCluster(request); + LOG.info("alter cluster, request: {}, response: {}", request, response); if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { LOG.warn("alter backends not ok, response: {}", response); throw new DdlException("failed to alter backends errorCode: " + response.getStatus().getCode() @@ -736,6 +740,10 @@ public Map getCloudClusterNameToId() { // FrontendCluster = SqlServerCluster private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort, Cloud.AlterClusterRequest.Operation op) throws DdlException { + if (Strings.isNullOrEmpty(Config.cloud_instance_id)) { + throw new DdlException("unable to alter frontend due to empty cloud_instance_id"); + } + // Issue rpc to meta to add this node, then fe master would add this node to its frontends Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() .setIp(host) @@ -762,6 +770,7 @@ private void alterFrontendCluster(FrontendNodeType role, String host, int editLo Cloud.AlterClusterResponse response; try { response = MetaServiceProxy.getInstance().alterCluster(request); + LOG.info("alter cluster, request: {}, response: {}", request, response); if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { LOG.warn("alter frontend not ok, response: {}", response); throw new DdlException("failed to alter frontend errorCode: " + response.getStatus().getCode() @@ -773,10 +782,6 @@ private void alterFrontendCluster(FrontendNodeType role, String host, int editLo } public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { - if (role != FrontendNodeType.MASTER && role != FrontendNodeType.OBSERVER) { - throw new DdlException("unsupported frontend role: " + role); - } - Cloud.AlterClusterRequest.Operation op; op = role == FrontendNodeType.MASTER ? 
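`getClusterIdAndRole` above now bootstraps a brand-new disaggregated cluster: if the meta service has no record of this FE, the FE creates the instance (named after `cloud_instance_id`) and registers itself as MASTER, retrying on a configurable interval instead of a hard-coded 5 s. A rough Python sketch under a hypothetical meta-service client:

```python
import time

# Hypothetical client; loosely models the FE bootstrap loop in getClusterIdAndRole.
def bootstrap_fe(meta, instance_id, self_endpoint, sleep_s=5):
    while True:
        node_info = meta.get_local_fe_type()       # None until this FE is registered
        if node_info is not None:
            return node_info
        if not instance_id:
            raise RuntimeError("unable to create instance due to empty cloud_instance_id")
        try:
            meta.try_create_instance(instance_id)  # idempotent: ALREADY_EXISTED is ok
            meta.add_frontend("MASTER", self_endpoint)
        except Exception:
            time.sleep(sleep_s)                    # resource_not_ready_sleep_seconds
```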
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 47e9a283af3b65..095fde288fb9b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -311,6 +311,9 @@ public synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> toDel)
 
     private void alterBackendCluster(List<HostInfo> hostInfos, String clusterName,
             Cloud.AlterClusterRequest.Operation operation) throws DdlException {
+        if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
+            throw new DdlException("unable to alter backends due to empty cloud_instance_id");
+        }
         // Issue rpc to meta to alter node, then fe master would add this node to its frontends
         Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
                 .setClusterName(clusterName)
@@ -336,6 +339,7 @@ private void alterBackendCluster(List<HostInfo> hostInfos, String clusterName,
         Cloud.AlterClusterResponse response;
         try {
             response = MetaServiceProxy.getInstance().alterCluster(request);
+            LOG.info("alter cluster, request: {}, response: {}", request, response);
             if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                 LOG.warn("alter backends not ok, response: {}", response);
                 throw new DdlException("failed to alter backends errorCode: " + response.getStatus().getCode()
@@ -736,6 +740,10 @@ public Map<String, String> getCloudClusterNameToId() {
     // FrontendCluster = SqlServerCluster
     private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort,
             Cloud.AlterClusterRequest.Operation op) throws DdlException {
+        if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
+            throw new DdlException("unable to alter frontend due to empty cloud_instance_id");
+        }
+
         // Issue rpc to meta to add this node, then fe master would add this node to its frontends
         Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder()
                 .setIp(host)
@@ -762,6 +770,7 @@ private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort,
         Cloud.AlterClusterResponse response;
         try {
             response = MetaServiceProxy.getInstance().alterCluster(request);
+            LOG.info("alter cluster, request: {}, response: {}", request, response);
             if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                 LOG.warn("alter frontend not ok, response: {}", response);
                 throw new DdlException("failed to alter frontend errorCode: " + response.getStatus().getCode()
@@ -773,10 +782,6 @@ private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort,
     }
 
     public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
-        if (role != FrontendNodeType.MASTER && role != FrontendNodeType.OBSERVER) {
-            throw new DdlException("unsupported frontend role: " + role);
-        }
-
         Cloud.AlterClusterRequest.Operation op;
         op = role == FrontendNodeType.MASTER ? Cloud.AlterClusterRequest.Operation.ADD_CLUSTER
                 : Cloud.AlterClusterRequest.Operation.ADD_NODE;
@@ -788,6 +793,10 @@ public void dropFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
     }
 
     private void tryCreateCluster(String clusterName, String clusterId) throws UserException {
+        if (Strings.isNullOrEmpty(Config.cloud_instance_id)) {
+            throw new DdlException("unable to create cluster due to empty cloud_instance_id");
+        }
+
         Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
                 .setClusterId(clusterId)
                 .setClusterName(clusterName)
@@ -803,6 +812,7 @@ private void tryCreateCluster(String clusterName, String clusterId) throws UserException {
         Cloud.AlterClusterResponse response;
         try {
             response = MetaServiceProxy.getInstance().alterCluster(request);
+            LOG.info("alter cluster, request: {}, response: {}", request, response);
             if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
                     && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) {
                 LOG.warn("create cluster not ok, response: {}", response);
@@ -929,7 +939,9 @@ public void waitForAutoStart(String clusterName) throws DdlException {
 
         Cloud.AlterClusterResponse response;
         try {
-            response = MetaServiceProxy.getInstance().alterCluster(builder.build());
+            Cloud.AlterClusterRequest request = builder.build();
+            response = MetaServiceProxy.getInstance().alterCluster(request);
+            LOG.info("alter cluster, request: {}, response: {}", request, response);
             if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                 LOG.warn("notify to resume cluster not ok, cluster {}, response: {}", clusterName, response);
             }
@@ -985,7 +997,10 @@ public void tryCreateInstance(String instanceId, String name, boolean sseEnabled) throws DdlException {
 
         Cloud.CreateInstanceResponse response;
         try {
-            response = MetaServiceProxy.getInstance().createInstance(builder.build());
+            Cloud.CreateInstanceRequest request = builder.build();
+            response = MetaServiceProxy.getInstance().createInstance(request);
+            LOG.info("create instance, request: {}, response: {}", request, response);
             if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
                     && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) {
                 LOG.warn("Failed to create instance {}, response: {}", instanceId, response);

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java
index 452698e1c18c22..38fee894eb3d99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.common;
 
-import java.util.UUID;
 import java.security.SecureRandom;
 
 public class RandomIdentifierGenerator {
@@ -38,7 +37,7 @@ public static String generateRandomIdentifier(int length) {
         }
 
         StringBuilder sb = new StringBuilder(length);
-
+        // First character must be a letter
         sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 16e0106b3185b1..bf19bdc37c190c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -2688,6 +2688,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
             if (info != null) {
                 storageVaultName = info.first;
                 storageVaultId = info.second;
+                LOG.info("Using default storage vault: name={}, id={}", storageVaultName, storageVaultId);
             } else {
                 throw new DdlException("No default storage vault."
                         + " You can use `SHOW STORAGE VAULT` to get all available vaults,"
@@ -2711,9 +2712,12 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
 
             olapTable.setStorageVaultName(storageVaultName);
             storageVaultId = env.getStorageVaultMgr().getVaultIdByName(storageVaultName);
-            if (storageVaultId != null && !storageVaultId.isEmpty()) {
-                olapTable.setStorageVaultId(storageVaultId);
+            if (Strings.isNullOrEmpty(storageVaultId)) {
+                throw new DdlException("Storage vault '" + storageVaultName + "' does not exist. "
+                        + "You can use `SHOW STORAGE VAULT` to get all available vaults, "
+                        + "or create a new one with `CREATE STORAGE VAULT`.");
             }
+            olapTable.setStorageVaultId(storageVaultId);
         }
 
         // check `update on current_timestamp`

diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
index 21abce18adecbd..b88cf84282f9e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
@@ -73,7 +73,6 @@ public class Tag implements Writable {
     public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = "cloud_cluster_private_endpoint";
     public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
 
-    public static final String COMPUTE_GROUP_NAME = "name";
     public static final String VALUE_DEFAULT_CLOUD_CLUSTER_NAME = "default_cluster";
 
     public static final String WORKLOAD_GROUP = "workload_group";
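A recurring pattern in `CloudSystemInfoService` above: every meta-service mutation (`alterBackendCluster`, `alterFrontendCluster`, `tryCreateCluster`) now fails fast when `cloud_instance_id` is empty, and logs the full request/response pair. The guard, expressed as a Python decorator for illustration only:

```python
import functools

# Illustrative: the fail-fast guard applied before each meta-service mutation.
def requires_instance_id(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self.cloud_instance_id:
            raise ValueError(f"unable to {func.__name__} due to empty cloud_instance_id")
        return func(self, *args, **kwargs)
    return wrapper

class MetaServiceClient:
    def __init__(self, cloud_instance_id=""):
        self.cloud_instance_id = cloud_instance_id

    @requires_instance_id
    def alter_backend_cluster(self, hosts):
        print("alter cluster, request:", hosts)  # mirrors the new request/response logging
```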
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index d2d05e97995a58..5dd8dd9fca1ca0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -95,8 +95,14 @@ public void setMaster(int clusterId, String token, long epoch) {
         tMasterInfo.setHttpPort(Config.http_port);
         long flags = heartbeatFlags.getHeartbeatFlags();
         tMasterInfo.setHeartbeatFlags(flags);
-        tMasterInfo.setCloudInstanceId(Config.cloud_instance_id);
-        tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint);
+        if (Config.isCloudMode()) {
+            // Set cloud_instance_id and meta_service_endpoint even if they are empty,
+            // so a BE can tell that this FE is working in cloud mode.
+            // The cloud instance ID identifies the disaggregated deployment.
+            tMasterInfo.setCloudInstanceId(Config.cloud_instance_id);
+            // The endpoint of the meta service used in cloud mode.
+            tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint);
+        }
         masterInfo.set(tMasterInfo);
     }

diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 7fbac53dc91c1f..3e0bee2ba56830 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -320,8 +320,8 @@ class Suite implements GroovyInterceptable {
             def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName
             logger.info("try create database if not exists {}", context.dbName)
             JdbcUtils.executeToList(conn, sql)
-            url = Config.buildUrlWithDb(url, context.dbName)
 
+            url = Config.buildUrlWithDb(url, context.dbName)
             logger.info("connect to docker cluster: suite={}, url={}", name, url)
             connect(user, password, url, actionSupplier)
         } finally {
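This is the FE side of the handshake: `TMasterInfo` carries `cloud_instance_id` and `meta_service_endpoint` only in cloud mode, and deliberately even when empty; the set flags themselves tell the BE that the FE is disaggregated, which is exactly what `heartbeat_server.cpp` above checks. A small Python model:

```python
# Model of the FE heartbeat payload: field *presence*, not value, signals cloud mode.
def build_master_info(is_cloud_mode, cloud_instance_id="", meta_service_endpoint=""):
    info = {"network_address": ("fe-host", 9020)}  # always present (placeholder values)
    if is_cloud_mode:
        # Sent even when empty: the BE keys off __isset, not the value.
        info["cloud_instance_id"] = cloud_instance_id
        info["meta_service_endpoint"] = meta_service_endpoint
    return info

assert "meta_service_endpoint" not in build_master_info(False)
assert "meta_service_endpoint" in build_master_info(True)
```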
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 0c911257bdf522..30b602cb8b74f5 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -17,8 +17,9 @@
 package org.apache.doris.regression.suite
 
 import org.apache.doris.regression.Config
-import org.apache.doris.regression.util.Http
 import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.NodeType
 
 import com.google.common.collect.Maps
@@ -29,6 +30,7 @@ import groovy.json.JsonSlurper
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
 import java.util.stream.Collectors
+import java.sql.Connection
 
 class ClusterOptions {
 
@@ -36,6 +38,8 @@ class ClusterOptions {
     int beNum = 3
 
     Boolean sqlModeNodeMgr = false
+    Boolean noBeMetaServiceEndpoint = false
+    Boolean noBeCloudInstanceId = false
 
     int waitTimeout = 180
 
@@ -154,6 +158,7 @@ class ServerNode {
 
 class Frontend extends ServerNode {
 
+    int editLogPort
     int queryPort
     boolean isMaster
 
@@ -161,6 +166,7 @@ class Frontend extends ServerNode {
         Frontend fe = new Frontend()
         ServerNode.fromCompose(fe, header, index, fields)
         fe.queryPort = (Integer) fields.get(header.indexOf('query_port'))
+        fe.editLogPort = (Integer) fields.get(header.indexOf('edit_log_port'))
         fe.isMaster = fields.get(header.indexOf('is_master')) == 'true'
         return fe
     }
@@ -181,14 +187,17 @@ class Frontend extends ServerNode {
 
 class Backend extends ServerNode {
 
+    int heartbeatPort
     long backendId
     int tabletNum
 
     static Backend fromCompose(ListHeader header, int index, List<Object> fields) {
         Backend be = new Backend()
         ServerNode.fromCompose(be, header, index, fields)
+        be.heartbeatPort = (Integer) fields.get(header.indexOf('heartbeat_port'))
         be.backendId = toLongOrDefault(fields.get(header.indexOf('backend_id')), -1L)
         be.tabletNum = (int) toLongOrDefault(fields.get(header.indexOf('tablet_num')), 0L)
+
         return be
     }
 
@@ -259,6 +268,7 @@ class SuiteCluster {
     final String name
     final Config config
     private boolean running
+    private boolean sqlModeNodeMgr = false
 
     SuiteCluster(String name, Config config) {
         this.name = name
@@ -309,8 +319,18 @@ class SuiteCluster {
         if (options.sqlModeNodeMgr) {
             cmd += ['--sql-mode-node-mgr']
         }
+        if (options.noBeMetaServiceEndpoint) {
+            cmd += ['--no-be-metaservice-endpoint']
+        }
+        if (options.noBeCloudInstanceId) {
+            cmd += ['--no-be-cloud-instanceid']
+        }
+
         cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]
 
+        sqlModeNodeMgr = options.sqlModeNodeMgr
+
         runCmd(cmd.join(' '), -1)
 
         // wait be report disk
@@ -413,6 +433,7 @@ class SuiteCluster {
         def data = runCmd(cmd)
         assert data instanceof List
         def rows = (List<List<Object>>) data
+        logger.info('get all nodes {}', rows)
         def header = new ListHeader(rows.get(0))
         for (int i = 1; i < rows.size(); i++) {
             def row = (List<Object>) rows.get(i)

diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
new file mode 100644
index 00000000000000..539779eab8365f
--- /dev/null
+++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+suite('test_sql_mode_node_mgr', 'p1') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+    ]
+    options.cloudMode = true
+    options.sqlModeNodeMgr = true
+    options.waitTimeout = 0
+    options.feNum = 3
+    options.feConfigs += ["resource_not_ready_sleep_seconds=1",
+                          "heartbeat_interval_second=1",]
+    options.noBeCloudInstanceId = true;
+    options.noBeMetaServiceEndpoint = true;
+
+    // First round: BEs start without cloud_instance_id or meta_service_endpoint.
+    docker(options) {
+        logger.info("docker started");
+        def result = sql """show frontends; """
+        logger.info("show frontends result {}", result);
+        // Extract fields from the frontends result
+        def frontendFields = result.collect { row ->
+            [
+                name: row[0],
+                ip: row[1],
+                queryPort: row[4],
+                role: row[7],
+                isMaster: row[8],
+                alive: row[11]
+            ]
+        }
+
+        logger.info("Extracted frontend fields: {}", frontendFields)
+
+        // Verify exactly one frontend is master
+        assert frontendFields.count { it.isMaster == "true" } == 1, "Expected exactly one master frontend"
+
+        // Verify three frontends are alive
+        assert frontendFields.count { it.alive == "true" } == 3, "Expected exactly three alive frontends"
+
+        result = sql """show backends; """
+        logger.info("show backends result {}", result);
+
+        // Extract fields from the backends result
+        def backendFields = result.collect { row ->
+            [
+                id: row[0],
+                ip: row[1],
+                alive: row[9]
+            ]
+        }
+
+        logger.info("Extracted backend fields: {}", backendFields)
+
+        // Verify three backends are alive
+        assert backendFields.count { it.alive == "true" } == 3, "Expected exactly three alive backends"
+
+        sql """ drop table if exists example_table """
+        sql """ CREATE TABLE IF NOT EXISTS example_table (
+                id BIGINT,
+                username VARCHAR(20)
+                )
+                DISTRIBUTED BY HASH(id) BUCKETS 2
+                PROPERTIES (
+                "replication_num" = "1"
+                ); """
+        sql """ insert into example_table values(1, "1") """
+        sql """ select * from example_table """
+
+        logger.info("Restarting frontends and backends...")
+        cluster.restartFrontends();
+        cluster.restartBackends();
+
+        sleep(30000)
+        context.reconnectFe()
+
+        sql """ select * from example_table """
+
+        sql """ insert into example_table values(1, "1") """
+
+        sql """ select * from example_table order by id """
+    }
+
+    options.noBeCloudInstanceId = false;
+    options.noBeMetaServiceEndpoint = false;
+
+    // Second round: BEs start with the conf values set explicitly.
+    docker(options) {
+        logger.info("docker started");
+
+        // Generate a unique UUID for the root path
+        def uuid = UUID.randomUUID().toString()
+        logger.info("Generated UUID for root path: ${uuid}")
+        // Create a new storage vault
+        sql """ CREATE STORAGE VAULT test_vault
+                PROPERTIES (
+                    "type" = "s3",
+                    "s3.endpoint" = "${getS3Endpoint()}",
+                    "s3.region" = "${getS3Region()}",
+                    "s3.bucket" = "${getS3BucketName()}",
+                    "s3.access_key" = "${getS3AK()}",
+                    "s3.secret_key" = "${getS3SK()}",
+                    "s3.root.path" = "${uuid}",
+                    "provider" = "${getS3Provider()}"
+                ); """
+
+        // Set the newly created vault as default
+        sql """ SET test_vault AS DEFAULT STORAGE VAULT; """
+
+        // Verify the vault was created and set as default
+        def vaultResult = sql """ SHOW STORAGE VAULT; """
+        logger.info("Show storage vault result: {}", vaultResult)
+
+        assert vaultResult.size() > 0, "Expected at least one storage vault"
+        assert vaultResult.any { row ->
+            row[0] == "test_vault" && row[5] == "true"
+        }, "Expected 'test_vault' to be created and set as default"
+
+        def result = sql """show frontends; """
+        logger.info("show frontends result {}", result);
+        // Extract fields from the frontends result
+        def frontendFields = result.collect { row ->
+            [
+                name: row[0],
+                ip: row[1],
+                queryPort: row[4],
+                role: row[7],
+                isMaster: row[8],
+                alive: row[11]
+            ]
+        }
+
+        logger.info("Extracted frontend fields: {}", frontendFields)
+
+        // Verify exactly one frontend is master
+        assert frontendFields.count { it.isMaster == "true" } == 1, "Expected exactly one master frontend"
+
+        // Verify three frontends are alive
+        assert frontendFields.count { it.alive == "true" } == 3, "Expected exactly three alive frontends"
+
+        result = sql """show backends; """
+        logger.info("show backends result {}", result);
+
+        // Extract fields from the backends result
+        def backendFields = result.collect { row ->
+            [
+                id: row[0],
+                ip: row[1],
+                alive: row[9]
+            ]
+        }
+
+        logger.info("Extracted backend fields: {}", backendFields)
+
+        // Verify three backends are alive
+        assert backendFields.count { it.alive == "true" } == 3, "Expected exactly three alive backends"
+
+        sql """ drop table if exists example_table """
+        sql """ CREATE TABLE IF NOT EXISTS example_table (
+                id BIGINT,
+                username VARCHAR(20)
+                )
+                DISTRIBUTED BY HASH(id) BUCKETS 2
+                PROPERTIES (
+                "replication_num" = "1"
+                ); """
+        sql """ insert into example_table values(1, "1") """
+        result = sql """ select * from example_table """
+        assert result.size() == 1
+
+        logger.info("Restarting frontends and backends...")
+        cluster.restartFrontends();
+        cluster.restartBackends();
+
+        sleep(30000)
+        context.reconnectFe()
+
+        result = sql """ select * from example_table """
+        assert result.size() == 1
+
+        sql """ insert into example_table values(1, "1") """
+
+        result = sql """ select * from example_table order by id """
+        assert result.size() == 2
+    }
+}
\ No newline at end of file