Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
sabir-akhadov committed May 7, 2024
1 parent 58844ab commit 642eb31
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class RemoveColumnMappingCDCSuite extends RemoveColumnMappingSuiteUtils {

private case object Downgrade extends Operation {
override def runOperation(): Unit = {
disableColumnMapping()
unsetColumnMappingProperty(useUnset = false)
insertMoreRows()
}
}
Expand Down Expand Up @@ -196,10 +196,6 @@ class RemoveColumnMappingCDCSuite extends RemoveColumnMappingSuiteUtils {
}
}

private val firstColumn = "first_column_name"
private val thirdColumn = "third_column_name"
private val renamedThirdColumn = "renamed_third_column_name"

private def createTable(): Unit = {
val columnMappingMode = "none"
sql(s"""CREATE TABLE $testTableName
Expand All @@ -212,33 +208,10 @@ class RemoveColumnMappingCDCSuite extends RemoveColumnMappingSuiteUtils {
|""".stripMargin)
}

private def enableColumnMapping(): Unit = {
sql(
s"""ALTER TABLE $testTableName
SET TBLPROPERTIES (
'${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5')""")
}

private def disableColumnMapping(): Unit = {
unsetColumnMappingProperty(useUnset = false)
}

private def insertMoreRows(): Unit = {
sql(s"INSERT INTO $testTableName SELECT * FROM $testTableName LIMIT $totalRows")
}

private def renameColumn(): Unit = {
sql(s"ALTER TABLE $testTableName RENAME COLUMN $thirdColumn TO $renamedThirdColumn")
}

private def dropColumn(): Unit = {
sql(s"ALTER TABLE $testTableName DROP COLUMN $thirdColumn")
}

private def deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName))

private def getCDCAndFailIncompatibleSchemaChange(
startVersion: Long,
endVersion: Option[Long]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import org.apache.spark.sql.functions.col
*/
trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSuiteUtils {

override def beforeAll(): Unit = {
override protected def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(ALLOW_COLUMN_MAPPING_REMOVAL.key, "true")
}

override def afterEach(): Unit = {
override protected def afterEach(): Unit = {
sql(s"DROP TABLE IF EXISTS $testTableName")
super.afterEach()
}
Expand All @@ -50,7 +50,12 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
protected val rowsPerFile = totalRows / numFiles
protected val logicalColumnName = "logical_column_name"
protected val secondColumn = "second_column_name"
protected val firstColumn = "first_column_name"
protected val thirdColumn = "third_column_name"
protected val renamedThirdColumn = "renamed_third_column_name"

protected val testTableName: String = "test_table_" + this.getClass.getSimpleName
protected def deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName))

import testImplicits._

Expand Down Expand Up @@ -124,6 +129,23 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
|""".stripMargin)
}

protected def enableColumnMapping(): Unit = {
sql(
s"""ALTER TABLE $testTableName
SET TBLPROPERTIES (
'${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5')""")
}

protected def renameColumn(): Unit = {
sql(s"ALTER TABLE $testTableName RENAME COLUMN $thirdColumn TO $renamedThirdColumn")
}

protected def dropColumn(): Unit = {
sql(s"ALTER TABLE $testTableName DROP COLUMN $thirdColumn")
}

/**
* Get all files in snapshot.
*/
Expand Down

0 comments on commit 642eb31

Please sign in to comment.