Fix bug
sumeet-db committed Apr 1, 2024
1 parent 537ed8e commit fc9821c
Showing 12 changed files with 204 additions and 24 deletions.
19 changes: 17 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.conf.Configuration
@@ -344,8 +345,22 @@ trait Checkpoints extends DeltaLogging
}
}

protected def writeCheckpointFiles(
snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
// With Managed-Commits, commit files are not guaranteed to be backfilled immediately in the
// _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
// writing the checkpoint file in the log directory before backfilling the relevant commits
// will leave gaps in the dir structure. This can cause issues for readers that are not
// communicating with the CommitStore.
//
// Sample directory structure with a gap if we don't backfill commit files:
// _delta_log/
// _commits/
// 00017.$uuid.json
// 00018.$uuid.json
// 00015.json
// 00016.json
// 00018.checkpoint.parquet
snapshotToCheckpoint.ensureCommitFilesBackfilled()
Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
}
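
To make the gap concrete, here is a small illustrative sketch (not part of this commit) of what a reader that only lists _delta_log, without talking to the CommitStore, would see if the checkpoint above were written before backfilling:

// Illustrative sketch only; file names mirror the sample directory structure above.
// A commit-store-unaware reader lists _delta_log and observes:
val visibleLogFiles = Seq("00015.json", "00016.json", "00018.checkpoint.parquet")
// Versions 17 and 18 live only under _delta_log/_commits/, so the listing has a gap
// and no contiguous sequence of commits ending at the checkpoint version can be
// assembled. ensureCommitFilesBackfilled() closes the gap by materializing
// 00017.json and 00018.json before the checkpoint is written.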

@@ -1918,6 +1918,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
override def getCommits(
logPath: Path, startVersion: Long, endVersion: Option[Long]): GetCommitsResponse =
GetCommitsResponse(Seq.empty, -1)

override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}

/**
32 changes: 32 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -467,6 +467,38 @@ class Snapshot(
def redactedPath: String =
Utils.redact(spark.sessionState.conf.stringRedactionPattern, path.toUri.toString)

/**
* Ensures that commit files are backfilled up to the current version in the snapshot.
*
* This method checks if there are any un-backfilled versions up to the current version and
* triggers the backfilling process using the commit store. It verifies that the delta file for
* the current version exists after the backfilling process.
*
* @throws IllegalStateException
* if the delta file for the current version is not found after backfilling.
*/
def ensureCommitFilesBackfilled(): Unit = {
val commitStore = commitStoreOpt.getOrElse {
return
}
val minUnbackfilledVersion = DeltaCommitFileProvider(this).minUnbackfilledVersion
if (minUnbackfilledVersion <= version) {
val hadoopConf = deltaLog.newDeltaHadoopConf()
commitStore.backfillToVersion(
deltaLog.store,
hadoopConf,
deltaLog.logPath,
startVersion = minUnbackfilledVersion,
endVersion = Some(version))
val fs = deltaLog.logPath.getFileSystem(hadoopConf)
val expectedBackfilledDeltaFile = FileNames.deltaFile(deltaLog.logPath, version)
if (!fs.exists(expectedBackfilledDeltaFile)) {
throw new IllegalStateException("Backfilling of commit files failed. " +
s"Expected delta file $expectedBackfilledDeltaFile not found.")
}
}
}
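
A minimal usage sketch (assumed, not part of this diff), where `deltaLog` stands for an existing DeltaLog handle:

// Hedged usage sketch: force backfill before code paths that list _delta_log directly.
val snapshot = deltaLog.update()           // latest Snapshot of the table
snapshot.ensureCommitFilesBackfilled()     // no-op when commitStoreOpt is empty
// Afterwards, _delta_log/<version>.json exists for every version <= snapshot.version,
// so path-based readers will not observe gaps in the commit sequence.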


protected def emptyDF: DataFrame =
spark.createDataFrame(spark.sparkContext.emptyRDD[Row], logSchema)
@@ -107,13 +107,16 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {

protected def generateUUID(): String = UUID.randomUUID().toString

/** Backfills all un-backfilled commits */
protected def backfillToVersion(
override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path): Unit = {
getCommits(logPath, startVersion = 0).commits.foreach { commit =>
backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus)
logPath: Path,
startVersion: Long = 0,
endVersionOpt: Option[Long] = None): Unit = {
getCommits(logPath, startVersion, endVersionOpt)
.commits
.foreach { commit =>
backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus)
}
}
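
The per-commit `backfill` helper invoked above is outside this hunk; the following is a rough conceptual sketch of what it does, assuming the LogStore read/write signatures used elsewhere in Delta (the actual implementation may differ):

// Conceptual sketch only -- not the code from this commit.
protected def backfill(
    logStore: LogStore,
    hadoopConf: Configuration,
    logPath: Path,
    version: Long,
    fileStatus: FileStatus): Unit = {
  // Copy _delta_log/_commits/<version>.<uuid>.json into its canonical backfilled
  // location _delta_log/<version>.json without overwriting an existing file.
  val target = FileNames.deltaFile(logPath, version)
  val actions = logStore.read(fileStatus.getPath, hadoopConf)
  logStore.write(target, actions.iterator, overwrite = false, hadoopConf)
}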

@@ -18,13 +18,11 @@ package org.apache.spark.sql.delta.managedcommit

import scala.collection.mutable

import org.apache.spark.sql.delta.{DeltaConfigs, InitialSnapshot, ManagedCommitTableFeature, SerializableFileStatus, SnapshotDescriptor}
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.{DeltaConfigs, ManagedCommitTableFeature, SnapshotDescriptor}
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.hadoop.fs.{FileStatus, Path}

/** Representation of a commit file */
case class Commit(
@@ -92,9 +90,22 @@ trait CommitStore {
* tracked by [[CommitStore]].
*/
def getCommits(
logPath: Path,
startVersion: Long,
endVersion: Option[Long] = None): GetCommitsResponse
logPath: Path,
startVersion: Long,
endVersion: Option[Long] = None): GetCommitsResponse

/**
* API to ask the Commit-Owner to backfill all commits >= `startVersion` and <= `endVersion`.
*
* If this API returns successfully, that means the backfill must have been completed, although
* the Commit-Owner may not be aware of it yet.
*/
def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit
}

/** A builder interface for CommitStore */
@@ -146,7 +157,7 @@ object CommitStoreProvider {
nameToBuilderMapping.retain((k, _) => initialCommitStoreBuilderNames.contains(k))
}

val initialCommitStoreBuilders = Seq[CommitStoreBuilder](
private val initialCommitStoreBuilders = Seq[CommitStoreBuilder](
// Any new commit-store builder will be registered here.
)
initialCommitStoreBuilders.foreach(registerBuilder)
@@ -20,13 +20,35 @@ import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.hadoop.fs.Path

case class DeltaCommitFileProvider(logPath: String, maxVersion: Long, uuids: Map[Long, String]) {
/**
* Provides access to resolve Delta commit file names based on the commit version.
*
* This class is part of the changes introduced to accommodate the adoption of managed-commits in
* Delta Lake. Previously, certain code paths assumed the existence of delta files for a specific
* version at a predictable path `_delta_log/$version.json`. With managed-commits, delta files may
* alternatively be located at `_delta_log/_commits/$version.$uuid.json`. DeltaCommitFileProvider
* attempts to locate the correct delta files from the Snapshot's LogSegment.
*
* @param logPath The path to the Delta table log directory.
* @param maxVersionInclusive The maximum version of the Delta table (inclusive).
* @param uuids A map of version numbers to their corresponding UUIDs.
*/
case class DeltaCommitFileProvider(
logPath: String,
maxVersionInclusive: Long,
uuids: Map[Long, String]) {
// Ensure the Path object is reused across Delta Files but not stored as part of the object state
// since it is not serializable.
@transient lazy val resolvedPath: Path = new Path(logPath)
lazy val minUnbackfilledVersion: Long =
if (uuids.keys.isEmpty) {
maxVersionInclusive + 1
} else {
uuids.keys.min
}

def deltaFile(version: Long): Path = {
if (version > maxVersion) {
if (version > maxVersionInclusive) {
throw new IllegalStateException(s"Cannot resolve Delta table at version $version as the " +
s"state is currently at version $maxVersionInclusive. The requested version may be incorrect or " +
"the state may be outdated. Please verify the requested version, update the state if " +
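
A short usage sketch (assumed, not from this commit) of how the provider is meant to be used; `deltaLog` is an existing DeltaLog handle and the zero-padded file names follow the usual _delta_log convention:

// Hedged usage sketch built from the APIs shown in this diff.
val provider = DeltaCommitFileProvider(deltaLog.update())
val commitPath = provider.deltaFile(version = 18)
// Resolves to _delta_log/...00018.json once version 18 is backfilled, otherwise to
// _delta_log/_commits/...00018.<uuid>.json as recorded in the Snapshot's LogSegment.
val firstPending = provider.minUnbackfilledVersion
// Equals maxVersionInclusive + 1 when every commit has already been backfilled.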
@@ -25,10 +25,12 @@ import scala.concurrent.duration._
import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.delta.util.FileNames
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
@@ -45,7 +47,8 @@ class CheckpointsSuite
extends QueryTest
with SharedSparkSession
with DeltaCheckpointTestUtils
with DeltaSQLCommandTest {
with DeltaSQLCommandTest
with ManagedCommitBaseSuite {

def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = {
for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) {
@@ -404,7 +407,10 @@ class CheckpointsSuite
// CDC should exist in the log as seen through getChanges, but it shouldn't be in the
// snapshots and the checkpoint file shouldn't have a CDC column.
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile]))
val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot)
.deltaFile(version = 1)
val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf())
assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile]))
assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
deltaLog.checkpoint()
val checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1)
@@ -458,7 +464,10 @@ class CheckpointsSuite
// CDC should exist in the log as seen through getChanges, but it shouldn't be in the
// snapshots and the checkpoint file shouldn't have a CDC column.
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile]))
val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot)
.deltaFile(version = 1)
val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf())
assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile]))
assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
deltaLog.checkpoint()
var sidecarCheckpointFiles = getV2CheckpointProvider(deltaLog).sidecarFileStatuses
@@ -977,3 +986,15 @@ class FakeGCSFileSystem extends RawLocalFileSystem {
}
}

class ManagedCommitBatch1BackFillCheckpointsSuite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

class ManagedCommitBatch2BackFillCheckpointsSuite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

class ManagedCommitBatch20BackFillCheckpointsSuite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
}

@@ -18,10 +18,11 @@ package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
@@ -32,14 +33,23 @@ import org.apache.spark.sql.test.SharedSparkSession
class DeltaLogMinorCompactionSuite extends QueryTest
with SharedSparkSession
with DeltaSQLCommandTest
with DeltaSQLTestUtils {
with DeltaSQLTestUtils
with ManagedCommitBaseSuite {

/** Helper method to do minor compaction of [[DeltaLog]] from [startVersion, endVersion] */
private def minorCompactDeltaLog(
tablePath: String,
startVersion: Long,
endVersion: Long): Unit = {
val deltaLog = DeltaLog.forTable(spark, tablePath)
deltaLog.update().commitStoreOpt.foreach { commitStore =>
commitStore.backfillToVersion(
deltaLog.store,
deltaLog.newDeltaHadoopConf(),
deltaLog.logPath,
startVersion = 0,
Some(endVersion))
}
val logReplay = new InMemoryLogReplay(
minFileRetentionTimestamp = 0,
minSetTransactionRetentionTimestamp = None)
@@ -65,7 +75,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
numRemoves: Int = 0,
numMetadata: Int = 0): Unit = {
assert(log.update().version === version)
val filePath = FileNames.deltaFile(log.logPath, version)
val filePath = DeltaCommitFileProvider(log.update()).deltaFile(version)
val actions = log.store.read(filePath, log.newDeltaHadoopConf()).map(Action.fromJson)
assert(actions.head.isInstanceOf[CommitInfo])
assert(actions.tail.count(_.isInstanceOf[AddFile]) === numAdds)
@@ -435,3 +445,15 @@ class DeltaLogMinorCompactionSuite extends QueryTest
}
}

class ManagedCommitBatchBackfill1DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

class ManagedCommitBatchBackFill2DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

class ManagedCommitBatchBackFill20DeltaLogMinorCompactionSuite
extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
}
@@ -478,6 +478,12 @@ class OptimisticTransactionSuite
tablePath: Path,
startVersion: Long,
endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}
}
}
@@ -524,6 +530,12 @@ class OptimisticTransactionSuite
tablePath: Path,
startVersion: Long,
endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}
}
}
@@ -45,6 +45,13 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark
logPath: Path,
startVersion: Long,
endVersion: Option[Long] = None): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)

override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
}

class TestCommitStore1 extends TestCommitStoreBase
@@ -510,4 +510,23 @@ class ManagedCommitSuite
checkAnswer(sql(s"SELECT * FROM delta.`$tablePath`"), Seq(Row(1), Row(2), Row(3), Row(4)))
}
}

testWithDifferentBackfillInterval("ensure backfilling commit files works as expected") { _ =>
withTempDir { tempDir =>
val tablePath = tempDir.getAbsolutePath

// Add 10 commits to the table
Seq(1).toDF().write.format("delta").mode("overwrite").save(tablePath)
2 to 10 foreach { i =>
Seq(i).toDF().write.format("delta").mode("append").save(tablePath)
}
val log = DeltaLog.forTable(spark, tablePath)
val snapshot = log.update()
snapshot.ensureCommitFilesBackfilled()

val commitFiles = log.listFrom(0).filter(FileNames.isDeltaFile).map(_.getPath)
val backfilledCommitFiles = (0 to 9).map(version => FileNames.deltaFile(log.logPath, version))
assert(commitFiles.toSeq == backfilledCommitFiles)
}
}
}
