
Commit

fixes
andreaschat-db committed Jan 17, 2024
1 parent a52e25b commit 814cf7c
Showing 1 changed file with 46 additions and 46 deletions.
@@ -151,52 +151,6 @@ class MergeIntoDVsSuite extends MergeIntoDVsTests {
       numDeletionVectorsRemoved = 1,
       numDeletionVectorsUpdated = 0)
     }
-
-  test(s"Verify error is produced when paths are not joined correctly") {
-    withTempDir { dir =>
-      val sourcePath = s"$dir/source"
-      val targetPath = s"$dir/target"
-
-      spark.range(0, 10, 2).write.format("delta").save(sourcePath)
-      spark.range(10).write.format("delta").save(targetPath)
-
-      // Execute buildRowIndexSetsForFilesMatchingCondition with a corrupted touched files list.
-      val sourceDF = io.delta.tables.DeltaTable.forPath(sourcePath).toDF
-      val targetDF = io.delta.tables.DeltaTable.forPath(targetPath).toDF
-      val targetLog = DeltaLog.forTable(spark, targetPath)
-      val condition = col("s.id") === col("t.id")
-      val allFiles = targetLog.update().allFiles.collect().toSeq
-      assert(allFiles.size === 2)
-      val corruptedFiles = Seq(
-        allFiles.head,
-        allFiles.last.copy(path = "corruptedPath"))
-      val txn = targetLog.startTransaction(catalogTableOpt = None)
-
-      val fileIndex = new TahoeBatchFileIndex(
-        spark,
-        actionType = "merge",
-        addFiles = allFiles,
-        deltaLog = targetLog,
-        path = new Path("file:" + targetPath),
-        snapshot = txn.snapshot)
-
-      val targetDFWithMetadata = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
-        spark,
-        targetDF.queryExecution.logical,
-        fileIndex)
-      val e = intercept[SparkException] {
-        DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition(
-          spark,
-          txn,
-          tableHasDVs = true,
-          targetDf = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition),
-          candidateFiles = corruptedFiles,
-          condition = condition.expr
-        )
-      }
-      assert(e.getCause.getMessage.contains("Encountered a non matched file path."))
-    }
-  }
   }
 
   test(s"Merge with DVs metrics - delete entire file") {
@@ -226,6 +180,52 @@ class MergeIntoDVsSuite extends MergeIntoDVsTests {
       numDeletionVectorsUpdated = 0)
     }
   }
+
+  test(s"Verify error is produced when paths are not joined correctly") {
+    withTempDir { dir =>
+      val sourcePath = s"$dir/source"
+      val targetPath = s"$dir/target"
+
+      spark.range(0, 10, 2).write.format("delta").save(sourcePath)
+      spark.range(10).write.format("delta").save(targetPath)
+
+      // Execute buildRowIndexSetsForFilesMatchingCondition with a corrupted touched files list.
+      val sourceDF = io.delta.tables.DeltaTable.forPath(sourcePath).toDF
+      val targetDF = io.delta.tables.DeltaTable.forPath(targetPath).toDF
+      val targetLog = DeltaLog.forTable(spark, targetPath)
+      val condition = col("s.id") === col("t.id")
+      val allFiles = targetLog.update().allFiles.collect().toSeq
+      assert(allFiles.size === 2)
+      val corruptedFiles = Seq(
+        allFiles.head,
+        allFiles.last.copy(path = "corruptedPath"))
+      val txn = targetLog.startTransaction(catalogTableOpt = None)
+
+      val fileIndex = new TahoeBatchFileIndex(
+        spark,
+        actionType = "merge",
+        addFiles = allFiles,
+        deltaLog = targetLog,
+        path = targetLog.dataPath,
+        snapshot = txn.snapshot)
+
+      val targetDFWithMetadata = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
+        spark,
+        targetDF.queryExecution.logical,
+        fileIndex)
+      val e = intercept[SparkException] {
+        DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition(
+          spark,
+          txn,
+          tableHasDVs = true,
+          targetDf = sourceDF.as("s").join(targetDFWithMetadata.as("t"), condition),
+          candidateFiles = corruptedFiles,
+          condition = condition.expr
+        )
+      }
+      assert(e.getCause.getMessage.contains("Encountered a non matched file path."))
+    }
+  }
 }
 
 trait MergeCDCWithDVsTests extends MergeCDCTests with DeletionVectorsTestUtils {
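Aside from moving the test to the end of the suite, the functional change is the path argument handed to TahoeBatchFileIndex: the hand-assembled new Path("file:" + targetPath) is replaced by the DeltaLog's own dataPath. A minimal sketch of the difference, assuming an active spark session and a hypothetical local table location:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog

// Hypothetical table location, for illustration only.
val targetPath = "/tmp/example/target"
spark.range(10).write.format("delta").save(targetPath)

val targetLog = DeltaLog.forTable(spark, targetPath)

// Before: a root built by string concatenation with a hard-coded "file:"
// scheme, which may not match the path the DeltaLog resolved internally.
val handBuilt: Path = new Path("file:" + targetPath)

// After: the canonical data path carried by the DeltaLog, so the file index
// and the transaction snapshot agree on the table root.
val canonical: Path = targetLog.dataPath

Using the log's resolved root avoids scheme and normalization mismatches when file paths from the index are joined back against the snapshot's files.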
