Skip to content

Commit

Permalink
Add types
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Apr 16, 2024
1 parent 674b17e commit e0ba7ff
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ object DeltaHistoryManager extends DeltaLogging {

protected def isNewGroupStart(item: T): Boolean

protected def transformItem(item: T): T
protected def transformItem(item: T): T = item

private def queueItemsIfNeeded(): Unit = {
if (bufferedOutput.isEmpty && bufferedUnderlying.hasNext) {
Expand All @@ -472,7 +472,18 @@ object DeltaHistoryManager extends DeltaLogging {
}
}

// All files with the same commit version
type SameCommitVersionFileGroup = ArrayBuffer[FileStatus]

// All commits whose timestamps need to be adjusted because of the first commit in the group
type TimestampAdjustedCommitGroup = ArrayBuffer[SameCommitVersionFileGroup]

// All timestamp groups that depend on the same checkpoint
type DependentCheckpointGroup = ArrayBuffer[TimestampAdjustedCommitGroup]

/**
* Returns all checkpoint groups except the last, so we never delete the last checkpoint.
*
* An iterator that helps select old log files for deletion. It takes the input iterator of
* eligible log file groups to delete, grouped by version and timestamp-adjusting dependency, and
* returns all groups up to but not including the last group containing a checkpoint.
Expand All @@ -486,37 +497,38 @@ object DeltaHistoryManager extends DeltaLogging {
* Note 2: This code assumes that underlying iterator list groups in a sorted-by-version order.
*/
class LastCheckpointPreservingLogDeletionIterator(
underlying: Iterator[ArrayBuffer[ArrayBuffer[FileStatus]]])
extends Iterator[ArrayBuffer[ArrayBuffer[ArrayBuffer[FileStatus]]]]{
underlying: Iterator[TimestampAdjustedCommitGroup])
extends Iterator[DependentCheckpointGroup] {

private class CheckpointGroupingIterator(
underlying: Iterator[ArrayBuffer[ArrayBuffer[FileStatus]]])
extends GroupBreakingIterator[ArrayBuffer[ArrayBuffer[FileStatus]]](underlying) {
private class CheckpointGroupingIterator(underlying: Iterator[TimestampAdjustedCommitGroup])
extends GroupBreakingIterator[TimestampAdjustedCommitGroup](underlying) {

override protected def isNewGroupStart(
files: ArrayBuffer[ArrayBuffer[FileStatus]]): Boolean = {
override protected def isNewGroupStart(files: TimestampAdjustedCommitGroup): Boolean = {
val checkpointPaths = files.flatten.toArray.collect {
case f if isCheckpointFile(f) => CheckpointInstance(f.getPath)
}
Checkpoints.getLatestCompleteCheckpointFromList(checkpointPaths).isDefined
}

override protected def transformItem(
files: ArrayBuffer[ArrayBuffer[FileStatus]]): ArrayBuffer[ArrayBuffer[FileStatus]] = files
}

// 1. Group by checkpoints dependency.
// 2. Drop everything but the last checkpoint-dependency group.
// .sliding(2) can return a single entry if the underlying iterators' size is 1
//
// Implement Iterator.dropRight(1) using Iterator.sliding(2).map(_.head) since scala doesn't
// provide a dropRight method for iterators.
// (A, B), (B, C), ..., (X, Y), (Y, Z)
// ^ ^ ... ^ ^
// Note: Iterator.sliding(2) can return a single entry if the underlying iterators' size is 1
// https://www.scala-lang.org/old/node/7939
private val lastCheckpointPreservingIterator:
Iterator[ArrayBuffer[ArrayBuffer[ArrayBuffer[FileStatus]]]] =
// We fix by filtering out groups of length 1.
// (A)
// ^
private val lastCheckpointPreservingIterator: Iterator[DependentCheckpointGroup] =
new CheckpointGroupingIterator(underlying).sliding(2).filter(_.length == 2).map(_.head)

override def hasNext: Boolean = lastCheckpointPreservingIterator.hasNext

override def next: ArrayBuffer[ArrayBuffer[ArrayBuffer[FileStatus]]] =
lastCheckpointPreservingIterator.next()
override def next: DependentCheckpointGroup = lastCheckpointPreservingIterator.next()
}

/**
Expand Down Expand Up @@ -566,11 +578,13 @@ object DeltaHistoryManager extends DeltaLogging {
* @param maxTimestamp The timestamp until which we can delete (inclusive).
*/
class TimestampAdjustingLogDeletionIterator(
underlying: Iterator[ArrayBuffer[FileStatus]],
maxTimestamp: Long) extends Iterator[ArrayBuffer[ArrayBuffer[FileStatus]]] {
underlying: Iterator[SameCommitVersionFileGroup],
maxTimestamp: Long)
extends Iterator[TimestampAdjustedCommitGroup] {

private class TimestampAdjustingGroupedIterator(underlying: Iterator[ArrayBuffer[FileStatus]])
extends GroupBreakingIterator[ArrayBuffer[FileStatus]](underlying) {
private class TimestampAdjustingGroupedIterator(
underlying: Iterator[SameCommitVersionFileGroup])
extends GroupBreakingIterator[SameCommitVersionFileGroup](underlying) {

private var lastCommitFileOpt: Option[FileStatus] = None

Expand All @@ -597,7 +611,7 @@ object DeltaHistoryManager extends DeltaLogging {
* Non-delta files must continue to be in the same group because delta files coming later
* might need to adjust their timestamps based on the lastCommitFileInfoOpt.
*/
override protected def isNewGroupStart(files: ArrayBuffer[FileStatus]): Boolean = {
override protected def isNewGroupStart(files: SameCommitVersionFileGroup): Boolean = {
files.find(isDeltaFile).exists(!needsTimestampAdjustment(_))
}

Expand All @@ -614,7 +628,7 @@ object DeltaHistoryManager extends DeltaLogging {
}

override protected def transformItem(
files: ArrayBuffer[FileStatus]): ArrayBuffer[FileStatus] = {
files: SameCommitVersionFileGroup): SameCommitVersionFileGroup = {
val commitFileIndex = files.indexWhere(isDeltaFile)
if (commitFileIndex >= 0) {
val commitFile = files(commitFileIndex)
Expand All @@ -636,21 +650,21 @@ object DeltaHistoryManager extends DeltaLogging {
/**
* A timestamp-adjusting group can be deleted if the last delta file in it can be deleted.
*/
private def shouldDeleteGroup(group: ArrayBuffer[ArrayBuffer[FileStatus]]): Boolean = {
private def shouldDeleteGroup(group: TimestampAdjustedCommitGroup): Boolean = {
group.flatten.reverse.find((file: FileStatus) => isDeltaFile(file))
.forall(file => shouldDeleteFile(file))
}

private def transformLastGroup(
group: ArrayBuffer[ArrayBuffer[FileStatus]]
): ArrayBuffer[ArrayBuffer[FileStatus]] = {
group: TimestampAdjustedCommitGroup): TimestampAdjustedCommitGroup = {
val deltaFileIdx = group.lastIndexWhere { files =>
files.exists((file: FileStatus) => isDeltaFile(file)) }
files.exists((file: FileStatus) => isDeltaFile(file))
}
group.take(deltaFileIdx + 1) ++
group.drop(deltaFileIdx + 1).takeWhile(_.forall(shouldDeleteFile))
}

val filteredIterator: Iterator[ArrayBuffer[ArrayBuffer[FileStatus]]] =
val filteredIterator: Iterator[TimestampAdjustedCommitGroup] =
new TimestampAdjustingGroupedIterator(underlying).takeWhile(shouldDeleteGroup)

override def hasNext: Boolean = filteredIterator.hasNext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import java.util.{Calendar, TimeZone}

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.DeltaHistoryManager.DeltaLogGroupingIterator

import org.apache.spark.sql.delta.DeltaHistoryManager.{LastCheckpointPreservingLogDeletionIterator, TimestampAdjustingLogDeletionIterator}
import org.apache.spark.sql.delta.DeltaHistoryManager._
import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, TruncationGranularity}
import org.apache.spark.sql.delta.actions.{Action, Metadata}
import org.apache.spark.sql.delta.metering.DeltaLogging
Expand Down Expand Up @@ -111,9 +109,30 @@ trait MetadataCleanup extends DeltaLogging {

/**
* Returns an iterator of expired delta logs that can be cleaned up. For a delta log to be
* considered as expired, it must:
* considered as expired, it must meet all of these conditions:
* - not have a timestamp-adjusted delta file that depends on it
* - have a checkpoint file after that's before the `fileCutOffTime`
* - be older than `fileCutOffTime`
*
* The algorithm works as follows:
* 1. Group files by their version number to create same-commit-version-file-group.
* 2. Group same-commit-version-file-group by their timestamp skew (adjusted timestamp) to
* create timestamp-adjusted-commit-groups.
* 3. Keep only the timestamp-adjusted-commit-groups whose start timestamp is less than or equal
* to the cutoff timestamp.
* 4. Remove any timestamp-adjusted-commit-groups that are fully protected.
* 5. For the last timestamp group, remove any version groups whose adjusted timestamp is after
* the cutoff.
* 6. Check each remaining timestamp-adjusted-commit-group to see if it contains a complete
* checkpoint.
* 7. Group timestamp-adjusted-commit-group based on their checkpoint dependency (i.e., which
* checkpoint they depend on) to create dependent-checkpoint-groups.
* 8. Remove the last dependent-checkpoint-groups group (to ensure we always retain at least one
* checkpoint).
* 9. Triple Flatten the remaining groups and delete all those files.
*
* Note: We always consume the iterator lazily in all of the above steps to avoid loading all
* files in memory at once.
*/
private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = {
import org.apache.spark.sql.delta.util.FileNames._
Expand All @@ -123,11 +142,21 @@ trait MetadataCleanup extends DeltaLogging {
val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf())
.filter(f => isCheckpointFile(f) || isDeltaFile(f))

new LastCheckpointPreservingLogDeletionIterator(
val groupedByVersion: Iterator[SameCommitVersionFileGroup] =
new DeltaLogGroupingIterator(files).map(_._2)

val filteredByMaxTimestampAndTimestampSkew: Iterator[TimestampAdjustedCommitGroup] =
new TimestampAdjustingLogDeletionIterator(
underlying = new DeltaLogGroupingIterator(files).map(_._2),
maxTimestamp = fileCutOffTime
)).flatten.flatten.flatten
underlying = groupedByVersion,
maxTimestamp = fileCutOffTime)

val filteredByCheckpointDependency: Iterator[DependentCheckpointGroup] =
new LastCheckpointPreservingLogDeletionIterator(filteredByMaxTimestampAndTimestampSkew)

val eligibleFilesToDelete: Iterator[FileStatus] =
filteredByCheckpointDependency.flatten.flatten.flatten

eligibleFilesToDelete
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ class DeltaRetentionSuite extends QueryTest
|'delta.enableExpiredLogCleanup' = 'false',
|'delta.checkpointInterval' = '10')
""".stripMargin)
// Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
val commit0Time = clock.getTimeMillis()
new File(FileNames.deltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)

// Day 0: Add commits 1 to 15 --> creates 1 checkpoint at Day 0 for version 10
(1 to 15).foreach { i =>
Expand Down Expand Up @@ -684,9 +688,9 @@ class DeltaRetentionSuite extends QueryTest
deltaFiles.zipWithIndex.foreach { case (f, i) =>
val version = i + 1 // From 0-based indexing to 1-based versioning
if (version < 10) {
assert(!f.exists())
assert(!f.exists(), version)
} else {
assert(f.exists())
assert(f.exists(), version)
}
}
}
Expand Down

0 comments on commit e0ba7ff

Please sign in to comment.