From 814cf7c7e408e45262d6ad8de03259e86b9a4446 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Wed, 17 Jan 2024 10:03:05 +0100 Subject: [PATCH] fixes --- .../spark/sql/delta/MergeIntoDVsSuite.scala | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoDVsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoDVsSuite.scala index bcd38d9942f..cb8c5d34dbb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoDVsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoDVsSuite.scala @@ -151,52 +151,6 @@ class MergeIntoDVsSuite extends MergeIntoDVsTests { numDeletionVectorsRemoved = 1, numDeletionVectorsUpdated = 0) } - - test(s"Verify error is produced when paths are not joined correctly") { - withTempDir { dir => - val sourcePath = s"$dir/source" - val targetPath = s"$dir/target" - - spark.range(0, 10, 2).write.format("delta").save(sourcePath) - spark.range(10).write.format("delta").save(targetPath) - - // Execute buildRowIndexSetsForFilesMatchingCondition with a corrupted touched files list. - val sourceDF = io.delta.tables.DeltaTable.forPath(sourcePath).toDF - val targetDF = io.delta.tables.DeltaTable.forPath(targetPath).toDF - val targetLog = DeltaLog.forTable(spark, targetPath) - val condition = col("s.id") === col("t.id") - val allFiles = targetLog.update().allFiles.collect().toSeq - assert(allFiles.size === 2) - val corruptedFiles = Seq( - allFiles.head, - allFiles.last.copy(path = "corruptedPath")) - val txn = targetLog.startTransaction(catalogTableOpt = None) - - val fileIndex = new TahoeBatchFileIndex( - spark, - actionType = "merge", - addFiles = allFiles, - deltaLog = targetLog, - path = new Path("file:" + targetPath), - snapshot = txn.snapshot) - - val targetDFWithMetadata = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches( - spark, - targetDF.queryExecution.logical, - fileIndex) - val e = intercept[SparkException] { - DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition( - spark, - txn, - tableHasDVs = true, - targetDf = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition), - candidateFiles = corruptedFiles, - condition = condition.expr - ) - } - assert(e.getCause.getMessage.contains("Encountered a non matched file path.")) - } - } } test(s"Merge with DVs metrics - delete entire file") { @@ -226,6 +180,52 @@ class MergeIntoDVsSuite extends MergeIntoDVsTests { numDeletionVectorsUpdated = 0) } } + + test(s"Verify error is produced when paths are not joined correctly") { + withTempDir { dir => + val sourcePath = s"$dir/source" + val targetPath = s"$dir/target" + + spark.range(0, 10, 2).write.format("delta").save(sourcePath) + spark.range(10).write.format("delta").save(targetPath) + + // Execute buildRowIndexSetsForFilesMatchingCondition with a corrupted touched files list. + val sourceDF = io.delta.tables.DeltaTable.forPath(sourcePath).toDF + val targetDF = io.delta.tables.DeltaTable.forPath(targetPath).toDF + val targetLog = DeltaLog.forTable(spark, targetPath) + val condition = col("s.id") === col("t.id") + val allFiles = targetLog.update().allFiles.collect().toSeq + assert(allFiles.size === 2) + val corruptedFiles = Seq( + allFiles.head, + allFiles.last.copy(path = "corruptedPath")) + val txn = targetLog.startTransaction(catalogTableOpt = None) + + val fileIndex = new TahoeBatchFileIndex( + spark, + actionType = "merge", + addFiles = allFiles, + deltaLog = targetLog, + path = targetLog.dataPath, + snapshot = txn.snapshot) + + val targetDFWithMetadata = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches( + spark, + targetDF.queryExecution.logical, + fileIndex) + val e = intercept[SparkException] { + DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition( + spark, + txn, + tableHasDVs = true, + targetDf = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition), + candidateFiles = corruptedFiles, + condition = condition.expr + ) + } + assert(e.getCause.getMessage.contains("Encountered a non matched file path.")) + } + } } trait MergeCDCWithDVsTests extends MergeCDCTests with DeletionVectorsTestUtils {