[SPARK] Add more testing for variant + delta features (delta-io#3102)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Adds variant-type test coverage for auto compaction and deletion vectors.
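
For reference, the new auto compaction cases write variant values produced with `parse_json`, roughly as in this sketch (illustrative table path; assumes an active `spark` session on a Spark build with variant support, which is why the tests in the diff below are gated with `testSparkMasterOnly`):

```scala
// A small DataFrame whose column `v` holds variant values, matching the
// data used by the new auto compaction tests in the diff below.
val variantDf = spark.range(10)
  .selectExpr("parse_json(cast(id as string)) as v")

// Append twice so auto compaction has small files to merge into one,
// which is what checkAutoCompactionWorks asserts.
variantDf.write.format("delta").mode("append").save("/tmp/variant_ac_table")
variantDf.write.format("delta").mode("append").save("/tmp/variant_ac_table")
```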

## How was this patch tested?

Test-only change. Extends `AutoCompactSuite` and `DeletionVectorsSuite` with variant cases that run on Spark master only; a sketch of the deletion vector scenario follows.
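
The new deletion vector case follows the same shape. Roughly, outside the test harness (illustrative path; the suite itself uses its `enableDeletionVectorsInTable` helper, approximated here with the `delta.enableDeletionVectors` table property):

```scala
// Partitioned table with a variant column `v`, as in the new DeletionVectorsSuite test.
val path = "/tmp/variant_dv_table"
spark.range(0, 50)
  .selectExpr("id % 10 as part", "id", "parse_json(cast(id as string)) as v")
  .write.format("delta").partitionBy("part").save(path)

// Enable deletion vectors, delete by variant value, and check the rows are gone.
spark.sql(s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
spark.sql(s"DELETE FROM delta.`$path` WHERE v::int = 2")
assert(spark.sql(s"SELECT * FROM delta.`$path` WHERE v::int = 2").isEmpty)
```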

## Does this PR introduce _any_ user-facing changes?

No.
richardc-db authored May 20, 2024
1 parent 7b4ee63 commit 25a42df
Showing 2 changed files with 62 additions and 12 deletions.
@@ -21,6 +21,7 @@ import java.io.File

// scalastyle:off import.ordering.noEmptyLine
import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord}
import org.apache.spark.sql.delta.DeltaExcludedBySparkVersionTestMixinShims
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.commands.optimize._
import org.apache.spark.sql.delta.hooks.{AutoCompact, AutoCompactType}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Column
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.test.SharedSparkSession
@@ -60,7 +61,8 @@ class AutoCompactSuite extends
CompactionTestHelperForAutoCompaction
with DeltaSQLCommandTest
with SharedSparkSession
with AutoCompactTestUtils {
with AutoCompactTestUtils
with DeltaExcludedBySparkVersionTestMixinShims {

test("auto-compact-type: test table properties") {
withTempDir { tempDir =>
@@ -184,15 +186,20 @@ class AutoCompactSuite extends
}
}

private def checkAutoCompactionWorks(dir: String): Unit = {
spark.range(10).write.format("delta").mode("append").save(dir)
/**
* Writes `df` twice to the same location and checks that
* 1. There is only one resultant file.
* 2. The result is equal to `df` unioned with itself.
*/
private def checkAutoCompactionWorks(dir: String, df: DataFrame): Unit = {
df.write.format("delta").mode("append").save(dir)
val deltaLog = DeltaLog.forTable(spark, dir)
val newSnapshot = deltaLog.update()
assert(newSnapshot.version === 1) // 0 is the first commit, 1 is optimize
assert(deltaLog.update().numOfFiles === 1)

val isLogged = checkAutoOptimizeLogging {
spark.range(10).write.format("delta").mode("append").save(dir)
df.write.format("delta").mode("append").save(dir)
}

assert(isLogged)
@@ -202,17 +209,40 @@ class AutoCompactSuite extends

assert(deltaLog.update().numOfFiles === 1, "Files should be optimized into a single one")
checkAnswer(
spark.range(10).union(spark.range(10)).toDF(),
df.union(df).toDF(),
spark.read.format("delta").load(dir)
)
}

testBothModesViaProperty("auto compact should kick in when enabled - table config") { dir =>
checkAutoCompactionWorks(dir)
checkAutoCompactionWorks(dir, spark.range(10).toDF("id"))
}

testBothModesViaConf("auto compact should kick in when enabled - session config") { dir =>
checkAutoCompactionWorks(dir)
checkAutoCompactionWorks(dir, spark.range(10).toDF("id"))
}

testSparkMasterOnly("variant auto compact kicks in when enabled - table config") {
withTempDir { dir =>
withSQLConf(
"spark.databricks.delta.properties.defaults.autoOptimize.autoCompact" -> "true",
DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "0",
DeltaSQLConf.DELTA_AUTO_COMPACT_MODIFIED_PARTITIONS_ONLY_ENABLED.key -> "false") {
checkAutoCompactionWorks(
dir.getCanonicalPath, spark.range(10).selectExpr("parse_json(cast(id as string)) as v"))
}
}
}

testSparkMasterOnly("variant auto compact kicks in when enabled - session config") {
withTempDir { dir =>
withSQLConf(
DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> "true",
DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "0") {
checkAutoCompactionWorks(
dir.getCanonicalPath, spark.range(10).selectExpr("parse_json(cast(id as string)) as v"))
}
}
}

testBothModesViaProperty("auto compact should not kick in when session config is off") { dir =>
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.deletionvectors
import java.io.{File, FileNotFoundException}
import java.net.URISyntaxException

import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews}
import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews}
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.{AddFile, DeletionVectorDescriptor, RemoveFile}
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor.EMPTY
@@ -46,7 +46,8 @@ class DeletionVectorsSuite extends QueryTest
with DeltaSQLCommandTest
with DeletionVectorsTestUtils
with DeltaTestUtilsForTempViews
with DeltaExceptionTestUtils {
with DeltaExceptionTestUtils
with DeltaExcludedBySparkVersionTestMixinShims {
import testImplicits._

override def beforeAll(): Unit = {
@@ -270,7 +271,7 @@ class DeletionVectorsSuite extends QueryTest
}
}

Seq("name", "id").foreach(mode =>
Seq("name", "id").foreach { mode =>
test(s"DELETE with DVs with column mapping mode=$mode") {
withSQLConf("spark.databricks.delta.properties.defaults.columnMapping.mode" -> mode) {
withTempDir { dirName =>
@@ -286,7 +287,26 @@ }
}
}
}
)

testSparkMasterOnly(s"variant types DELETE with DVs with column mapping mode=$mode") {
withSQLConf("spark.databricks.delta.properties.defaults.columnMapping.mode" -> mode) {
withTempDir { dirName =>
val path = dirName.getAbsolutePath
val df = spark.range(0, 50).selectExpr(
"id % 10 as part",
"id",
"parse_json(cast(id as string)) as v"
)
df.write.format("delta").partitionBy("part").save(path)
val tableLog = DeltaLog.forTable(spark, path)
enableDeletionVectorsInTable(tableLog, true)
spark.sql(s"DELETE FROM delta.`$path` WHERE v::int = 2")
checkAnswer(spark.sql(s"select * from delta.`$path` WHERE v::int = 2"), Seq())
verifyDVsExist(tableLog, 1)
}
}
}
}

test("DELETE with DVs - existing table already has DVs") {
withSQLConf(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> "true") {
