From 35098ab05a5fdd77e4452704fea72d3211e6ade8 Mon Sep 17 00:00:00 2001
From: Sumeet Varma
Date: Tue, 14 May 2024 11:01:27 -0700
Subject: [PATCH] Parallel calls

---
 .../spark/sql/delta/MetadataCleanup.scala     | 20 ++++-
 .../spark/sql/delta/DeltaRetentionSuite.scala | 88 ++++++++++++++++++-
 .../sql/delta/DeltaRetentionSuiteBase.scala   | 54 +++++++++++-
 3 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
index 9de270c359f..349ce9109e2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, Trun
 import org.apache.spark.sql.delta.actions.{Action, Metadata}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.FileNames
-import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, listingPrefix, CheckpointFile, DeltaFile}
+import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, listingPrefix, CheckpointFile, DeltaFile, UnbackfilledDeltaFile}
 import org.apache.commons.lang3.time.DateUtils
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -84,6 +84,7 @@ trait MetadataCleanup extends DeltaLogging {
         logInfo(s"Compatibility checkpoint creation metrics: $v2CompatCheckpointMetrics")
       }
       var wasCheckpointDeleted = false
+      var maxBackfilledVersionDeleted = -1L
       expiredDeltaLogs.map(_.getPath).foreach { path =>
         // recursive = false
         if (fs.delete(path, false)) {
@@ -91,8 +92,22 @@ trait MetadataCleanup extends DeltaLogging {
           if (FileNames.isCheckpointFile(path)) {
             wasCheckpointDeleted = true
           }
+          if (FileNames.isDeltaFile(path)) {
+            maxBackfilledVersionDeleted =
+              Math.max(maxBackfilledVersionDeleted, FileNames.deltaVersion(path))
+          }
         }
       }
+      val expiredUnbackfilledDeltaLogs =
+        store
+          .listFrom(
+            listingPrefix(FileNames.commitDirPath(logPath), 0),
+            snapshot.deltaLog.newDeltaHadoopConf())
+          .takeWhile { case UnbackfilledDeltaFile(_, fileVersion, _) =>
+            fileVersion <= maxBackfilledVersionDeleted
+          }
+      val numDeletedUnbackfilled = expiredUnbackfilledDeltaLogs.count(
+        log => fs.delete(log.getPath, false))
       if (wasCheckpointDeleted) {
         // Trigger sidecar deletion only when some checkpoints have been deleted as part of this
         // round of Metadata cleanup.
@@ -103,7 +118,8 @@ trait MetadataCleanup extends DeltaLogging {
           sidecarDeletionMetrics)
         logInfo(s"Sidecar deletion metrics: $sidecarDeletionMetrics")
       }
-      logInfo(s"Deleted $numDeleted log files older than $formattedDate")
+      logInfo(s"Deleted $numDeleted log files and $numDeletedUnbackfilled unbackfilled commit " +
+        s"files older than $formattedDate")
     }
   }
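
The net effect of the MetadataCleanup.scala change above: while deleting expired backfilled delta files, the highest delta version actually removed is tracked, and any unbackfilled commit files at or below that version are then deleted from the commit directory under _delta_log, so unbackfilled copies of already-removed versions do not linger; unbackfilled files newer than that watermark are left untouched. Below is a minimal standalone sketch of the same idea, using only the FileNames helpers referenced in this patch; the function name deleteExpiredUnbackfilledCommits is hypothetical and not part of the patch:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.delta.util.FileNames
    import org.apache.spark.sql.delta.util.FileNames.UnbackfilledDeltaFile

    // Delete unbackfilled commit files up to and including the highest backfilled delta
    // version that metadata cleanup just removed; returns the number of files deleted.
    def deleteExpiredUnbackfilledCommits(
        fs: FileSystem,
        logPath: Path,
        maxBackfilledVersionDeleted: Long): Int = {
      val commitDir = FileNames.commitDirPath(logPath)
      if (!fs.exists(commitDir)) {
        0
      } else {
        fs.listStatus(commitDir)
          // Commit file names are zero-padded by version, so lexical order is version order.
          .sortBy(_.getPath.getName)
          .iterator
          .takeWhile { case UnbackfilledDeltaFile(_, fileVersion, _) =>
            fileVersion <= maxBackfilledVersionDeleted
          }
          // recursive = false, as in the patch: commit files are plain files.
          .count(file => fs.delete(file.getPath, false))
      }
    }
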
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
index b7821d25a92..f5b8e6c7c3a 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.FileNames
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RawLocalFileSystem}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
@@ -42,7 +43,7 @@ class DeltaRetentionSuite extends QueryTest
   protected override def sparkConf: SparkConf = super.sparkConf
 
   override protected def getLogFiles(dir: File): Seq[File] =
-    getDeltaFiles(dir) ++ getCheckpointFiles(dir)
+    getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir)
 
   test("delete expired logs") {
     withTempDir { tempDir =>
@@ -505,3 +506,86 @@ class DeltaRetentionSuite extends QueryTest
     }
   }
 }
+
+class DeltaRetentionWithManagedCommitBatch1Suite extends DeltaRetentionSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
+}
+
+/**
+ * This test suite does not extend DeltaRetentionSuiteEdge because DeltaRetentionSuiteEdge
+ * contains tests that rely on setting the file modification time for delta files. In this
+ * suite, however, delta files might be backfilled asynchronously, which means setting the
+ * modification time would not work as expected.
+ */
+class DeltaRetentionWithManagedCommitBatch2Suite extends DeltaRetentionSuiteBase {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(2)
+
+  override def getLogFiles(dir: File): Seq[File] =
+    getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir)
+
+  /**
+   * This test verifies that unbackfilled versions, i.e., versions for which backfilled deltas do
+   * not exist yet, are never considered for deletion, even if they fall outside the retention
+   * window. The primary reason for not deleting these versions is that the CommitOwner might be
+   * actively tracking those files, and currently, MetadataCleanup does not communicate with the
+   * CommitOwner.
+   *
+   * The fact that they are unbackfilled is somewhat redundant, since these versions are
+   * currently already protected for two additional reasons:
+   * 1. They will always be part of the latest snapshot.
+   * 2. They do not have two checkpoints after them.
+   * However, this test helps ensure that unbackfilled deltas remain protected in the future,
+   * even if the above two conditions no longer hold.
+   *
+   * Note: This test is too slow for batchSize = 100 and would not necessarily work for
+   * batchSize = 1.
+   */
+  test("unbackfilled expired commits are always retained") {
+    withTempDir { tempDir =>
+      val startTime = getStartTimeForRetentionTest
+      val clock = new ManualClock(startTime)
+      val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+      val logPath = new File(log.logPath.toUri.getPath)
+      val fs = new RawLocalFileSystem()
+      fs.initialize(tempDir.toURI, new Configuration())
+
+      log.startTransaction().commitManually(createTestAddFile("1"))
+      log.checkpoint()
+      spark.sql(s"""ALTER TABLE tahoe.`${tempDir.toString}`
+        |SET TBLPROPERTIES(
+        |-- Trigger log clean up manually.
+        |'delta.enableExpiredLogCleanup' = 'false',
+        |'delta.checkpointInterval' = '10000',
+        |'delta.checkpointRetentionDuration' = 'interval 2 days',
+        |'delta.logRetentionDuration' = 'interval 30 days',
+        |'delta.enableFullRetentionRollback' = 'true')
+      """.stripMargin)
+      log.checkpoint()
+      setModificationTime(log, startTime, 0, 0, fs)
+      setModificationTime(log, startTime, 1, 0, fs)
+      // Create commits [2, 6] with a checkpoint per commit.
+      2 to 6 foreach { i =>
+        log.startTransaction().commitManually(createTestAddFile(s"$i"))
+        log.checkpoint()
+        setModificationTime(log, startTime, i, 0, fs)
+      }
+      // Create unbackfilled commit [7] with no checkpoints.
+      log.startTransaction().commitManually(createTestAddFile("7"))
+      setModificationTime(log, startTime, 7, 0, fs)
+
+      // Everything is eligible for deletion, but we don't consider the unbackfilled commit,
+      // i.e. [7], for deletion because it is part of the current LogSegment. Since we also
+      // need 2 checkpoints, [5, 6] are also protected.
+      clock.setTime(day(startTime, 100))
+      log.cleanUpExpiredLogs(log.update())
+      compareVersions(
+        getDeltaVersions(logPath),
+        "backfilled delta",
+        5 to 6)
+      compareVersions(
+        getUnbackfilledDeltaVersions(logPath),
+        "unbackfilled delta",
+        5 to 7)
+    }
+  }
+}
+
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala
index ea5c4b48509..284f533d12f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala
@@ -24,12 +24,13 @@ import scala.collection.mutable
 import org.apache.spark.sql.delta.DeltaOperations.Truncate
 import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
 import org.apache.spark.sql.delta.actions.{CheckpointMetadata, Metadata, SidecarFile}
+import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.FileNames
 import org.apache.spark.sql.delta.util.FileNames.{newV2CheckpointJsonFile, newV2CheckpointParquetFile}
 import org.apache.commons.lang3.time.DateUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
@@ -39,7 +40,8 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ManualClock
 
 trait DeltaRetentionSuiteBase extends QueryTest
-  with SharedSparkSession {
+  with SharedSparkSession
+  with ManagedCommitBaseSuite {
   protected val testOp = Truncate()
 
   protected override def sparkConf: SparkConf = super.sparkConf
@@ -88,9 +90,26 @@ trait DeltaRetentionSuiteBase extends QueryTest
   }
   protected def getDeltaVersions(dir: File): Set[Long] = {
-    getFileVersions(getDeltaFiles(dir))
+    val backfilledDeltaVersions = getFileVersions(getDeltaFiles(dir))
+    val unbackfilledDeltaVersions = getUnbackfilledDeltaVersions(dir)
+    if (managedCommitsEnabledInTests) {
+      // The unbackfilled commit files (except commit 0) should be a superset of the backfilled
+      // commit files, since they're always deleted together in this suite.
+      assert(
+        unbackfilledDeltaVersions.toArray.sorted.startsWith(
+          backfilledDeltaVersions.filter(_ != 0).toArray.sorted))
+    }
+    backfilledDeltaVersions
+  }
+
+  protected def getUnbackfilledDeltaFiles(dir: File): Seq[File] = {
+    val commitDirPath = FileNames.commitDirPath(new Path(dir.toURI))
+    getDeltaFiles(new File(commitDirPath.toUri))
   }
 
+  protected def getUnbackfilledDeltaVersions(dir: File): Set[Long] =
+    getFileVersions(getUnbackfilledDeltaFiles(dir))
+
   protected def getSidecarFiles(log: DeltaLog): Set[String] = {
     new java.io.File(log.sidecarDirPath.toUri)
       .listFiles()
@@ -135,6 +154,35 @@ trait DeltaRetentionSuiteBase extends QueryTest
     }
   }
 
+  protected def setModificationTime(
+      log: DeltaLog,
+      startTime: Long,
+      version: Int,
+      dayNum: Int,
+      fs: FileSystem,
+      checkpointOnly: Boolean = false): Unit = {
+    val logDir = log.logPath.toUri.toString
+    val paths = log
+      .listFrom(version)
+      .collect { case FileNames.CheckpointFile(f, v) if v == version => f.getPath }
+      .toSeq
+    paths.foreach { cpPath =>
+      // Add a per-version offset of a few seconds so that files don't end up with the same
+      // timestamps.
+      fs.setTimes(cpPath, day(startTime, dayNum) + version * 1000, 0)
+    }
+    if (!checkpointOnly) {
+      val deltaPath = new Path(logDir + f"/$version%020d.json")
+      if (fs.exists(deltaPath)) {
+        // Add a per-version offset of a few seconds so that files don't end up with the same
+        // timestamps.
+        fs.setTimes(deltaPath, day(startTime, dayNum) + version * 1000, 0)
+      }
+      // Set the same timestamp on the unbackfilled delta file for this version as well.
+      fs.listStatus(FileNames.commitDirPath(log.logPath))
+        .find(_.getPath.getName.startsWith(f"$version%020d"))
+        .foreach(f => fs.setTimes(f.getPath, day(startTime, dayNum) + version * 1000, 0))
+    }
+  }
+
   protected def day(startTime: Long, day: Int): Long =
     startTime + intervalStringToMillis(s"interval $day days")
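
For orientation, the new test helpers above locate unbackfilled commit files in a dedicated directory under _delta_log (FileNames.commitDirPath) and treat the zero-padded numeric prefix of each file name as the commit version, the same convention the startsWith(f"$version%020d") lookup in setModificationTime relies on. A small sketch of that resolution, assuming the zero-padded naming; listUnbackfilledVersions is a hypothetical name and not part of the patch:

    import java.io.File
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.delta.util.FileNames

    // List the commit versions present in the unbackfilled commit directory of a _delta_log dir.
    def listUnbackfilledVersions(logDir: File): Seq[Long] = {
      val commitDir = new File(FileNames.commitDirPath(new Path(logDir.toURI)).toUri)
      Option(commitDir.listFiles()).getOrElse(Array.empty[File]).toSeq
        .filter(f => f.getName.endsWith(".json") && f.getName.headOption.exists(_.isDigit))
        // Parse the zero-padded version prefix, e.g. "00000000000000000007.<uuid>.json" -> 7.
        .map(_.getName.takeWhile(_.isDigit).toLong)
        .sorted
    }
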