diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala
index cb14d3fb7f9..84d0f6d7e97 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala
@@ -58,10 +58,14 @@ object DeltaFileProviderUtils {
       deltaLog: DeltaLog,
       startVersion: Long,
       endVersion: Long): Seq[FileStatus] = {
-    val result = deltaLog
-      .listFrom(startVersion)
-      .collect { case DeltaFile(fs, v) if v <= endVersion => (fs, v) }
-      .toSeq
+    // Pass `failOnDataLoss = false` as we explicitly validate the result ourselves below
+    // to verify that there are no gaps.
+    val result =
+      deltaLog
+        .getChangeLogFiles(startVersion, endVersion, failOnDataLoss = false)
+        .map(_._2)
+        .collect { case DeltaFile(fs, v) => (fs, v) }
+        .toSeq
     // Verify that we got the entire range requested
     if (result.size.toLong != endVersion - startVersion + 1) {
       throw DeltaErrors.deltaVersionsNotContiguousException(spark, result.map(_._2))
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 902c3f78942..93ede5a355c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -713,4 +713,28 @@ class DeltaLogSuite extends QueryTest
       }
     }
   }
+
+  // This test needs to be extended to Managed Commits once DeltaLogSuite is extended to cover them.
+  test("DeltaFileProviderUtils.getDeltaFilesInVersionRange") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(0, 1).write.format("delta").mode("overwrite").save(path)
+      spark.range(0, 1).write.format("delta").mode("overwrite").save(path)
+      spark.range(0, 1).write.format("delta").mode("overwrite").save(path)
+      spark.range(0, 1).write.format("delta").mode("overwrite").save(path)
+
+      val log = DeltaLog.forTable(spark, new Path(path))
+      val result = DeltaFileProviderUtils.getDeltaFilesInVersionRange(
+        spark, log, startVersion = 1, endVersion = 3)
+      assert(result.map(FileNames.getFileVersion) === Seq(1, 2, 3))
+      val filesAreUnbackfilledArray = result.map(FileNames.isUnbackfilledDeltaFile)
+
+      val (fileV1, fileV2, fileV3) = (result(0), result(1), result(2))
+      assert(FileNames.getFileVersion(fileV1) === 1)
+      assert(FileNames.getFileVersion(fileV2) === 2)
+      assert(FileNames.getFileVersion(fileV3) === 3)
+
+      assert(filesAreUnbackfilledArray === Seq(false, false, false))
+    }
+  }
 }
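
For readers skimming the patch: dropping reliance on `failOnDataLoss` in favor of the existing local check works because the listing is bounded to `[startVersion, endVersion]` and each version appears at most once in sorted order, so the range is gap-free exactly when it contains `endVersion - startVersion + 1` entries. Below is a minimal standalone Scala sketch of that invariant; the object and helper names are illustrative only and not part of Delta.

object VersionRangeCheckSketch {
  // Assumes `versions` is sorted, deduplicated, and clamped to
  // [startVersion, endVersion]; under those conditions a plain
  // cardinality check is equivalent to scanning for gaps.
  def isContiguous(versions: Seq[Long], startVersion: Long, endVersion: Long): Boolean =
    versions.size.toLong == endVersion - startVersion + 1

  def main(args: Array[String]): Unit = {
    assert(isContiguous(Seq(1L, 2L, 3L), startVersion = 1L, endVersion = 3L))
    // Version 2 is missing, so the count falls short and the check fails.
    assert(!isContiguous(Seq(1L, 3L), startVersion = 1L, endVersion = 3L))
  }
}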