From 514a73335af3d323580d6296929b1f3a2c26a42d Mon Sep 17 00:00:00 2001
From: Yongqiang YANG
Date: Tue, 10 Sep 2024 17:57:47 +0800
Subject: [PATCH] fix sql-mode node management flags and test coverage

---
 docker/runtime/doris-compose/cluster.py       |  68 +--
 docker/runtime/doris-compose/command.py       |  76 ++--
 docker/runtime/doris-compose/database.py      |  12 +-
 .../java/org/apache/doris/qe/DdlExecutor.java |   3 +-
 .../regression/suite/SuiteCluster.groovy      |  13 +-
 .../node_mgr/test_sql_mode_node_mgr.groovy    | 407 ++++++++++--------
 6 files changed, 330 insertions(+), 249 deletions(-)

diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
index 20716b697f73fb..63623170eb8e7d 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -137,6 +137,7 @@ class NodeMeta(object):
     def __init__(self, image):
         self.image = image
 
+
 class Group(object):
 
     def __init__(self, node_type):
@@ -290,16 +291,27 @@ def docker_env(self):
         enable_coverage = self.cluster.coverage_dir
 
         envs = {
-            "MY_IP": self.get_ip(),
-            "MY_ID": self.id,
-            "MY_TYPE": self.node_type(),
-            "FE_QUERY_PORT": FE_QUERY_PORT,
-            "FE_EDITLOG_PORT": FE_EDITLOG_PORT,
-            "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT,
-            "DORIS_HOME": os.path.join(self.docker_home_dir()),
-            "STOP_GRACE": 1 if enable_coverage else 0,
-            "IS_CLOUD": 1 if self.cluster.is_cloud else 0,
-            "SQL_MODE_NODE_MGR": 1 if hasattr(self.cluster, 'sql_mode_node_mgr') and self.cluster.sql_mode_node_mgr else 0
+            "MY_IP":
+            self.get_ip(),
+            "MY_ID":
+            self.id,
+            "MY_TYPE":
+            self.node_type(),
+            "FE_QUERY_PORT":
+            FE_QUERY_PORT,
+            "FE_EDITLOG_PORT":
+            FE_EDITLOG_PORT,
+            "BE_HEARTBEAT_PORT":
+            BE_HEARTBEAT_PORT,
+            "DORIS_HOME":
+            os.path.join(self.docker_home_dir()),
+            "STOP_GRACE":
+            1 if enable_coverage else 0,
+            "IS_CLOUD":
+            1 if self.cluster.is_cloud else 0,
+            "SQL_MODE_NODE_MGR":
+            1 if hasattr(self.cluster, 'sql_mode_node_mgr')
+            and self.cluster.sql_mode_node_mgr else 0
         }
 
         if self.cluster.is_cloud:
@@ -391,8 +403,7 @@ def get_add_init_config(self):
         if self.cluster.is_cloud:
             cfg += [
                 "meta_service_endpoint = {}".format(
-                    self.cluster.get_meta_server_addr()),
-                "",
+                    self.cluster.get_meta_server_addr()), "",
                 "# For regression-test",
                 "ignore_unsupported_properties_in_cloud_mode = true",
                 "merge_on_write_forced_to_false = true",
@@ -468,19 +479,19 @@ def get_add_init_config(self):
                 "deploy_mode = disaggregated",
             ]
 
-        if self.cluster.is_cloud and not self.cluster.no_be_metaservice_endpoint:
-            cfg += [
-                "meta_service_endpoint = {}".format(
-                    self.cluster.get_meta_server_addr()),
-            ]
-        if self.cluster.is_cloud and not self.cluster.no_be_cloud_instanceid:
-            cfg += [
-                "cloud_instance_id = " + self.cloud_instance_id(),
-            ]
-        if self.cluster.is_cloud and not self.cluster.sql_mode_node_mgr:
-            cfg += [
-                "cloud_unique_id = " + self.cloud_unique_id(),
-            ]
+            if not self.cluster.no_be_metaservice_endpoint:
+                cfg += [
+                    "meta_service_endpoint = {}".format(
+                        self.cluster.get_meta_server_addr()),
+                ]
+            if not self.cluster.no_be_cloud_instanceid:
+                cfg += [
+                    "cloud_instance_id = " + self.cloud_instance_id(),
+                ]
+            if not self.cluster.sql_mode_node_mgr:
+                cfg += [
+                    "cloud_unique_id = " + self.cloud_unique_id(),
+                ]
         return cfg
 
     def init_cluster_name(self):
@@ -678,6 +689,8 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
         self.no_be_metaservice_endpoint = no_be_metaservice_endpoint
         self.no_be_cloud_instanceid = no_be_cloud_instanceid
 
+        LOG.info("new cluster {}".format(name))
+
     @staticmethod
     def new(name, image, is_cloud, fe_config, be_config, ms_config,
             recycle_config, fe_follower, be_disks, be_cluster, reg_be,
@@ -694,8 +707,9 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
 
         cluster = Cluster(name, subnet, image, is_cloud, fe_config,
                           be_config, ms_config, recycle_config, fe_follower,
                           be_disks, be_cluster, reg_be,
-                          coverage_dir, cloud_store_config, sql_mode_node_mgr,
-                          be_metaservice_endpoint, be_cloud_instanceid)
+                          coverage_dir, cloud_store_config,
+                          sql_mode_node_mgr, be_metaservice_endpoint,
+                          be_cloud_instanceid)
         os.makedirs(cluster.get_path(), exist_ok=True)
         os.makedirs(get_status_path(name), exist_ok=True)
         cluster._save_meta()
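A note for reviewers on the `SQL_MODE_NODE_MGR` entry added to `docker_env()` above: it reaches the container as the string "1" or "0". A minimal sketch of how a startup script could branch on it (illustrative only; this is not the actual doris-compose entrypoint, and `render_be_conf` is a made-up helper):

    import os

    def sql_mode_node_mgr():
        # docker_env() emits 1/0, which arrive as the strings "1"/"0".
        return os.environ.get("SQL_MODE_NODE_MGR", "0") == "1"

    def render_be_conf(lines):
        # Made-up stand-in for the entrypoint's conf generation.
        print("\n".join(lines))

    if sql_mode_node_mgr():
        # Nodes are registered later via ALTER SYSTEM statements, so no
        # static identity lines are written here.
        render_be_conf([])
    else:
        render_be_conf(["cloud_unique_id = <placeholder>"])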
diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index 4f7eb97e47cd26..cf6211f35de6a5 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -302,15 +302,39 @@ def add_parser(self, args_parsers):
                             action=self._get_parser_bool_action(True),
                             help="Manager fe be via sql instead of http")
 
-        parser.add_argument("--no-be-metaservice-endpoint",
-                            default=False,
-                            action=self._get_parser_bool_action(True),
-                            help="Do not set BE meta service endpoint in conf. Default is False.")
+        if self._support_boolean_action():
+            parser.add_argument(
+                "--be-metaservice-endpoint",
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help=
+                "Set BE meta service endpoint in conf. Default is True.")
+        else:
+            parser.add_argument(
+                "--no-be-metaservice-endpoint",
+                dest='be_metaservice_endpoint',
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help=
+                "Do not set BE meta service endpoint in conf.")
 
-        parser.add_argument("--no-be-cloud-instanceid",
-                            default=True,
-                            action=self._get_parser_bool_action(True),
-                            help="Do not set BE cloud instance ID in conf. Default is False.")
+        if self._support_boolean_action():
+            parser.add_argument(
+                "--be-cloud-instanceid",
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help=
+                "Set BE cloud instance ID in conf. Default is True.")
+        else:
+            parser.add_argument(
+                "--no-be-cloud-instanceid",
+                dest='be_cloud_instanceid',
+                default=True,
+                action=self._get_parser_bool_action(False),
+                help=
+                "Do not set BE cloud instance ID in conf.")
 
         parser.add_argument(
             "--fdb-version",
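For reviewers unfamiliar with the split above: on Python 3.9+, argparse's BooleanOptionalAction derives both `--be-metaservice-endpoint` and `--no-be-metaservice-endpoint` from one declaration, while older interpreters only get the explicit `--no-...` flag wired to the same destination. A standalone, runnable sketch of the pattern (the flag name is reused purely for illustration):

    import argparse
    import sys

    def support_boolean_action():
        return sys.version_info >= (3, 9)

    parser = argparse.ArgumentParser()
    if support_boolean_action():
        # One declaration yields both the positive and the --no- form.
        parser.add_argument("--be-metaservice-endpoint",
                            default=True,
                            action=argparse.BooleanOptionalAction)
    else:
        # Fallback: only the negative flag exists; it stores False into the
        # destination whose default is True.
        parser.add_argument("--no-be-metaservice-endpoint",
                            dest="be_metaservice_endpoint",
                            default=True,
                            action="store_false")

    args = parser.parse_args(["--no-be-metaservice-endpoint"])
    assert args.be_metaservice_endpoint is False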
{}".format( + args.sql_mode_node_mgr)) # Wait for FE master to be elected LOG.info("Waiting for FE master to be elected...") @@ -509,13 +534,14 @@ def do_add_node(node_type, add_num, add_ids): for id in add_fe_ids: fe_state = db_mgr.get_fe(id) if fe_state is not None and fe_state.alive: - break; + break LOG.info("there is no fe ready") time.sleep(5) if cluster.is_cloud and args.sql_mode_node_mgr: db_mgr = database.get_db_mgr(args.NAME, False) - master_fe_endpoint = CLUSTER.get_master_fe_endpoint(cluster.name) + master_fe_endpoint = CLUSTER.get_master_fe_endpoint( + cluster.name) # Add FEs except master_fe for fe in cluster.get_all_nodes(CLUSTER.Node.TYPE_FE): fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}" @@ -524,7 +550,8 @@ def do_add_node(node_type, add_num, add_ids): db_mgr.add_fe(fe_endpoint) LOG.info(f"Added FE {fe_endpoint} successfully.") except Exception as e: - LOG.error(f"Failed to add FE {fe_endpoint}: {str(e)}") + LOG.error( + f"Failed to add FE {fe_endpoint}: {str(e)}") # Add BEs for be in cluster.get_all_nodes(CLUSTER.Node.TYPE_BE): @@ -566,7 +593,6 @@ def do_add_node(node_type, add_num, add_ids): raise Exception(err) time.sleep(1) - LOG.info( utils.render_green( "Up cluster {} succ, related node num {}".format( @@ -789,11 +815,7 @@ def info(self, detail): else: pass result += [ - query_port, - http_port, - node_path, - edit_log_port, - heartbeat_port + query_port, http_port, node_path, edit_log_port, heartbeat_port ] return result @@ -917,6 +939,7 @@ def run(self, args): CLUSTER.Node.TYPE_FE, 1).cloud_unique_id())) print("\nWrite succ: " + regression_conf_custom) + class ListCommand(Command): def add_parser(self, args_parsers): @@ -972,8 +995,7 @@ def parse_cluster_compose_file(cluster_name): if services is None: return COMPOSE_BAD, {} return COMPOSE_GOOD, { - service: - ComposeService( + service: ComposeService( service, list(service_conf["networks"].values())[0] ["ipv4_address"], service_conf["image"]) @@ -1126,11 +1148,12 @@ def get_node_seq(node): return self._handle_data(header, rows) + class GetCloudIniCommand(Command): def add_parser(self, args_parsers): - parser = args_parsers.add_parser( - "get-cloud-ini", help="Get cloud.init") + parser = args_parsers.add_parser("get-cloud-ini", + help="Get cloud.init") parser.add_argument( "NAME", nargs="*", @@ -1138,10 +1161,6 @@ def add_parser(self, args_parsers): "Specify multiple clusters, if specific, show all their containers." 
) self._add_parser_output_json(parser) - parser.add_argument("--detail", - default=False, - action=self._get_parser_bool_action(True), - help="Print more detail fields.") def _handle_data(self, header, datas): if utils.is_enable_log(): @@ -1157,9 +1176,7 @@ def _handle_data(self, header, datas): def run(self, args): - header = [ - "key", "value" - ] + header = ["key", "value"] rows = [] @@ -1172,6 +1189,7 @@ def run(self, args): return self._handle_data(header, rows) + ALL_COMMANDS = [ UpCommand("up"), DownCommand("down"), diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index 38fefed0fb0fa1..bbf6fb4fbeb29c 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -73,7 +73,7 @@ def load_states(self, query_ports): self._load_fe_states(query_ports) self._load_be_states() - def add_fe(self, fe_endpoint): + def add_fe(self, fe_endpoint): try: sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'" self._exec_query(sql) @@ -162,8 +162,8 @@ def decommission_be(self, be_endpoint): time.sleep(5) def create_default_storage_vault(self, cloud_store_config): - try: - # Create storage vault + try: + # Create storage vault create_vault_sql = f""" CREATE STORAGE VAULT IF NOT EXISTS default_vault PROPERTIES ( @@ -195,7 +195,7 @@ def _load_fe_states(self, query_ports): for record in self._exec_query(''' show frontends '''): # Unpack the record into individual columns - name, ip , edit_log_port, _, query_port, _, _, role, is_master, cluster_id, _, alive, _, _, last_heartbeat, _, err_msg, _, _ = record + name, ip, edit_log_port, _, query_port, _, _, role, is_master, cluster_id, _, alive, _, _, last_heartbeat, _, err_msg, _, _ = record is_master = utils.is_true(is_master) alive = utils.is_true(alive) id = CLUSTER.Node.get_id_from_ip(ip) @@ -206,7 +206,9 @@ def _load_fe_states(self, query_ports): fe_states[id] = fe if is_master and alive and query_port: alive_master_fe_port = query_port - LOG.info("record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}".format(name, ip, alive, is_master, role)) + LOG.info( + "record of show frontends, name {}, ip {}, alive {}, is_master {}, role {}" + .format(name, ip, alive, is_master, role)) self.fe_states = fe_states if alive_master_fe_port and alive_master_fe_port != self.query_port: diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index baf31653eff470..8bdbc57f9993b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -564,7 +564,8 @@ private static void checkDdlStmtSupported(DdlStmt ddlStmt) throws DdlException { || ddlStmt instanceof AdminRebalanceDiskStmt || ddlStmt instanceof AdminCancelRebalanceDiskStmt || ddlStmt instanceof AlterResourceStmt - || ddlStmt instanceof AlterPolicyStmt) { + || ddlStmt instanceof AlterPolicyStmt + || ddlStmt instanceof CancelAlterSystemStmt) { LOG.info("stmt={}, not supported in cloud mode", ddlStmt.toString()); throw new DdlException("Unsupported operation"); } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 30b602cb8b74f5..4b8ad69358c34d 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -38,8 +38,8 @@ class ClusterOptions {
 
     int beNum = 3
 
     Boolean sqlModeNodeMgr = false
-    Boolean noBeMetaServiceEndpoint = false
-    Boolean noBeCloudInstanceId = false
+    Boolean beMetaServiceEndpoint = true
+    Boolean beCloudInstanceId = true
 
     int waitTimeout = 180
 
@@ -315,16 +315,15 @@ class SuiteCluster {
             cmd += ['--fe-follower']
         }
 
-        cmd += ['--wait-timeout', String.valueOf(180)]
-
         if (options.sqlModeNodeMgr) {
             cmd += ['--sql-mode-node-mgr']
         }
-        if (options.noBeMetaServiceEndpoint) {
-            cmd += ['--no-be-metaservice-endpoint']
+        if (!options.beMetaServiceEndpoint) {
+            cmd += ['--no-be-metaservice-endpoint']
         }
-        if (options.noBeCloudInstanceId) {
-            cmd += ['--no-be-cloud-instanceid']
+        if (!options.beCloudInstanceId) {
+            cmd += ['--no-be-cloud-instanceid']
         }
 
         cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]
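The suite below exercises two cluster configurations through these options. Roughly, they correspond to doris-compose invocations like the following (a sketch using only flags visible in this patch; cluster name and image are placeholders):

    # clusterOptions[0]: BE conf keeps meta_service_endpoint and
    # cloud_instance_id (compose defaults, no extra flags needed).
    case0 = ["up", "test_cluster", "doris_image", "--sql-mode-node-mgr"]

    # clusterOptions[1]: BE conf omits both, so BEs must be registered
    # purely through SQL.
    case1 = case0 + ["--no-be-metaservice-endpoint", "--no-be-cloud-instanceid"]

    print(case0)
    print(case1)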
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
index 539779eab8365f..56129c6a62306c 100644
--- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
+++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
@@ -22,190 +22,237 @@ suite('test_sql_mode_node_mgr', 'p1') {
     if (!isCloudMode()) {
         return;
     }
-    def options = new ClusterOptions()
-    options.feConfigs += [
-        'cloud_cluster_check_interval_second=1',
-    ]
-    options.cloudMode = true
-    options.sqlModeNodeMgr = true
-    options.waitTimeout = 0
-    options.feNum = 3
-    options.feConfigs += ["resource_not_ready_sleep_seconds=1",
-                          "heartbeat_interval_second=1",]
-    options.noBeCloudInstanceId = true;
-    options.noBeMetaServiceEndpoint = true;
-
-    //
-    docker(options) {
-        logger.info("docker started");
-        def result = sql """show frontends; """
-        logger.info("show frontends result {}", result);
-        // Extract fields from frontends result
-        def frontendFields = result.collect { row ->
-            [
-                name: row[0],
-                ip: row[1],
-                queryPort: row[4],
-                role: row[7],
-                isMaster: row[8],
-                alive: row[11]
-            ]
-        }
-
-        logger.info("Extracted frontend fields: {}", frontendFields)
-
-        // Verify at least one frontend is alive and master
-        // Verify only one frontend is master
-        assert frontendFields.count { it.isMaster == "true" } == 1, "Expected exactly one master frontend"
-
-        // Verify three frontends are alive
-        assert frontendFields.count { it.alive == "true" } == 3, "Expected exactly three alive frontends"
-
-        result = sql """show backends; """
-        logger.info("show backends result {}", result);
-
-        // Extract fields from backends result
-        def backendFields = result.collect { row ->
-            [
-                id: row[0],
-                ip: row[1],
-                alive: row[9]
-            ]
-        }
-        logger.info("Extracted backend fields: {}", backendFields)
-
-        // Verify three backends are alive
-        assert backendFields.count { it.alive == "true" } == 3, "Expected exactly three alive backends"
-
-        sql """ drop table if exists example_table """
-        sql """ CREATE TABLE IF NOT EXISTS example_table (
-                id BIGINT,
-                username VARCHAR(20)
-                )
-                DISTRIBUTED BY HASH(id) BUCKETS 2
-                PROPERTIES (
-                    "replication_num" = "1"
-                ); """
-        sql """ insert into example_table values(1, "1") """
-        sql """ select * from example_table """
-
-        logger.info("Restarting frontends and backends...")
-        cluster.restartFrontends();
-        cluster.restartBackends();
-
-        sleep(30000)
-        context.reconnectFe()
-
-        sql """ select * from example_table """
-
-        sql """ insert into example_table values(1, "1") """
+    def clusterOptions = [
+        new ClusterOptions(),
+        new ClusterOptions(),
+    ]
 
-        sql """ select * from example_table order by id """
-    }
+    for (options in clusterOptions) {
+        options.feConfigs += [
+            'cloud_cluster_check_interval_second=1',
+        ]
+        options.cloudMode = true
+        options.sqlModeNodeMgr = true
+        options.waitTimeout = 0
+        options.feNum = 3
+        options.feConfigs += ["resource_not_ready_sleep_seconds=1",
+                              "heartbeat_interval_second=1",]
+    }
 
-    options.noBeCloudInstanceId = false;
-    options.noBeMetaServiceEndpoint = false;
-
-    //
-    docker(options) {
-        logger.info("docker started");
-
-        // Generate a unique UUID for the root path
-        def uuid = UUID.randomUUID().toString()
-        logger.info("Generated UUID for root path: ${uuid}")
-        // Create a new storage vault
-        sql """ CREATE STORAGE VAULT test_vault
-            PROPERTIES (
-                "type" = "s3",
-                "s3.endpoint" = "${getS3Endpoint()}",
-                "s3.region" = "${getS3Region()}",
-                "s3.bucket" = "${getS3BucketName()}",
-                "s3.access_key" = "${getS3AK()}",
-                "s3.secret_key" = "${getS3SK()}",
-                "s3.root.path" = "${uuid}",
-                "provider" = "${getS3Provider()}"
-            ); """
-
-        // Set the newly created vault as default
-        sql """ SET test_vault AS DEFAULT STORAGE VAULT; """
-
-        // Verify the vault was created and set as default
-        def vaultResult = sql """ SHOW STORAGE VAULT; """
-        logger.info("Show storage vault result: {}", vaultResult)
-
-        assert vaultResult.size() > 0, "Expected at least one storage vault"
-        assert vaultResult.any { row ->
-            row[0] == "test_vault" && row[5] == "true"
-        }, "Expected 'test_vault' to be created and set as default"
-
-        def result = sql """show frontends; """
-        logger.info("show frontends result {}", result);
-        // Extract fields from frontends result
-        def frontendFields = result.collect { row ->
-            [
-                name: row[0],
-                ip: row[1],
-                queryPort: row[4],
-                role: row[7],
-                isMaster: row[8],
-                alive: row[11]
-            ]
+    clusterOptions[0].beCloudInstanceId = true;
+    clusterOptions[0].beMetaServiceEndpoint = true;
+
+    clusterOptions[1].beCloudInstanceId = false;
+    clusterOptions[1].beMetaServiceEndpoint = false;
+
+    for (options in clusterOptions) {
+        docker(options) {
+            logger.info("docker started");
+
+            def checkFrontendsAndBackends = { ->
+                // Check frontends
+                def frontendResult = sql_return_maparray """show frontends;"""
+                logger.info("show frontends result {}", frontendResult)
+                // Check that we have the expected number of frontends
+                assert frontendResult.size() == 3, "Expected 3 frontends, but got ${frontendResult.size()}"
+
+                // Check that all required columns are present
+                def requiredColumns = ['Name', 'Host', 'EditLogPort', 'HttpPort', 'QueryPort', 'RpcPort', 'Role', 'IsMaster', 'ClusterId', 'Join', 'Alive', 'ReplayedJournalId', 'LastHeartbeat', 'IsHelper', 'ErrMsg']
+                def actualColumns = frontendResult[0].keySet()
+                assert actualColumns.containsAll(requiredColumns), "Missing required columns. Expected: ${requiredColumns}, Actual: ${actualColumns}"
+
+                // Check that we have one master and two followers
+                def masterCount = frontendResult.count { it['IsMaster'] == 'true' }
+                assert masterCount == 1, "Expected 1 master, but got ${masterCount}"
+
+                def followerCount = frontendResult.count { it['IsMaster'] == 'false' }
+                assert followerCount == 2, "Expected 2 followers, but got ${followerCount}"
+
+                // Check that all frontends are alive
+                def aliveCount = frontendResult.count { it['Alive'] == 'true' }
+                assert aliveCount == 3, "Expected all 3 frontends to be alive, but only ${aliveCount} are alive"
+
+                // Check backends
+                def backendResult = sql_return_maparray """show backends;"""
+                logger.info("show backends result {}", backendResult)
+                // Check that we have the expected number of backends
+                assert backendResult.size() == 3, "Expected 3 backends, but got ${backendResult.size()}"
+
+                // Check that all required columns are present
+                def requiredBackendColumns = ['BackendId', 'Cluster', 'Host', 'HeartbeatPort', 'BePort', 'HttpPort', 'BrpcPort', 'LastStartTime', 'LastHeartbeat', 'Alive', 'SystemDecommissioned', 'ClusterDecommissioned', 'TabletNum', 'DataUsedCapacity', 'AvailCapacity', 'TotalCapacity', 'UsedPct', 'MaxDiskUsedPct', 'Tag', 'ErrMsg', 'Version', 'Status']
+                def actualBackendColumns = backendResult[0].keySet()
+                assert actualBackendColumns.containsAll(requiredBackendColumns), "Missing required backend columns. Expected: ${requiredBackendColumns}, Actual: ${actualBackendColumns}"
+
+                // Check that all backends are alive
+                def aliveBackendCount = backendResult.count { it['Alive'] == 'true' }
+                assert aliveBackendCount == 3, "Expected all 3 backends to be alive, but only ${aliveBackendCount} are alive"
+
+                // Check that no backends are decommissioned
+                def decommissionedCount = backendResult.count { it['SystemDecommissioned'] == 'true' || it['ClusterDecommissioned'] == 'true' }
+                assert decommissionedCount == 0, "Expected no decommissioned backends, but found ${decommissionedCount}"
+
+                // Check that all backends have valid capacities
+                backendResult.each { backend ->
+                    assert backend['DataUsedCapacity'] != null && backend['AvailCapacity'] != null && backend['TotalCapacity'] != null, "Backend ${backend['BackendId']} has invalid capacity values"
+                    assert backend['UsedPct'] != null && backend['MaxDiskUsedPct'] != null, "Backend ${backend['BackendId']} has invalid disk usage percentages"
+                }
+
+                logger.info("All backend checks passed successfully")
+            }
+
+            // 1. Call the closure to check frontends and backends.
+            checkFrontendsAndBackends()
+
+            // 2. Check that reads and writes work.
+            sql """ drop table if exists example_table """
+            sql """ CREATE TABLE IF NOT EXISTS example_table (
+                    id BIGINT,
+                    username VARCHAR(20)
+                    )
+                    DISTRIBUTED BY HASH(id) BUCKETS 2
+                    PROPERTIES (
+                        "replication_num" = "1"
+                    ); """
+            def result = sql """ insert into example_table values(1, "1") """
+            result = sql """ select * from example_table """
+            assert result.size() == 1
+
+            // 3. Check that restarting FEs and BEs works.
+            logger.info("Restarting frontends and backends...")
+            cluster.restartFrontends();
+            cluster.restartBackends();
+
+            sleep(30000)
+            context.reconnectFe()
+
+            result = sql """ select * from example_table """
+            assert result.size() == 1
+
+            sql """ insert into example_table values(1, "1") """
+
+            result = sql """ select * from example_table order by id """
+            assert result.size() == 2
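+            // Steps 4 and 5 below share one polling pattern: re-run the SHOW
+            // statement every 10 seconds and give up once the 300-second
+            // budget (maxWaitSeconds) is exhausted.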
+
+            // 4. If a BE is dropped, queries and writes still work.
+            // Get the list of backends
+            def backends = sql_return_maparray("SHOW BACKENDS")
+            logger.info("Current backends: {}", backends)
+
+            // Find a backend to drop
+            def backendToDrop = backends[0]
+            def backendHost = backendToDrop['Host']
+            def backendHeartbeatPort = backendToDrop['HeartbeatPort']
+
+            logger.info("Dropping backend: {}:{}", backendHost, backendHeartbeatPort)
+
+            // Drop the selected backend
+            sql """ ALTER SYSTEM DECOMMISSION BACKEND "${backendHost}:${backendHeartbeatPort}"; """
+
+            // Wait for the backend to be fully dropped
+            int maxWaitSeconds = 300
+            int waited = 0
+            while (waited < maxWaitSeconds) {
+                def currentBackends = sql_return_maparray("SHOW BACKENDS")
+                if (currentBackends.size() == 2) {
+                    logger.info("Backend successfully dropped")
+                    break
+                }
+                sleep(10000)
+                waited += 10
+            }
+
+            if (waited >= maxWaitSeconds) {
+                throw new Exception("Timeout waiting for backend to be dropped")
+            }
+
+            // Verify the backend was dropped
+            def remainingBackends = sql_return_maparray("SHOW BACKENDS")
+            logger.info("Remaining backends: {}", remainingBackends)
+            assert remainingBackends.size() == 2, "Expected 2 remaining backends"
+
+            // Verify that write operations still work after dropping a backend
+            sql """ INSERT INTO example_table VALUES (2, '2'); """
+
+            // Verify that read operations still work after dropping a backend
+            result = sql """ SELECT * FROM example_table ORDER BY id; """
+            logger.info("Query result after dropping backend: {}", result)
+            assert result.size() == 3, "Expected 3 rows in example_table after dropping backend"
+
+
+            // 5. If an FE is dropped, queries and writes still work.
+            // Get the list of frontends
+            def frontends = sql_return_maparray("SHOW FRONTENDS")
+            logger.info("Current frontends: {}", frontends)
+
+            // Find a non-master frontend to drop
+            def feToDropMap = frontends.find { it['IsMaster'] == "false" }
+            assert feToDropMap != null, "No non-master frontend found to drop"
+
+            def feHost = feToDropMap['Host']
+            def feEditLogPort = feToDropMap['EditLogPort']
+
+            logger.info("Dropping non-master frontend: {}:{}", feHost, feEditLogPort)
+
+            // Drop the selected non-master frontend
+            sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """
+
+            // Wait for the frontend to be fully dropped; reuse the earlier
+            // wait counters instead of redeclaring them.
+            maxWaitSeconds = 300
+            waited = 0
+            while (waited < maxWaitSeconds) {
+                def currentFrontends = sql_return_maparray("SHOW FRONTENDS")
+                if (currentFrontends.size() == frontends.size() - 1) {
+                    logger.info("Non-master frontend successfully dropped")
+                    break
+                }
+                sleep(10000)
+                waited += 10
+            }
+
+            if (waited >= maxWaitSeconds) {
+                throw new Exception("Timeout waiting for non-master frontend to be dropped")
+            }
+
+            // Verify the frontend was dropped
+            def remainingFrontends = sql_return_maparray("SHOW FRONTENDS")
+            logger.info("Remaining frontends: {}", remainingFrontends)
+            assert remainingFrontends.size() == frontends.size() - 1, "Expected ${frontends.size() - 1} remaining frontends"
+
+            // Verify that write operations still work after dropping a frontend
+            sql """ INSERT INTO example_table VALUES (3, '3'); """
+
+            // Verify that read operations still work after dropping a frontend
+            result = sql """ SELECT * FROM example_table ORDER BY id; """
+            logger.info("Query result after dropping frontend: {}", result)
+            assert result.size() == 4, "Expected 4 rows in example_table after dropping frontend"
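+            // Step 6 expects the FE to reject ALTER SYSTEM DROP FOLLOWER for
+            // the current master; only the error-message substring is
+            // asserted, not its exact wording.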
+
+            // 6. Verify an FE cannot drop itself: attempt to drop the master
+            //    FE and expect an exception.
+            logger.info("Attempting to drop the master frontend")
+
+            // Get the master frontend information
+            def masterFE = frontends.find { it['IsMaster'] == "true" }
+            assert masterFE != null, "No master frontend found"
+
+            def masterHost = masterFE['Host']
+            def masterEditLogPort = masterFE['EditLogPort']
+
+            logger.info("Attempting to drop master frontend: {}:{}", masterHost, masterEditLogPort)
+
+            try {
+                sql """ ALTER SYSTEM DROP FOLLOWER "${masterHost}:${masterEditLogPort}"; """
+                throw new Exception("Expected an exception when trying to drop master frontend, but no exception was thrown")
+            } catch (Exception e) {
+                logger.info("Received expected exception when trying to drop master frontend: {}", e.getMessage())
+                assert e.getMessage().contains("Cannot drop master"), "Unexpected exception message when trying to drop master frontend"
+            }
+
+            // Verify that the master frontend is still present
+            def currentFrontends = sql_return_maparray("SHOW FRONTENDS")
+            assert currentFrontends.find { it['IsMaster'] == "true" && it['Host'] == masterHost && it['EditLogPort'] == masterEditLogPort } != null, "Master frontend should still be present"
+
+            logger.info("Successfully verified that the master frontend cannot be dropped")
+        }
-
-        logger.info("Extracted frontend fields: {}", frontendFields)
-
-        // Verify at least one frontend is alive and master
-        // Verify only one frontend is master
-        assert frontendFields.count { it.isMaster == "true" } == 1, "Expected exactly one master frontend"
-
-        // Verify three frontends are alive
-        assert frontendFields.count { it.alive == "true" } == 3, "Expected exactly three alive frontends"
-
-        result = sql """show backends; """
-        logger.info("show backends result {}", result);
-
-        // Extract fields from backends result
-        def backendFields = result.collect { row ->
-            [
-                id: row[0],
-                ip: row[1],
-                alive: row[9]
-            ]
-        }
-
-        logger.info("Extracted backend fields: {}", backendFields)
-
-        // Verify three backends are alive
-        assert backendFields.count { it.alive == "true" } == 3, "Expected exactly three alive backends"
-
-        sql """ drop table if exists example_table """
-        sql """ CREATE TABLE IF NOT EXISTS example_table (
-                id BIGINT,
-                username VARCHAR(20)
-                )
-                DISTRIBUTED BY HASH(id) BUCKETS 2
-                PROPERTIES (
-                    "replication_num" = "1"
-                ); """
-        sql """ insert into example_table values(1, "1") """
-        result = sql """ select * from example_table """
-        assert result.size() == 1
-
-        logger.info("Restarting frontends and backends...")
-        cluster.restartFrontends();
-        cluster.restartBackends();
-
-        sleep(30000)
-        context.reconnectFe()
-
-        result = sql """ select * from example_table """
-        assert result.size() == 1
-
-        sql """ insert into example_table values(1, "1") """
-
-        result = sql """ select * from example_table order by id """
-        assert result.size() == 2
-    }
+    }
 }
\ No newline at end of file
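Taken together, the patch routes node management through SQL statements issued by doris-compose's database.py. A condensed sketch of that flow, using only calls that appear in this diff (endpoints are placeholders, and add_be is assumed to mirror the add_fe shown above):

    import database  # doris-compose module patched above

    db_mgr = database.get_db_mgr("my_cluster", False)

    # ALTER SYSTEM ADD FOLLOWER '<ip>:<editlog_port>'
    db_mgr.add_fe("172.20.80.3:9010")

    # Assumed symmetric to add_fe: ALTER SYSTEM ADD BACKEND '<ip>:<heartbeat_port>'
    db_mgr.add_be("172.20.80.6:9050")

    # Graceful removal: ALTER SYSTEM DECOMMISSION BACKEND, then poll SHOW
    # BACKENDS until the node disappears (database.py sleeps 5s between polls).
    db_mgr.decommission_be("172.20.80.6:9050")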