
Commit

Tests
andreaschat-db committed Apr 22, 2024
1 parent e5d8864 commit a93c8aa
Showing 1 changed file with 183 additions and 0 deletions.
@@ -26,7 +26,9 @@ import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.internal.SQLConf
import com.fasterxml.jackson.databind.node.ObjectNode
import io.delta.tables.DeltaTable
import org.apache.commons.io.FileUtils
@@ -45,6 +47,24 @@ class DeletionVectorsSuite extends QueryTest
with DeltaTestUtilsForTempViews {
import testImplicits._

// ~200MB, written as a single file; should contain 2 row groups.
val multiRowgroupTable = "multiRowgroupTable"
val multiRowgroupTableRowsNum = 50000000

override def beforeAll(): Unit = {
super.beforeAll()
spark.range(0, multiRowgroupTableRowsNum, 1, 1).toDF("id")
.write
.option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, true.toString)
.format("delta")
.saveAsTable(multiRowgroupTable)
}

override def afterAll(): Unit = {
// Drop the shared table before super.afterAll() tears down the Spark session.
try {
sql(s"DROP TABLE IF EXISTS $multiRowgroupTable")
} finally {
super.afterAll()
}
}

test(s"read Delta table with deletion vectors") {
def verifyVersion(version: Int, expectedData: Seq[Int]): Unit = {
checkAnswer(
@@ -709,6 +729,169 @@ class DeletionVectorsSuite extends QueryTest
}
}

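/**
 * Verifies predicate pushdown on a shallow clone of [[multiRowgroupTable]].
 *
 * @param deletePredicates predicates for the DELETE statements, executed one by one.
 * @param selectPredicate optional filter to apply when reading the table back.
 * @param expectedNumRows number of rows expected after the deletes and the filter.
 * @param validationPredicate predicate matching the removed rows; must match zero result rows.
 * @param vectorizedReaderEnabled whether to enable the vectorized Parquet reader.
 */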
private def testPredicatePushDown(
deletePredicates: Seq[String],
selectPredicate: Option[String],
expectedNumRows: Long,
validationPredicate: String,
vectorizedReaderEnabled: Boolean): Unit = {
withTempDir { dir =>
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReaderEnabled.toString) {
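// Work on a shallow clone so that deletes do not modify the shared source table.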
sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` SHALLOW CLONE $multiRowgroupTable")

val targetTable = io.delta.tables.DeltaTable.forPath(dir.getCanonicalPath)

// Execute multiple delete statements.
deletePredicates.foreach(targetTable.delete)

val targetTableDF = selectPredicate.map(targetTable.toDF.filter).getOrElse(targetTable.toDF)
assertPredicatesArePushedDown(targetTableDF)
// Make sure there are splits.
assert(targetTableDF.rdd.partitions.size > 1)

withTempDir { resultDir =>
// Write results to a table without DVs for validation.
targetTableDF
.write
.option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, false.toString)
.format("delta")
.save(resultDir.getCanonicalPath)

val resultsTable = io.delta.tables.DeltaTable.forPath(resultDir.getCanonicalPath)

assert(resultsTable.toDF.count() === expectedNumRows)
assert(resultsTable.toDF.filter(validationPredicate).count() === 0)
}
}
}
}

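// Each scenario below runs against both the vectorized and the non-vectorized Parquet reader.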
for (vectorizedReaderEnabled <- BOOLEAN_DOMAIN)
test("PredicatePushdown: Single deletion at the first row group. " +
s"vectorizedReaderEnabled: $vectorizedReaderEnabled") {
testPredicatePushDown(
deletePredicates = Seq("id == 100"),
selectPredicate = None,
expectedNumRows = multiRowgroupTableRowsNum - 1,
validationPredicate = "id == 100",
vectorizedReaderEnabled = vectorizedReaderEnabled)
}

for (vectorizedReaderEnabled <- BOOLEAN_DOMAIN)
test("PredicatePushdown: Single deletion at the second row group. " +
s"vectorizedReaderEnabled: $vectorizedReaderEnabled") {
testPredicatePushDown(
deletePredicates = Seq("id == 40000000"),
selectPredicate = None,
expectedNumRows = multiRowgroupTableRowsNum - 1,
validationPredicate = "id == 40000000",
vectorizedReaderEnabled = vectorizedReaderEnabled)
}

for (vectorizedReaderEnabled <- BOOLEAN_DOMAIN)
test("PredicatePushdown: Single delete statement with multiple ids. " +
s"vectorizedReaderEnabled: $vectorizedReaderEnabled") {
testPredicatePushDown(
deletePredicates = Seq("id in (200, 2000, 20000, 20000000, 40000000)"),
selectPredicate = None,
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
vectorizedReaderEnabled = vectorizedReaderEnabled)
}

for (vectorizedReaderEnabled <- BOOLEAN_DOMAIN)
test("PredicatePushdown: Multiple delete statements. " +
s"vectorizedReaderEnabled: $vectorizedReaderEnabled") {
testPredicatePushDown(
deletePredicates =
Seq("id = 200", "id = 2000", "id = 20000", "id = 20000000", "id = 40000000"),
selectPredicate = None,
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
vectorizedReaderEnabled = vectorizedReaderEnabled)
}

for (vectorizedReaderEnabled <- BOOLEAN_DOMAIN)
test("PredicatePushdown: Scan with predicates. " +
s"vectorizedReaderEnabled: $vectorizedReaderEnabled") {
testPredicatePushDown(
deletePredicates = Seq("id = 200", "id = 2000", "id = 40000000"),
selectPredicate = Some("id not in (20000, 20000000)"),
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
vectorizedReaderEnabled = vectorizedReaderEnabled)
}

for (vectorizedReaderEnabled <- BOOLEAN_DOMAIN)
test("PredicatePushdown: Scan with predicates - no deletes. " +
s"vectorizedReaderEnabled: $vectorizedReaderEnabled") {
testPredicatePushDown(
deletePredicates = Seq.empty,
selectPredicate = Some("id not in (200, 2000, 20000, 20000000, 40000000)"),
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
vectorizedReaderEnabled = vectorizedReaderEnabled)
}

test("Predicate pushdown works on queries that select metadata fields") {
withTempDir { dir =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString) {
sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` SHALLOW CLONE $multiRowgroupTable")

val targetTable = io.delta.tables.DeltaTable.forPath(dir.getCanonicalPath)
targetTable.delete("id == 40000000")

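// Deletion vectors are applied using per-file row indexes; selecting _metadata columns must not interfere with that filtering.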
val r1 = targetTable.toDF.select("id", "_metadata.row_index").count()
assert(r1 === multiRowgroupTableRowsNum - 1)

val r2 = targetTable.toDF.select("id", "_metadata.row_index", "_metadata.file_path").count()
assert(r2 === multiRowgroupTableRowsNum - 1)

val r3 = targetTable
.toDF
.select("id", "_metadata.file_block_start", "_metadata.file_path").count()
assert(r3 === multiRowgroupTableRowsNum - 1)
}
}
}

test("Predicate pushdown returns correct results when combined with deletion vectors") {
withTempDir { dir =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString) {
sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` SHALLOW CLONE $multiRowgroupTable")

val targetTable = io.delta.tables.DeltaTable.forPath(dir.getCanonicalPath)

targetTable.delete("id == 40000000")

val targetTableDF = targetTable.toDF.filter("id != 1")
assertPredicatesArePushedDown(targetTableDF)
// Make sure there are splits.
assert(targetTableDF.rdd.partitions.size > 1)

// Validate the results. Both the deleted row (40000000) and the filtered-out row (1) are gone.
assert(targetTableDF.count() === multiRowgroupTableRowsNum - 2)
assert(targetTableDF.filter("id in (1, 40000000)").count() === 0)
}
}
}

private def assertPredicatesArePushedDown(df: DataFrame): Unit = {
val scan = df.queryExecution.executedPlan.collectFirst {
case scan: FileSourceScanExec => scan
}
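// Vacuously true when the plan contains no file scan node.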
assert(scan.forall(_.dataFilters.nonEmpty))
}

private sealed case class DeleteUsingDVWithResults(
scale: String,
sqlRule: String,
