From 3883545abaa3916d76d8b34665231f53cf21478e Mon Sep 17 00:00:00 2001
From: Sumeet Varma
Date: Fri, 3 May 2024 11:13:17 -0700
Subject: [PATCH] Parallel calls

Tighten the LogSegment construction check in SnapshotManagement: when a
checkpoint is selected, require a backfilled delta file at the checkpoint
version and fail with a clearer IllegalStateException, instead of only
checking that `deltas` is non-empty. Extend managed-commit test coverage by
running SnapshotManagementSuite, DeltaHistoryManagerSuite and
DescribeDeltaDetailSuite with backfill batch sizes 1, 2 and 100, renaming the
existing batch-size suites consistently and replacing the size-20 variants
with size-100 ones. Also move the managed-commit table property setup inside
testWithDifferentBackfillInterval and switch the test commit owner to
"tracking-in-memory".
---
 .../spark/sql/delta/SnapshotManagement.scala  |  12 +-
 .../spark/sql/delta/CheckpointsSuite.scala    |  10 +-
 .../sql/delta/DeltaHistoryManagerSuite.scala  |  10 +-
 .../delta/DeltaLogMinorCompactionSuite.scala  |   8 +-
 .../sql/delta/DeltaTimeTravelSuite.scala      |   2 +-
 .../sql/delta/DescribeDeltaDetailSuite.scala  |  12 +-
 .../sql/delta/SnapshotManagementSuite.scala   | 131 ++++++++++++------
 .../ManagedCommitTestUtils.scala              |  13 +-
 8 files changed, 136 insertions(+), 62 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index 5eb347e0b11..e94b483fe13 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -459,10 +459,14 @@ trait SnapshotManagement { self: DeltaLog =>
         .collect { case cp if cp.version == ci.version => cp }
         .getOrElse(ci.getCheckpointProvider(this, checkpoints, lastCheckpointInfo))
     }
-    // In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
-    // they may just be before the checkpoint version unless we have a bug in log cleanup.
-    if (deltas.isEmpty) {
-      throw new IllegalStateException(s"Could not find any delta files for version $newVersion")
+    // If there's a valid checkpoint, `deltas` should contain the checkpoint version unless we
+    // have a bug in log cleanup.
+    newCheckpoint.map(_.version).foreach { cpVersion =>
+      deltas.filterNot(isUnbackfilledDeltaFile).find(deltaVersion(_) == cpVersion).orElse(
+        throw new IllegalStateException(
+          s"Could not find any backfilled delta files for version $cpVersion even though there " +
+            "is a checkpoint at this version.")
+      )
     }
     if (versionToLoad.exists(_ != newVersion)) {
       throwNonExistentVersionError(versionToLoad.get)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index 93d1fc443bd..76179b5cdbc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -581,7 +581,7 @@ class CheckpointsSuite
 
     // Delete the commit files 0-9, so that we are forced to read the checkpoint file
     val logPath = new Path(new File(target, "_delta_log").getAbsolutePath)
-    for (i <- 0 to 10) {
+    for (i <- 0 to 9) {
      val file = new File(FileNames.unsafeDeltaFile(logPath, version = i).toString)
      file.delete()
     }
@@ -1062,15 +1062,15 @@ class FakeGCSFileSystemValidatingCommits extends FakeGCSFileSystemValidatingChec
   override protected def shouldValidateFilePattern(f: Path): Boolean = f.getName.contains(".json")
 }
 
-class ManagedCommitBatch1BackFillCheckpointsSuite extends CheckpointsSuite {
+class CheckpointsWithManagedCommitBatch1Suite extends CheckpointsSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }
 
-class ManagedCommitBatch2BackFillCheckpointsSuite extends CheckpointsSuite {
+class CheckpointsWithManagedCommitBatch2Suite extends CheckpointsSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
 }
 
-class ManagedCommitBatch20BackFillCheckpointsSuite extends CheckpointsSuite {
-  override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
+class CheckpointsWithManagedCommitBatch100Suite
+  extends CheckpointsSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(100)
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
index 4c218531b92..b45b8eb0a72 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
@@ -664,6 +664,14 @@ class DeltaHistoryManagerSuite extends DeltaHistoryManagerBase {
   }
 }
 
-class ManagedCommitFill1DeltaHistoryManagerSuite extends DeltaHistoryManagerSuite {
+class DeltaHistoryManagerWithManagedCommitBatch1Suite extends DeltaHistoryManagerSuite {
   override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }
+
+class DeltaHistoryManagerWithManagedCommitBatch2Suite extends DeltaHistoryManagerSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(2)
+}
+
+class DeltaHistoryManagerWithManagedCommitBatch100Suite extends DeltaHistoryManagerSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(100)
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
index 0a028a59af6..e66b86af84d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
@@ -440,15 +440,15 @@ class DeltaLogMinorCompactionSuite extends QueryTest
   }
 }
 
-class ManagedCommitBatchBackfill1DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
+class DeltaLogMinorCompactionWithManagedCommitBatch1Suite extends DeltaLogMinorCompactionSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }
 
-class ManagedCommitBatchBackFill2DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
+class DeltaLogMinorCompactionWithManagedCommitBatch2Suite extends DeltaLogMinorCompactionSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
 }
 
-class ManagedCommitBatchBackFill20DeltaLogMinorCompactionSuite
+class DeltaLogMinorCompactionWithManagedCommitBatch100Suite
   extends DeltaLogMinorCompactionSuite {
-  override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(100)
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
index 69f90af8153..aec01fb1a9d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
@@ -795,6 +795,6 @@ class DeltaTimeTravelSuite extends QueryTest
   }
 }
 
-class ManagedCommitFill1DeltaTimeTravelSuite extends DeltaTimeTravelSuite {
+class DeltaTimeTravelWithManagedCommitBatch1Suite extends DeltaTimeTravelSuite {
   override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala
index bfab95d8d31..5d307cd928b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException
 
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
+import org.apache.spark.sql.delta.managedcommit.ManagedCommitTestUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 
@@ -33,6 +34,7 @@ import org.apache.spark.util.Utils
 
 trait DescribeDeltaDetailSuiteBase extends QueryTest
   with SharedSparkSession
+  with ManagedCommitTestUtils
   with DeltaTestUtilsForTempViews {
   import testImplicits._
 
@@ -211,7 +213,8 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
     }
   }
 
-  test("delta table: describe detail always run on the latest snapshot") {
+  testWithDifferentBackfillIntervalOptional(
+      "delta table: describe detail always run on the latest snapshot") { batchSizeOpt =>
     val tableName = "tbl_name_on_latest_snapshot"
     withTable(tableName) {
       val tempDir = Utils.createTempDir().toString
@@ -229,8 +232,13 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
         metadata.configuration ++ Map("foo" -> "bar")
       )
       txn.commit(newMetadata :: Nil, DeltaOperations.ManualUpdate)
+      val managedCommitProperties = batchSizeOpt.map(_ =>
+        Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> "tracking-in-memory",
+          DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.key -> "{\"randomConf\":\"randomConfValue\"}",
+          DeltaConfigs.MANAGED_COMMIT_TABLE_CONF.key -> "{}"))
+        .getOrElse(Map.empty)
       checkResult(sql(s"DESCRIBE DETAIL $tableName"),
-        Seq(Map("foo" -> "bar")),
+        Seq(Map("foo" -> "bar") ++ managedCommitProperties),
         Seq("properties")
       )
     }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
index 2aef02989a2..d675a9d356a 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.storage.LogStore.logStoreClassConfKey
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
+import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, JsonUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.fs.Path
@@ -45,7 +45,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.storage.StorageLevel
 
 class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession
-  with DeltaSQLCommandTest {
+  with DeltaSQLCommandTest with ManagedCommitBaseSuite {
 
 
   /**
@@ -235,45 +235,56 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
         makeCorruptCheckpointFile(path, checkpointVersion = 1,
           shouldBeEmpty = testEmptyCheckpoint, multipart = multipart)
-        // The code paths are different, but the error and message end up being the same:
-        //
-        // testEmptyCheckpoint = true:
-        // - checkpoint 1 is NOT in the list result.
-        // - fallback to load version 0 using checkpoint 0
-        // - fail to read checkpoint 0
-        // - cannot find log file 0 so throw the above checkpoint 0 read failure
-        //
-        // testEmptyCheckpoint = false:
-        // - checkpoint 1 is in the list result.
-        // - Snapshot creation triggers state reconstruction
-        // - fail to read protocol+metadata from checkpoint 1
-        // - fallback to load version 0 using checkpoint 0
-        // - fail to read checkpoint 0
-        // - cannot find log file 0 so throw the original checkpoint 1 read failure
-        val e = intercept[SparkException] { staleLog.update() }
-        val version = if (testEmptyCheckpoint) 0 else 1
-        assert(e.getMessage.contains(f"$version%020d.checkpoint") &&
-          e.getMessage.contains(SHOULD_NOT_RECOVER_CHECKPOINT_ERROR_MSG))
+        val e = intercept[Exception] { staleLog.update() }
+        if (testEmptyCheckpoint) {
+          // - checkpoint 1 is NOT in the list result.
+          // - fallback to load version 0 using checkpoint 0
+          // - cannot find log file 0 so throw the log file not found failure.
+          assert(
+            e.isInstanceOf[IllegalStateException] &&
+            e.getMessage.contains("Could not find any backfilled delta files for version 0 " +
+              "even though there is a checkpoint at this version."))
+        } else {
+          // - checkpoint 1 is in the list result.
+          // - Snapshot creation triggers state reconstruction
+          // - fail to read protocol+metadata from checkpoint 1
+          // - fallback to load version 0 using checkpoint 0
+          // - cannot find log file 0 so throw the original checkpoint 1 read failure
+          assert(e.isInstanceOf[SparkException] && e.getMessage.contains("0001.checkpoint") &&
+            e.getMessage.contains(SHOULD_NOT_RECOVER_CHECKPOINT_ERROR_MSG))
+        }
       }
     }
   }
 
-  test("should throw a clear exception when checkpoint exists but its corresponding delta file " +
-    "doesn't exist") {
-    withTempDir { tempDir =>
-      val path = tempDir.getCanonicalPath
-      val staleLog = DeltaLog.forTable(spark, path)
-      DeltaLog.clearCache()
+  BOOLEAN_DOMAIN.foreach { deleteUnbackfilledDeltas =>
+    test(
+      "should throw a clear exception when checkpoint exists but its corresponding delta file " +
+        s"doesn't exist, deleteUnbackfilledDeltas: $deleteUnbackfilledDeltas") {
+      withTempDir { tempDir =>
+        val path = tempDir.getCanonicalPath
+        val staleLog = DeltaLog.forTable(spark, path)
+        DeltaLog.clearCache()
 
-      spark.range(10).write.format("delta").save(path)
-      DeltaLog.forTable(spark, path).checkpoint()
-      // Delete delta files
-      new File(tempDir, "_delta_log").listFiles().filter(_.getName.endsWith(".json"))
-        .foreach(_.delete())
-      val e = intercept[IllegalStateException] {
-        staleLog.update()
+        spark.range(10).write.format("delta").save(path)
+        spark.range(10).write.format("delta").mode("append").save(path)
+        DeltaLog.forTable(spark, path).checkpoint()
+        // Delete delta file at version 1
+        new File(tempDir, "_delta_log")
+          .listFiles()
+          .filter(_.getName.endsWith("1.json"))
+          .foreach(_.delete())
+        if (deleteUnbackfilledDeltas) {
+          new File(new File(tempDir, "_delta_log"), "_commits")
+            .listFiles()
+            .filter(_.getName.endsWith("1.json"))
+            .foreach(_.delete())
+        }
+        val e = intercept[IllegalStateException] {
+          staleLog.update()
+        }
+        assert(e.getMessage.contains("Could not find any backfilled delta files for version 1"))
       }
-      assert(e.getMessage.contains("Could not find any delta files for version 0"))
     }
   }
 
@@ -304,11 +315,17 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
       // Delete delta files
       new File(tempDir, "_delta_log").listFiles().filter(_.getName.endsWith(".json"))
         .foreach(_.delete())
+      if (managedCommitsEnabledInTests) {
+        new File(new File(tempDir, "_delta_log"), "_commits")
+          .listFiles()
+          .filter(_.getName.endsWith(".json"))
+          .foreach(_.delete())
+      }
       makeCorruptCheckpointFile(path, checkpointVersion = 0, shouldBeEmpty = false)
       val e = intercept[IllegalStateException] {
         staleLog.update()
       }
-      assert(e.getMessage.contains("Could not find any delta files for version 0"))
+      assert(e.getMessage.contains("Could not find any backfilled delta files for version 0"))
     }
   }
 
@@ -459,10 +476,34 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
     val oldLogSegment = log.snapshot.logSegment
     spark.range(10).write.format("delta").save(path)
     val newLogSegment = log.snapshot.logSegment
-    assert(log.getLogSegmentAfterCommit(None, oldLogSegment.checkpointProvider) === newLogSegment)
+    assert(log.getLogSegmentAfterCommit(
+      log.snapshot.tableCommitOwnerClientOpt,
+      oldLogSegment.checkpointProvider) === newLogSegment)
     spark.range(10).write.format("delta").mode("append").save(path)
-    assert(log.getLogSegmentAfterCommit(None, oldLogSegment.checkpointProvider)
-      === log.snapshot.logSegment)
+    val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
+    val commitFileProvider = DeltaCommitFileProvider(log.snapshot)
+    intercept[IllegalArgumentException] {
+      val commitFile = fs.getFileStatus(commitFileProvider.deltaFile(1))
+      val commit = Commit(
+        version = 1,
+        fileStatus = commitFile,
+        commitTimestamp = 0)
+      // Version exists, but not contiguous with old logSegment
+      log.getLogSegmentAfterCommit(1, None, oldLogSegment, commit, None, EmptyCheckpointProvider)
+    }
+    intercept[IllegalArgumentException] {
+      val commitFile = fs.getFileStatus(commitFileProvider.deltaFile(0))
+      val commit = Commit(
+        version = 0,
+        fileStatus = commitFile,
+        commitTimestamp = 0)
+
+      // Version exists, but newLogSegment already contains it
+      log.getLogSegmentAfterCommit(0, None, newLogSegment, commit, None, EmptyCheckpointProvider)
+    }
+    assert(log.getLogSegmentAfterCommit(
+      log.snapshot.tableCommitOwnerClientOpt,
+      oldLogSegment.checkpointProvider) === log.snapshot.logSegment)
   }
 }
 
@@ -488,6 +529,18 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
   }
 }
 
+class SnapshotManagementWithManagedCommitBatch1Suite extends SnapshotManagementSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class SnapshotManagementWithManagedCommitBatch2Suite extends SnapshotManagementSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(2)
+}
+
+class SnapshotManagementWithManagedCommitBatch100Suite extends SnapshotManagementSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(100)
+}
+
 class CountDownLatchLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
   extends LocalLogStore(sparkConf, hadoopConf) {
   override def listFrom(path: Path, hadoopConf: Configuration): Iterator[FileStatus] = {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala
index d433a851b55..74ab51adb6b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala
@@ -77,12 +77,13 @@ trait ManagedCommitTestUtils
     test(s"$testName [Backfill batch size: None]") {
       f(None)
     }
-    val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
-    val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
-    withSQLConf(
-        DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> "in-memory",
-        DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey -> managedCommitOwnerJson) {
-      testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
+    testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
+      val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
+      val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
+      withSQLConf(
+          DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> "tracking-in-memory",
+          DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey ->
+            managedCommitOwnerJson) {
         f(Some(backfillBatchSize))
       }
     }
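
Note on the SnapshotManagement.scala change above: the new guard fires only
when a checkpoint instance was actually selected, and it deliberately ignores
unbackfilled commit files (those still under _delta_log/_commits) when
checking that a delta file exists at the checkpoint version. The following is
a minimal, self-contained sketch of that shape; isUnbackfilledDeltaFile and
deltaVersion here are simplified stand-ins for the real FileNames helpers, and
the path layout is illustrative only:

object CheckpointCoverageCheckSketch {
  // Stand-in: an unbackfilled delta file still lives under _delta_log/_commits.
  private def isUnbackfilledDeltaFile(path: String): Boolean =
    path.contains("/_delta_log/_commits/")

  // Stand-in: backfilled deltas are named <zero-padded version>.json.
  private def deltaVersion(path: String): Long =
    path.split('/').last.stripSuffix(".json").toLong

  // If a checkpoint was chosen, a *backfilled* delta file must exist at
  // exactly the checkpoint version; otherwise fail fast with a clear error.
  def validate(deltaPaths: Seq[String], checkpointVersion: Option[Long]): Unit = {
    checkpointVersion.foreach { cpVersion =>
      deltaPaths
        .filterNot(isUnbackfilledDeltaFile)
        .find(p => deltaVersion(p) == cpVersion)
        .getOrElse(throw new IllegalStateException(
          s"Could not find any backfilled delta files for version $cpVersion " +
            "even though there is a checkpoint at this version."))
    }
  }

  def main(args: Array[String]): Unit = {
    // Passes: a backfilled delta file exists at the checkpoint version.
    validate(Seq("/t/_delta_log/00000000000000000010.json"), Some(10L))
    // Throws IllegalStateException: only an unbackfilled copy exists, which is
    // the situation the updated tests create by deleting the backfilled file.
    validate(Seq("/t/_delta_log/_commits/00000000000000000010.uuid.json"), Some(10L))
  }
}

This is why the corrupt-checkpoint tests above now expect an
IllegalStateException mentioning "backfilled delta files" instead of the old
"Could not find any delta files for version ..." message.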
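
The two intercept[IllegalArgumentException] blocks in the
getLogSegmentAfterCommit test both exercise a single contiguity invariant: a
commit can only extend a log segment whose latest version is exactly one less
than the commit's version. A toy model of that precondition follows;
SegmentSketch and appendCommit are illustrative names, not Delta APIs, and an
empty segment is modeled as version -1:

object LogSegmentContiguitySketch {
  // Toy model: a log segment summarized by its latest version (-1 = empty).
  final case class SegmentSketch(version: Long)

  // require(...) throws IllegalArgumentException, which is what the test
  // intercepts in both failure cases.
  def appendCommit(preCommitSegment: SegmentSketch, commitVersion: Long): SegmentSketch = {
    require(commitVersion == preCommitSegment.version + 1,
      s"Commit version $commitVersion cannot extend a log segment at " +
        s"version ${preCommitSegment.version}")
    SegmentSketch(commitVersion)
  }

  def main(args: Array[String]): Unit = {
    appendCommit(SegmentSketch(-1), 0) // ok: empty segment -> version 0
    appendCommit(SegmentSketch(-1), 1) // throws: version 0 is missing (a gap)
    appendCommit(SegmentSketch(0), 0)  // throws: segment already contains version 0
  }
}

In the test above, the first intercept covers the gap case (commit version 1
against the pre-write log segment) and the second covers the already-contained
case (commit version 0 against a segment that already includes version 0).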