Skip to content

Commit

Permalink
Replace hour truncation with minute truncation
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Nov 22, 2023
1 parent 13f7fbc commit 38468f5
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.{Calendar, TimeZone}
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, TruncationGranularity}
import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, TruncationGranularity}
import org.apache.spark.sql.delta.actions.{Action, Metadata}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.FileNames
Expand All @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

/**
 * Granularities at which a cleanup cutoff timestamp may be truncated.
 * DAY truncates to midnight, HOUR to the start of the hour, and MINUTE
 * to the start of the minute (see `truncateDate`).
 */
private[delta] object TruncationGranularity extends Enumeration {
type TruncationGranularity = Value
// Post-change value set of this commit: MINUTE added alongside DAY and HOUR.
// (The diff scrape had left both the old and new `val` lines in place, which
// would redeclare DAY/HOUR; only the updated declaration is kept.)
val DAY, HOUR, MINUTE = Value
}

/** Cleans up expired Delta table metadata. */
Expand Down Expand Up @@ -133,9 +133,10 @@ trait MetadataCleanup extends DeltaLogging {
}

/**
* Truncates a timestamp down to a given unit. The unit can be either DAY or HOUR.
* Truncates a timestamp down to a given unit. The unit can be either DAY, HOUR or MINUTE.
* - DAY: The timestamp is truncated to the previous midnight.
* - HOUR: The timestamp is truncated to the start of the last hour.
* - MINUTE: The timestamp is truncated to the start of the last minute.
*/
private[delta] def truncateDate(timeMillis: Long, unit: TruncationGranularity): Calendar = {
val date = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
Expand All @@ -144,6 +145,7 @@ trait MetadataCleanup extends DeltaLogging {
val calendarUnit = unit match {
case DAY => Calendar.DAY_OF_MONTH
case HOUR => Calendar.HOUR_OF_DAY
case MINUTE => Calendar.MINUTE
}

DateUtils.truncate(date, calendarUnit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,11 @@ case class AlterTableDropFeatureDeltaCommand(
if (isReaderWriterFeature) {
// Clean up expired logs before checking history. This also makes sure there is no
// concurrent metadataCleanup during findEarliestReliableCheckpoint. Note, this
// cleanUpExpiredLogs call truncates the cutoff at an hour granularity.
// cleanUpExpiredLogs call truncates the cutoff at a minute granularity.
deltaLog.cleanUpExpiredLogs(
snapshot,
truncateHistoryLogRetentionMillis(txn),
TruncationGranularity.HOUR)
TruncationGranularity.MINUTE)

val historyContainsFeature = removableFeature.historyContainsFeature(
spark = sparkSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2387,7 +2387,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
} else {
deltaLog.deltaRetentionMillis(deltaLog.update().metadata)
}
clock.advance(clockAdvanceMillis + TimeUnit.HOURS.toMillis(1))
clock.advance(clockAdvanceMillis + TimeUnit.MINUTES.toMillis(5))
}

val dropCommand = AlterTableDropFeatureDeltaCommand(
Expand Down Expand Up @@ -2715,7 +2715,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

// The retention period has passed since the disablement.
clock.advance(
deltaRetentionMillis - TimeUnit.DAYS.toMillis(10) + TimeUnit.HOURS.toMillis(1))
deltaRetentionMillis - TimeUnit.DAYS.toMillis(10) + TimeUnit.MINUTES.toMillis(5))

// Cleanup logs.
deltaLog.cleanUpExpiredLogs(deltaLog.update())
Expand Down Expand Up @@ -2802,7 +2802,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
// Pretend retention period has passed.
clock.advance(
deltaLog.deltaRetentionMillis(deltaLog.update().metadata) +
TimeUnit.HOURS.toMillis(1))
TimeUnit.MINUTES.toMillis(5))

// History is now clean. We should be able to remove the feature.
AlterTableDropFeatureDeltaCommand(
Expand Down Expand Up @@ -2906,7 +2906,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

// Pretend retention period has passed.
clock.advance(deltaLog.deltaRetentionMillis(deltaLog.update().metadata) +
TimeUnit.HOURS.toMillis(1))
TimeUnit.MINUTES.toMillis(5))

// Perform an unrelated metadata change.
sql(s"ALTER TABLE delta.`${deltaLog.dataPath}` ADD COLUMN (value INT)")
Expand Down Expand Up @@ -3241,7 +3241,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
// Pretend retention period has passed.
clock.advance(
targetLog.deltaRetentionMillis(targetLog.update().metadata) +
TimeUnit.HOURS.toMillis(1))
TimeUnit.MINUTES.toMillis(5))

// History is now clean. We should be able to remove the feature.
dropV2CheckpointsTableFeature(spark, targetLog)
Expand Down

0 comments on commit 38468f5

Please sign in to comment.