
Commit d678bae

v1
andreaschat-db committed Jun 12, 2024
1 parent 5106d27 commit d678bae
Showing 1 changed file with 60 additions and 39 deletions.
@@ -33,10 +33,14 @@ import org.apache.spark.sql.internal.SQLConf
 import com.fasterxml.jackson.databind.node.ObjectNode
 import io.delta.tables.DeltaTable
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Subquery}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSparkSession
@@ -885,18 +889,43 @@ object DeletionVectorsSuite {
 }
 
 class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
-  // ~200MBs. Should contain 2 row groups.
+  // ~4MBs. Should contain 2 row groups.
   val multiRowgroupTable = "multiRowgroupTable"
-  val multiRowgroupTableRowsNum = 50000000
+  val multiRowgroupTableRowsNum = 1000000
 
+  def hadoopConf(): Configuration = {
+    // scalastyle:off hadoopconfiguration
+    // This is to generate a Parquet file with two row groups
+    spark.sparkContext.hadoopConfiguration
+    // scalastyle:on hadoopconfiguration
+  }
+
+  def assertParquetHasMultipleRowGroups(filePath: Path): Unit = {
+    val parquetMetadata = ParquetFileReader.readFooter(
+      hadoopConf,
+      filePath,
+      ParquetMetadataConverter.NO_FILTER)
+    assert(parquetMetadata.getBlocks.size() > 1)
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
 
+    // 2MB rowgroups.
+    hadoopConf().set("parquet.block.size", (2 * 1024 * 1024).toString)
+
     spark.range(0, multiRowgroupTableRowsNum, 1, 1).toDF("id")
       .write
       .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, true.toString)
       .format("delta")
       .saveAsTable(multiRowgroupTable)
 
+    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(multiRowgroupTable))
+    val files = deltaLog.update().allFiles.collect()
+
+    assert(files.length === 1)
+    assertParquetHasMultipleRowGroups(new Path(deltaLog.dataPath, files.head.path))
+
     spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key, "true")
   }
 
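Note on the fixture above: with parquet.block.size pinned to 2MB and roughly 4MB of data written as a single file, the writer should emit two row groups, which assertParquetHasMultipleRowGroups confirms by reading only the file footer. A minimal standalone sketch of the same footer-only check (the countRowGroups helper name is ours, not the suite's):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    // Read just the Parquet footer (no row data) and count the
    // row groups ("blocks") it declares.
    def countRowGroups(conf: Configuration, file: Path): Int =
      ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER)
        .getBlocks
        .size()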
@@ -918,36 +947,29 @@ class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
       // code path in DeltaParquetFileFormat.
       val codeGenMaxFields = if (readColumnarBatchAsRows) "0" else "100"
       withSQLConf(
-        SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> codeGenMaxFields,
-        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReaderEnabled.toString) {
+        SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> codeGenMaxFields,
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReaderEnabled.toString,
+        SQLConf.FILES_MAX_PARTITION_BYTES.key -> "2MB") {
         sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` SHALLOW CLONE $multiRowgroupTable")
 
         val targetTable = io.delta.tables.DeltaTable.forPath(dir.getCanonicalPath)
+        val deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath)
+        val files = deltaLog.update().allFiles.collect()
+        assert(files.length === 1)
 
         // Execute multiple delete statements. These require reconciling the metadata column
         // between DV writing and scanning operations.
         deletePredicates.foreach(targetTable.delete)
 
         val targetTableDF = selectPredicate.map(targetTable.toDF.filter).getOrElse(targetTable.toDF)
         assertPredicatesArePushedDown(targetTableDF)
-        // Make sure there are splits.
-        assert(targetTableDF.rdd.partitions.size > 1)
-
-        withTempDir { resultDir =>
-          // Write results to a table without DVs for validation. We are doing this to avoid
-          // loading the dataset into memory.
-          targetTableDF
-            .write
-            .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, false.toString)
-            .format("delta")
-            .save(resultDir.getCanonicalPath)
-
-          val resultsTable = io.delta.tables.DeltaTable.forPath(resultDir.getCanonicalPath)
-
-          assert(resultsTable.toDF.count() === expectedNumRows)
-          // The deleted/filtered rows should not exist.
-          assert(resultsTable.toDF.filter(validationPredicate).count() === 0)
-        }
+        // Make sure there are multiple row groups.
+        assertParquetHasMultipleRowGroups(new Path(files.head.path))
+        // Make sure we have 2 splits.
+        assert(targetTableDF.rdd.partitions.size === 2)
+
+        assert(targetTableDF.count() === expectedNumRows)
+        assert(targetTableDF.filter(validationPredicate).count() === 0)
       }
     }
   }
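The rewrite above also changes the validation strategy: instead of materializing the filtered result into a DV-less table, the test asserts counts directly on the scanned DataFrame, and caps spark.sql.files.maxPartitionBytes at 2MB so that each 2MB row group becomes its own scan split. A rough sketch of why the partitions.size === 2 assertion should hold (a sketch under those assumptions, not suite code):

    // With maxPartitionBytes equal to the Parquet block size, Spark plans
    // roughly one file split per 2MB row group of the ~4MB data file.
    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "2MB") {
      val df = spark.table(multiRowgroupTable)
      assert(df.rdd.partitions.length === 2) // one split per row group
    }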
@@ -974,11 +996,11 @@ class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
s"vectorizedReaderEnabled: $vectorizedReaderEnabled " +
s"readColumnarBatchAsRows: $readColumnarBatchAsRows") {
testPredicatePushDown(
deletePredicates = Seq("id == 40000000"),
deletePredicates = Seq("id == 900000"),
selectPredicate = None,
expectedNumRows = multiRowgroupTableRowsNum - 1,
// (rowId, Expected value).
validationPredicate = "id == 40000000",
validationPredicate = "id == 900000",
vectorizedReaderEnabled = vectorizedReaderEnabled,
readColumnarBatchAsRows = readColumnarBatchAsRows)
}
@@ -990,10 +1012,10 @@ class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
s"vectorizedReaderEnabled: $vectorizedReaderEnabled " +
s"readColumnarBatchAsRows: $readColumnarBatchAsRows") {
testPredicatePushDown(
deletePredicates = Seq("id in (200, 2000, 20000, 20000000, 40000000)"),
deletePredicates = Seq("id in (20, 200, 2000, 900000)"),
selectPredicate = None,
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
expectedNumRows = multiRowgroupTableRowsNum - 4,
validationPredicate = "id in (20, 200, 2000, 900000)",
vectorizedReaderEnabled = vectorizedReaderEnabled,
readColumnarBatchAsRows = readColumnarBatchAsRows)
}
@@ -1005,11 +1027,10 @@ class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
s"vectorizedReaderEnabled: $vectorizedReaderEnabled " +
s"readColumnarBatchAsRows: $readColumnarBatchAsRows") {
testPredicatePushDown(
deletePredicates =
Seq("id = 200", "id = 2000", "id = 20000", "id = 20000000", "id = 40000000"),
deletePredicates = Seq("id = 20", "id = 200", "id = 2000", "id = 900000"),
selectPredicate = None,
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
expectedNumRows = multiRowgroupTableRowsNum - 4,
validationPredicate = "id in (20, 200, 2000, 900000)",
vectorizedReaderEnabled = vectorizedReaderEnabled,
readColumnarBatchAsRows = readColumnarBatchAsRows)
}
@@ -1021,10 +1042,10 @@ class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
s"vectorizedReaderEnabled: $vectorizedReaderEnabled " +
s"readColumnarBatchAsRows: $readColumnarBatchAsRows") {
testPredicatePushDown(
deletePredicates = Seq("id = 200", "id = 2000", "id = 40000000"),
selectPredicate = Some("id not in (20000, 20000000)"),
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
deletePredicates = Seq("id = 20", "id = 2000"),
selectPredicate = Some("id not in (200, 900000)"),
expectedNumRows = multiRowgroupTableRowsNum - 4,
validationPredicate = "id in (20, 200, 2000, 900000)",
vectorizedReaderEnabled = vectorizedReaderEnabled,
readColumnarBatchAsRows = readColumnarBatchAsRows)
}
@@ -1037,9 +1058,9 @@ class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
s"readColumnarBatchAsRows: $readColumnarBatchAsRows") {
testPredicatePushDown(
deletePredicates = Seq.empty,
selectPredicate = Some("id not in (200, 2000, 20000, 20000000, 40000000)"),
expectedNumRows = multiRowgroupTableRowsNum - 5,
validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)",
selectPredicate = Some("id not in (20, 200, 2000, 900000)"),
expectedNumRows = multiRowgroupTableRowsNum - 4,
validationPredicate = "id in (20, 200, 2000, 900000)",
vectorizedReaderEnabled = vectorizedReaderEnabled,
readColumnarBatchAsRows = readColumnarBatchAsRows)
}
@@ -1050,7 +1071,7 @@ class DeletionVectorsWithPredicatePushdownSuite extends DeletionVectorsSuite {
sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` SHALLOW CLONE $multiRowgroupTable")

val targetTable = io.delta.tables.DeltaTable.forPath(dir.getCanonicalPath)
targetTable.delete("id == 40000000")
targetTable.delete("id == 900000")

val r1 = targetTable.toDF.select("id", "_metadata.row_index").count()
assert(r1 === multiRowgroupTableRowsNum - 1)
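The smoke test in this last hunk selects the reserved _metadata.row_index column alongside the data, which is the per-file row index the metadata-based DV path keys on. A minimal sketch of the same read outside the suite (the path is hypothetical, and it assumes a Spark version that exposes the Parquet row index through the _metadata struct):

    import org.apache.spark.sql.functions.col

    // Read a Delta table together with the hidden per-file row index.
    val df = spark.read.format("delta").load("/tmp/multiRowgroupTableClone")
      .select(col("id"), col("_metadata.row_index"))
    df.show(5)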
