Skip to content

Commit

Permalink
[feature](regression) Add p2 level test for schema change
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei authored and dataroaring committed Sep 3, 2023
1 parent 360f43f commit 14b63a0
Showing 1 changed file with 98 additions and 0 deletions.
98 changes: 98 additions & 0 deletions regression-test/suites/schema_change_p2/test_schema_change.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
suite("test_schema_change") {

// create table
def tableName = 'test_schema_change'
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
def loadLabel = tableName + "_" + uniqueID

sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
sql """
CREATE TABLE IF NOT EXISTS `${tableName}` (
C_CUSTKEY INTEGER NOT NULL,
C_NAME VARCHAR(25) NOT NULL,
C_ADDRESS VARCHAR(40) NOT NULL,
C_NATIONKEY INTEGER NOT NULL,
C_PHONE CHAR(15) NOT NULL,
C_ACCTBAL DECIMAL(15,2) NOT NULL,
C_MKTSEGMENT CHAR(10) NOT NULL,
C_COMMENT VARCHAR(117) NOT NULL
)
DUPLICATE KEY(C_CUSTKEY, C_NAME)
DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 32
PROPERTIES (
"replication_num" = "1"
);
"""

sql """
LOAD LABEL ${loadLabel}
(
DATA INFILE('s3://${s3BucketName}/regression/tpch/sf100/customer.tbl')
INTO TABLE ${tableName}
COLUMNS TERMINATED BY "|"
(c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, temp)
)
WITH S3
(
'AWS_REGION' = '${getS3Region()}',
'AWS_ENDPOINT' = '${getS3Endpoint()}',
'AWS_ACCESS_KEY' = '${getS3AK()}',
'AWS_SECRET_KEY' = '${getS3SK()}'
)
PROPERTIES
(
'exec_mem_limit' = '8589934592',
'load_parallelism' = '1',
'timeout' = '3600'
)
"""

waitBrokerLoadJob = { String label /* param */ ->
// check load state
int tryTimes = 20
while (tryTimes-- > 0) {
def stateResult = sql "show load where Label = '${label}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ('cancelled'.equalsIgnoreCase(loadState)) {
logger.info("stateResult:{}", stateResult)
throw new IllegalStateException("load ${label} has been cancelled")
} else if ('finished'.equalsIgnoreCase(loadState)) {
logger.info("stateResult:{}", stateResult)
break
}
sleep(60000)
}
}

waitBrokerLoadJob(loadLabel)
rowCount = sql "select count(*) from ${tableName}"
logger.info("rowCount:{}", rowCount)
assertEquals(rowCount[0][0], 15000000)

sql """ alter table ${tableName} drop column C_NAME"""

waitSchemaChangeJob = { String tableName /* param */ ->
int tryTimes = 20
while (tryTimes-- > 0) {
def jobResult = sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
def jobState = jobResult[0][9].toString()
if ('cancelled'.equalsIgnoreCase(jobState)) {
logger.info("jobResult:{}", jobResult)
throw new IllegalStateException("${tableName}'s job has been cancelled")
}
if ('finished'.equalsIgnoreCase(jobState)) {
logger.info("jobResult:{}", jobResult)
break
}
sleep(60000)
}
}

waitSchemaChangeJob(tableName)
sql """ DESC ${tableName}"""
rowCount = sql "select count(*) from ${tableName}"
logger.info("rowCount:{}", rowCount)
assertEquals(rowCount[0][0], 15000000)
qt_sql "select count(*) from ${tableName} order by C_CUSTKEY limit 4097;"
sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
}

0 comments on commit 14b63a0

Please sign in to comment.