[Spark] Backfill commit files before checkpointing or minor compaction in managed-commits (delta-io#2823)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

With managed commits, commit files are not guaranteed to be present in
the _delta_log directory at the time of checkpointing or minor
compaction. While it is possible to compute a checkpoint file without
backfilling, writing the checkpoint file to the log directory before
backfilling the relevant commits leaves gaps in the directory
structure. This can cause issues for readers that are not communicating
with the CommitStore.

To address this, we now backfill commit files up to the committed
version before performing a checkpoint or minor compaction operation.
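
To make the intended ordering concrete, here is a minimal, self-contained sketch. `SimpleCommitStore`, `SimpleSnapshot`, and `CheckpointFlow` are hypothetical simplifications for illustration, not the real `CommitStore`/`Snapshot` APIs:

```scala
// Hypothetical, simplified sketch -- not the real Delta APIs.
trait SimpleCommitStore {
  // Copies commits in [startVersion, endVersion] from _delta_log/_commits/ into _delta_log/.
  def backfillToVersion(startVersion: Long, endVersion: Option[Long]): Unit
}

final case class SimpleSnapshot(version: Long, minUnbackfilledVersion: Long)

object CheckpointFlow {
  def checkpoint(snapshot: SimpleSnapshot, commitStoreOpt: Option[SimpleCommitStore]): Unit = {
    // 1. Ensure every commit <= snapshot.version is backfilled to _delta_log/<version>.json.
    commitStoreOpt.foreach { store =>
      if (snapshot.minUnbackfilledVersion <= snapshot.version) {
        store.backfillToVersion(snapshot.minUnbackfilledVersion, Some(snapshot.version))
      }
    }
    // 2. Only then write _delta_log/<version>.checkpoint.parquet, so a reader that never
    //    contacts the CommitStore cannot observe a checkpoint newer than the latest
    //    backfilled commit (no gaps in the log directory).
    println(s"writing checkpoint for version ${snapshot.version}")
  }
}
```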

## How was this patch tested?

UTs

## Does this PR introduce _any_ user-facing changes?

No
sumeet-db authored Apr 2, 2024
1 parent a014fee commit c88db07
Showing 12 changed files with 204 additions and 24 deletions.
19 changes: 17 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.conf.Configuration
@@ -344,8 +345,22 @@ trait Checkpoints extends DeltaLogging
}
}

protected def writeCheckpointFiles(
snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
// With Managed-Commits, commit files are not guaranteed to be backfilled immediately in the
// _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
// writing the checkpoint file in the log directory before backfilling the relevant commits
// will leave gaps in the dir structure. This can cause issues for readers that are not
// communicating with the CommitStore.
//
// Sample directory structure with a gap if we don't backfill commit files:
// _delta_log/
//   _commits/
//     00017.$uuid.json
//     00018.$uuid.json
//   00015.json
//   00016.json
//   00018.checkpoint.parquet
snapshotToCheckpoint.ensureCommitFilesBackfilled()
Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
}

@@ -1929,6 +1929,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
override def getCommits(
logPath: Path, startVersion: Long, endVersion: Option[Long]): GetCommitsResponse =
GetCommitsResponse(Seq.empty, -1)

override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}

/**
32 changes: 32 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -467,6 +467,38 @@ class Snapshot(
def redactedPath: String =
Utils.redact(spark.sessionState.conf.stringRedactionPattern, path.toUri.toString)

/**
* Ensures that commit files are backfilled up to the current version in the snapshot.
*
* This method checks if there are any un-backfilled versions up to the current version and
* triggers the backfilling process using the commit store. It verifies that the delta file for
* the current version exists after the backfilling process.
*
* @throws IllegalStateException
* if the delta file for the current version is not found after backfilling.
*/
def ensureCommitFilesBackfilled(): Unit = {
val commitStore = commitStoreOpt.getOrElse {
return
}
val minUnbackfilledVersion = DeltaCommitFileProvider(this).minUnbackfilledVersion
if (minUnbackfilledVersion <= version) {
val hadoopConf = deltaLog.newDeltaHadoopConf()
commitStore.backfillToVersion(
deltaLog.store,
hadoopConf,
deltaLog.logPath,
startVersion = minUnbackfilledVersion,
endVersion = Some(version))
val fs = deltaLog.logPath.getFileSystem(hadoopConf)
val expectedBackfilledDeltaFile = FileNames.deltaFile(deltaLog.logPath, version)
if (!fs.exists(expectedBackfilledDeltaFile)) {
throw new IllegalStateException("Backfilling of commit files failed. " +
s"Expected delta file $expectedBackfilledDeltaFile not found.")
}
}
}


protected def emptyDF: DataFrame =
spark.createDataFrame(spark.sparkContext.emptyRDD[Row], logSchema)
@@ -107,13 +107,16 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {

protected def generateUUID(): String = UUID.randomUUID().toString

/** Backfills all un-backfilled commits */
protected def backfillToVersion(
override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path): Unit = {
getCommits(logPath, startVersion = 0).commits.foreach { commit =>
backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus)
logPath: Path,
startVersion: Long = 0,
endVersionOpt: Option[Long] = None): Unit = {
getCommits(logPath, startVersion, endVersionOpt)
.commits
.foreach { commit =>
backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus)
}
}

@@ -18,13 +18,11 @@ package org.apache.spark.sql.delta.managedcommit

import scala.collection.mutable

import org.apache.spark.sql.delta.{DeltaConfigs, InitialSnapshot, ManagedCommitTableFeature, SerializableFileStatus, SnapshotDescriptor}
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.{DeltaConfigs, ManagedCommitTableFeature, SnapshotDescriptor}
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.hadoop.fs.{FileStatus, Path}

/** Representation of a commit file */
case class Commit(
@@ -92,9 +90,22 @@ trait CommitStore {
* tracked by [[CommitStore]].
*/
def getCommits(
logPath: Path,
startVersion: Long,
endVersion: Option[Long] = None): GetCommitsResponse
logPath: Path,
startVersion: Long,
endVersion: Option[Long] = None): GetCommitsResponse

/**
* API to ask the Commit-Owner to backfill all commits >= `startVersion` and <= `endVersion`.
*
* If this API returns successfully, that means the backfill must have been completed, although
* the Commit-Owner may not be aware of it yet.
*/
def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit
}

/** A builder interface for CommitStore */
@@ -146,7 +157,7 @@ object CommitStoreProvider {
nameToBuilderMapping.retain((k, _) => initialCommitStoreBuilderNames.contains(k))
}

val initialCommitStoreBuilders = Seq[CommitStoreBuilder](
private val initialCommitStoreBuilders = Seq[CommitStoreBuilder](
// Any new commit-store builder will be registered here.
)
initialCommitStoreBuilders.foreach(registerBuilder)
@@ -20,13 +20,35 @@ import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.hadoop.fs.Path

case class DeltaCommitFileProvider(logPath: String, maxVersion: Long, uuids: Map[Long, String]) {
/**
* Provides access to resolve Delta commit files names based on the commit-version.
*
* This class is part of the changes introduced to accommodate the adoption of managed-commits in
* Delta Lake. Previously, certain code paths assumed the existence of delta files for a specific
* version at a predictable path `_delta_log/$version.json`. With managed-commits, delta files may
* alternatively be located at `_delta_log/_commits/$version.$uuid.json`. DeltaCommitFileProvider
* attempts to locate the correct delta files from the Snapshot's LogSegment.
*
* @param logPath The path to the Delta table log directory.
* @param maxVersionInclusive The maximum version of the Delta table (inclusive).
* @param uuids A map of version numbers to their corresponding UUIDs.
*/
case class DeltaCommitFileProvider(
logPath: String,
maxVersionInclusive: Long,
uuids: Map[Long, String]) {
// Ensure the Path object is reused across Delta Files but not stored as part of the object state
// since it is not serializable.
@transient lazy val resolvedPath: Path = new Path(logPath)
lazy val minUnbackfilledVersion: Long =
if (uuids.keys.isEmpty) {
maxVersionInclusive + 1
} else {
uuids.keys.min
}

def deltaFile(version: Long): Path = {
if (version > maxVersion) {
if (version > maxVersionInclusive) {
throw new IllegalStateException(s"Cannot resolve Delta table at version $version as the " +
s"state is currently at version $maxVersionInclusive. The requested version may be incorrect or " +
"the state may be outdated. Please verify the requested version, update the state if " +
@@ -25,10 +25,12 @@ import scala.concurrent.duration._
import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.delta.util.FileNames
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
@@ -45,7 +47,8 @@ class CheckpointsSuite
extends QueryTest
with SharedSparkSession
with DeltaCheckpointTestUtils
with DeltaSQLCommandTest {
with DeltaSQLCommandTest
with ManagedCommitBaseSuite {

def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = {
for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) {
@@ -404,7 +407,10 @@ class CheckpointsSuite
// CDC should exist in the log as seen through getChanges, but it shouldn't be in the
// snapshots and the checkpoint file shouldn't have a CDC column.
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile]))
val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot)
.deltaFile(version = 1)
val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf())
assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile]))
assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
deltaLog.checkpoint()
val checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1)
@@ -458,7 +464,10 @@ class CheckpointsSuite
// CDC should exist in the log as seen through getChanges, but it shouldn't be in the
// snapshots and the checkpoint file shouldn't have a CDC column.
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile]))
val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot)
.deltaFile(version = 1)
val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf())
assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile]))
assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
deltaLog.checkpoint()
var sidecarCheckpointFiles = getV2CheckpointProvider(deltaLog).sidecarFileStatuses
@@ -977,3 +986,15 @@ class FakeGCSFileSystem extends RawLocalFileSystem {
}
}

class ManagedCommitBatch1BackFillCheckpointsSuite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

class ManagedCommitBatch2BackFillCheckpointsSuite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

class ManagedCommitBatch20BackFillCheckpointsSuite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
}

@@ -18,10 +18,11 @@ package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
@@ -32,14 +33,23 @@ import org.apache.spark.sql.test.SharedSparkSession
class DeltaLogMinorCompactionSuite extends QueryTest
with SharedSparkSession
with DeltaSQLCommandTest
with DeltaSQLTestUtils {
with DeltaSQLTestUtils
with ManagedCommitBaseSuite {

/** Helper method to do minor compaction of [[DeltaLog]] from [startVersion, endVersion] */
private def minorCompactDeltaLog(
tablePath: String,
startVersion: Long,
endVersion: Long): Unit = {
val deltaLog = DeltaLog.forTable(spark, tablePath)
deltaLog.update().commitStoreOpt.foreach { commitStore =>
commitStore.backfillToVersion(
deltaLog.store,
deltaLog.newDeltaHadoopConf(),
deltaLog.logPath,
startVersion = 0,
Some(endVersion))
}
val logReplay = new InMemoryLogReplay(
minFileRetentionTimestamp = 0,
minSetTransactionRetentionTimestamp = None)
@@ -65,7 +75,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
numRemoves: Int = 0,
numMetadata: Int = 0): Unit = {
assert(log.update().version === version)
val filePath = FileNames.deltaFile(log.logPath, version)
val filePath = DeltaCommitFileProvider(log.update()).deltaFile(version)
val actions = log.store.read(filePath, log.newDeltaHadoopConf()).map(Action.fromJson)
assert(actions.head.isInstanceOf[CommitInfo])
assert(actions.tail.count(_.isInstanceOf[AddFile]) === numAdds)
@@ -435,3 +445,15 @@ class DeltaLogMinorCompactionSuite extends QueryTest
}
}

class ManagedCommitBatchBackfill1DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

class ManagedCommitBatchBackFill2DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

class ManagedCommitBatchBackFill20DeltaLogMinorCompactionSuite
extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
}
@@ -485,6 +485,12 @@ class OptimisticTransactionSuite
tablePath: Path,
startVersion: Long,
endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}
}
}
@@ -531,6 +537,12 @@ class OptimisticTransactionSuite
tablePath: Path,
startVersion: Long,
endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}
}
}
@@ -45,6 +45,13 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark
logPath: Path,
startVersion: Long,
endVersion: Option[Long] = None): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)

override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}

class TestCommitStore1 extends TestCommitStoreBase
@@ -510,4 +510,23 @@ class ManagedCommitSuite
checkAnswer(sql(s"SELECT * FROM delta.`$tablePath`"), Seq(Row(1), Row(2), Row(3), Row(4)))
}
}

testWithDifferentBackfillInterval("ensure backfilling of commit files works as expected") { _ =>
withTempDir { tempDir =>
val tablePath = tempDir.getAbsolutePath

// Add 10 commits to the table
Seq(1).toDF().write.format("delta").mode("overwrite").save(tablePath)
2 to 10 foreach { i =>
Seq(i).toDF().write.format("delta").mode("append").save(tablePath)
}
val log = DeltaLog.forTable(spark, tablePath)
val snapshot = log.update()
snapshot.ensureCommitFilesBackfilled()

val commitFiles = log.listFrom(0).filter(FileNames.isDeltaFile).map(_.getPath)
val backfilledCommitFiles = (0 to 9).map(version => FileNames.deltaFile(log.logPath, version))
assert(commitFiles.toSeq == backfilledCommitFiles)
}
}
}