Parallel calls
sumeet-db committed May 16, 2024
1 parent 8a8e757 commit e7d0b31
Showing 3 changed files with 158 additions and 7 deletions.
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, Trun
import org.apache.spark.sql.delta.actions.{Action, Metadata}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.FileNames
-import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, listingPrefix, CheckpointFile, DeltaFile}
+import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, listingPrefix, CheckpointFile, DeltaFile, UnbackfilledDeltaFile}
import org.apache.commons.lang3.time.DateUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

@@ -84,15 +84,30 @@ trait MetadataCleanup extends DeltaLogging {
logInfo(s"Compatibility checkpoint creation metrics: $v2CompatCheckpointMetrics")
}
var wasCheckpointDeleted = false
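// Track the highest backfilled delta version deleted in this pass; it bounds the
// unbackfilled commit cleanup below.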
var maxBackfilledVersionDeleted = -1L
expiredDeltaLogs.map(_.getPath).foreach { path =>
// recursive = false
if (fs.delete(path, false)) {
numDeleted += 1
if (FileNames.isCheckpointFile(path)) {
wasCheckpointDeleted = true
}
if (FileNames.isDeltaFile(path)) {
maxBackfilledVersionDeleted =
Math.max(maxBackfilledVersionDeleted, FileNames.deltaVersion(path))
}
}
}
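// Clean up unbackfilled commit files only up to the highest backfilled delta version deleted
// above; commits that have not been backfilled yet are never deleted here.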
val expiredUnbackfilledDeltaLogs =
store
.listFrom(
listingPrefix(FileNames.commitDirPath(logPath), 0),
snapshot.deltaLog.newDeltaHadoopConf())
.takeWhile { case UnbackfilledDeltaFile(_, fileVersion, _) =>
fileVersion <= maxBackfilledVersionDeleted
}
val numDeletedUnbackfilled = expiredUnbackfilledDeltaLogs.count(
log => fs.delete(log.getPath, false))
if (wasCheckpointDeleted) {
// Trigger sidecar deletion only when some checkpoints have been deleted as part of this
// round of Metadata cleanup.
@@ -103,7 +118,8 @@ trait MetadataCleanup extends DeltaLogging {
sidecarDeletionMetrics)
logInfo(s"Sidecar deletion metrics: $sidecarDeletionMetrics")
}
logInfo(s"Deleted $numDeleted log files older than $formattedDate")
logInfo(s"Deleted $numDeleted log files and $numDeletedUnbackfilled unbackfilled commit " +
s"files older than $formattedDate")
}
}

@@ -27,7 +27,8 @@ import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RawLocalFileSystem}

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
@@ -42,7 +43,7 @@ class DeltaRetentionSuite extends QueryTest
protected override def sparkConf: SparkConf = super.sparkConf

override protected def getLogFiles(dir: File): Seq[File] =
-getDeltaFiles(dir) ++ getCheckpointFiles(dir)
+getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir)

test("delete expired logs") {
withTempDir { tempDir =>
@@ -505,3 +506,89 @@ class DeltaRetentionSuite extends QueryTest
}
}
}

class DeltaRetentionWithManagedCommitBatch1Suite extends DeltaRetentionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

/**
* This test suite does not extend DeltaRetentionSuiteEdge because that suite contains tests
* that rely on setting the file modification time for delta files. In this suite, however,
* delta files might be backfilled asynchronously, which means setting the modification time
* will not work as expected.
*/
class DeltaRetentionWithManagedCommitBatch2Suite extends QueryTest
with DeltaSQLCommandTest
with DeltaRetentionSuiteBase {
override def managedCommitBackfillBatchSize: Option[Int] = Some(2)

override def getLogFiles(dir: File): Seq[File] =
getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir)

/**
* This test verifies that unbackfilled versions, i.e., versions for which backfilled deltas do
* not exist yet, are never considered for deletion, even if they fall outside the retention
* window. The primary reason for not deleting these versions is that the CommitOwner might be
* actively tracking those files, and currently, MetadataCleanup does not communicate with the
* CommitOwner.
*
* Protecting these versions because they are unbackfilled is somewhat redundant, since they are
* currently already protected for two additional reasons:
* 1. They will always be part of the latest snapshot.
* 2. They don't have two checkpoints after them.
* However, this test helps ensure that unbackfilled deltas remain protected in the future, even
* if the above two conditions no longer hold.
*
* Note: This test is too slow for batchSize = 100 and wouldn't necessarily work for batchSize = 1.
*/
test("unbackfilled expired commits are always retained") {
withTempDir { tempDir =>
val startTime = getStartTimeForRetentionTest
val clock = new ManualClock(startTime)
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
val logPath = new File(log.logPath.toUri.getPath)
val fs = new RawLocalFileSystem()
fs.initialize(tempDir.toURI, new Configuration())

log.startTransaction().commitManually(createTestAddFile("1"))
log.checkpoint()
spark.sql(s"""ALTER TABLE delta.`${tempDir.toString}`
|SET TBLPROPERTIES(
|-- Trigger log clean up manually.
|'delta.enableExpiredLogCleanup' = 'false',
|'delta.checkpointInterval' = '10000',
|'delta.checkpointRetentionDuration' = 'interval 2 days',
|'delta.logRetentionDuration' = 'interval 30 days',
|'delta.enableFullRetentionRollback' = 'true')
""".stripMargin)
log.checkpoint()
setModificationTime(log, startTime, 0, 0, fs)
setModificationTime(log, startTime, 1, 0, fs)
// Create commits [2, 6] with a checkpoint per commit
2 to 6 foreach { i =>
log.startTransaction().commitManually(createTestAddFile(s"$i"))
log.checkpoint()
setModificationTime(log, startTime, i, 0, fs)
}
// Create unbackfilled commit [7] with no checkpoints
log.startTransaction().commitManually(createTestAddFile("7"))
setModificationTime(log, startTime, 7, 0, fs)

// Everything is eligible for deletion but we don't consider the unbackfilled commit,
// i.e. [7], for deletion because it is part of the current LogSegment.
clock.setTime(day(startTime, 100))
log.cleanUpExpiredLogs(log.update())
// Since we also need a checkpoint, [6] is also protected.
val firstProtectedVersion = 6
compareVersions(
getDeltaVersions(logPath),
"backfilled delta",
firstProtectedVersion to 6)
compareVersions(
getUnbackfilledDeltaVersions(logPath),
"unbackfilled delta",
firstProtectedVersion to 7)
}
}
}

@@ -24,12 +24,13 @@ import scala.collection.mutable
import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.{CheckpointMetadata, Metadata, SidecarFile}
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.FileNames.{newV2CheckpointJsonFile, newV2CheckpointParquetFile}
import org.apache.commons.lang3.time.DateUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
@@ -39,7 +40,8 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ManualClock

trait DeltaRetentionSuiteBase extends QueryTest
-with SharedSparkSession {
+with SharedSparkSession
+with ManagedCommitBaseSuite {
protected val testOp = Truncate()

protected override def sparkConf: SparkConf = super.sparkConf
@@ -88,9 +90,26 @@ trait DeltaRetentionSuiteBase extends QueryTest
}

protected def getDeltaVersions(dir: File): Set[Long] = {
-getFileVersions(getDeltaFiles(dir))
+val backfilledDeltaVersions = getFileVersions(getDeltaFiles(dir))
val unbackfilledDeltaVersions = getUnbackfilledDeltaVersions(dir)
if (managedCommitsEnabledInTests) {
// The unbackfilled commit files (except commit 0) should be a superset of the backfilled
// commit files since they're always deleted together in this suite.
assert(
unbackfilledDeltaVersions.toArray.sorted.startsWith(
backfilledDeltaVersions.filter(_ != 0).toArray.sorted))
}
backfilledDeltaVersions
}

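/** Lists delta files in the unbackfilled commit directory under the given log directory. */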
protected def getUnbackfilledDeltaFiles(dir: File): Seq[File] = {
val commitDirPath = FileNames.commitDirPath(new Path(dir.toURI))
getDeltaFiles(new File(commitDirPath.toUri))
}

protected def getUnbackfilledDeltaVersions(dir: File): Set[Long] =
getFileVersions(getUnbackfilledDeltaFiles(dir))

protected def getSidecarFiles(log: DeltaLog): Set[String] = {
new java.io.File(log.sidecarDirPath.toUri)
.listFiles()
@@ -135,6 +154,35 @@ trait DeltaRetentionSuiteBase extends QueryTest
}
}

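/**
* Sets the modification time of the checkpoint file(s) for `version` to `dayNum` days after
* `startTime`, plus an offset of `version` seconds to avoid identical timestamps. Unless
* `checkpointOnly` is set, the backfilled and unbackfilled delta files for `version` get the
* same timestamp.
*/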
protected def setModificationTime(
log: DeltaLog,
startTime: Long,
version: Int,
dayNum: Int,
fs: FileSystem,
checkpointOnly: Boolean = false): Unit = {
val logDir = log.logPath.toUri.toString
val paths = log
.listFrom(version)
.collect { case FileNames.CheckpointFile(f, v) if v == version => f.getPath }
.toSeq
paths.foreach { cpPath =>
// Add an offset of a few seconds so that files don't end up with identical timestamps
fs.setTimes(cpPath, day(startTime, dayNum) + version * 1000, 0)
}
if (!checkpointOnly) {
val deltaPath = new Path(logDir + f"/$version%020d.json")
if (fs.exists(deltaPath)) {
// Add an offset of a few seconds so that files don't end up with identical timestamps
fs.setTimes(deltaPath, day(startTime, dayNum) + version * 1000, 0)
}
// Add the same timestamp for unbackfilled delta files as well
fs.listStatus(FileNames.commitDirPath(log.logPath))
.find(_.getPath.getName.startsWith(f"$version%020d"))
.foreach(f => fs.setTimes(f.getPath, day(startTime, dayNum) + version * 1000, 0))
}
}

protected def day(startTime: Long, day: Int): Long =
startTime + intervalStringToMillis(s"interval $day days")
