From 14b63a0a2beff8358d1cb55a487aba55a4007ccc Mon Sep 17 00:00:00 2001 From: Lei Zhang <27994433+SWJTU-ZhangLei@users.noreply.github.com> Date: Tue, 30 May 2023 17:59:35 +0800 Subject: [PATCH] [feature](regression) Add p2 level test for schema change --- .../test_schema_change.groovy | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 regression-test/suites/schema_change_p2/test_schema_change.groovy diff --git a/regression-test/suites/schema_change_p2/test_schema_change.groovy b/regression-test/suites/schema_change_p2/test_schema_change.groovy new file mode 100644 index 000000000000000..ab7c7dc10a085fc --- /dev/null +++ b/regression-test/suites/schema_change_p2/test_schema_change.groovy @@ -0,0 +1,98 @@ +suite("test_schema_change") { + + // create table + def tableName = 'test_schema_change' + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = tableName + "_" + uniqueID + + sql """ DROP TABLE IF EXISTS ${tableName} FORCE""" + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME VARCHAR(25) NOT NULL, + C_ADDRESS VARCHAR(40) NOT NULL, + C_NATIONKEY INTEGER NOT NULL, + C_PHONE CHAR(15) NOT NULL, + C_ACCTBAL DECIMAL(15,2) NOT NULL, + C_MKTSEGMENT CHAR(10) NOT NULL, + C_COMMENT VARCHAR(117) NOT NULL + ) + DUPLICATE KEY(C_CUSTKEY, C_NAME) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 32 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + LOAD LABEL ${loadLabel} + ( + DATA INFILE('s3://${s3BucketName}/regression/tpch/sf100/customer.tbl') + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "|" + (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, temp) + ) + WITH S3 + ( + 'AWS_REGION' = '${getS3Region()}', + 'AWS_ENDPOINT' = '${getS3Endpoint()}', + 'AWS_ACCESS_KEY' = '${getS3AK()}', + 'AWS_SECRET_KEY' = '${getS3SK()}' + ) + PROPERTIES + ( + 'exec_mem_limit' = '8589934592', + 'load_parallelism' = '1', + 'timeout' = '3600' + ) + """ + + waitBrokerLoadJob = { String label /* param */ -> + // check load state + int tryTimes = 20 + while (tryTimes-- > 0) { + def stateResult = sql "show load where Label = '${label}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ('cancelled'.equalsIgnoreCase(loadState)) { + logger.info("stateResult:{}", stateResult) + throw new IllegalStateException("load ${label} has been cancelled") + } else if ('finished'.equalsIgnoreCase(loadState)) { + logger.info("stateResult:{}", stateResult) + break + } + sleep(60000) + } + } + + waitBrokerLoadJob(loadLabel) + rowCount = sql "select count(*) from ${tableName}" + logger.info("rowCount:{}", rowCount) + assertEquals(rowCount[0][0], 15000000) + + sql """ alter table ${tableName} drop column C_NAME""" + + waitSchemaChangeJob = { String tableName /* param */ -> + int tryTimes = 20 + while (tryTimes-- > 0) { + def jobResult = sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + def jobState = jobResult[0][9].toString() + if ('cancelled'.equalsIgnoreCase(jobState)) { + logger.info("jobResult:{}", jobResult) + throw new IllegalStateException("${tableName}'s job has been cancelled") + } + if ('finished'.equalsIgnoreCase(jobState)) { + logger.info("jobResult:{}", jobResult) + break + } + sleep(60000) + } + } + + waitSchemaChangeJob(tableName) + sql """ DESC ${tableName}""" + rowCount = sql "select count(*) from ${tableName}" + logger.info("rowCount:{}", rowCount) + assertEquals(rowCount[0][0], 15000000) + qt_sql "select count(*) from ${tableName} order by C_CUSTKEY limit 4097;" + sql """ DROP TABLE IF EXISTS ${tableName} FORCE""" +}