From fc9821cbc3a2146d5aabd8cb81d70bb3c89868f9 Mon Sep 17 00:00:00 2001
From: Sumeet Varma
Date: Mon, 1 Apr 2024 16:34:13 -0700
Subject: [PATCH] Backfill commit files before checkpointing with Managed
 Commits

---
 .../apache/spark/sql/delta/Checkpoints.scala  | 19 +++++++++--
 .../sql/delta/OptimisticTransaction.scala     |  7 ++++
 .../org/apache/spark/sql/delta/Snapshot.scala | 32 +++++++++++++++++++
 .../AbstractBatchBackfillingCommitStore.scala | 13 +++++---
 .../sql/delta/managedcommit/CommitStore.scala | 29 +++++++++++------
 .../delta/util/DeltaCommitFileProvider.scala  | 26 +++++++++++++--
 .../spark/sql/delta/CheckpointsSuite.scala    | 27 ++++++++++++++--
 .../delta/DeltaLogMinorCompactionSuite.scala  | 28 ++++++++++++++--
 .../delta/OptimisticTransactionSuite.scala    | 12 +++++++
 .../managedcommit/CommitStoreSuite.scala      |  7 ++++
 .../managedcommit/ManagedCommitSuite.scala    | 19 +++++++++++
 .../ManagedCommitTestUtils.scala              |  9 ++++++
 12 files changed, 204 insertions(+), 24 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index 5f5c7bd9f97..73be0288c7b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.util.DeltaFileOperations
+import org.apache.spark.sql.delta.util.FileNames
 import org.apache.spark.sql.delta.util.FileNames._
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.conf.Configuration
@@ -344,8 +345,22 @@ trait Checkpoints extends DeltaLogging {
     }
   }

-  protected def writeCheckpointFiles(
-      snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
+  protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
+    // With Managed Commits, commit files are not guaranteed to be backfilled immediately into
+    // the _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
+    // writing the checkpoint file to the log directory before backfilling the relevant commits
+    // would leave gaps in the directory structure, which can cause issues for readers that are
+    // not communicating with the CommitStore.
+    //
+    // Sample directory structure with a gap if we don't backfill the commit files first:
+    // _delta_log/
+    //   _commits/
+    //     00017.$uuid.json
+    //     00018.$uuid.json
+    //   00015.json
+    //   00016.json
+    //   00018.checkpoint.parquet
+    snapshotToCheckpoint.ensureCommitFilesBackfilled()
     Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
   }
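
Note: the gap described in the comment above matters because a reader that is not
talking to the CommitStore can only trust the backfilled files it sees when listing
_delta_log. A minimal sketch of such a naive reader (illustrative only, not Delta
code; it assumes backfilled commits are named `$version.json`):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Returns the highest version reachable through contiguous backfilled deltas.
    def latestContiguousVersion(fs: FileSystem, logPath: Path): Long = {
      val deltaVersions = fs.listStatus(logPath)
        .map(_.getPath.getName)
        .collect { case name if name.matches("\\d+\\.json") =>
          name.stripSuffix(".json").toLong
        }
        .sorted
      deltaVersions.foldLeft(-1L)((last, v) => if (v == last + 1) v else last)
    }

For the sample listing above (versions 0 through 16 backfilled, 17 and 18 only in
_commits/), this reader stops at version 16 and can never reconcile with the 00018
checkpoint, which is exactly why the checkpoint writer now backfills first.
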
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index b9bae649dcc..b2c6fb621eb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1918,6 +1918,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
   override def getCommits(
       logPath: Path,
       startVersion: Long,
       endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
+
+  override def backfillToVersion(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      logPath: Path,
+      startVersion: Long,
+      endVersion: Option[Long]): Unit = {}
 }

 /**
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index e863cbd4bfe..f92666223a2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -467,6 +467,38 @@ class Snapshot(
   def redactedPath: String =
     Utils.redact(spark.sessionState.conf.stringRedactionPattern, path.toUri.toString)

+  /**
+   * Ensures that commit files are backfilled up to the current version of the snapshot.
+   *
+   * This method checks whether there are any un-backfilled versions up to the current version
+   * and, if needed, triggers backfilling through the commit store. It then verifies that the
+   * delta file for the current version exists after the backfill.
+   *
+   * @throws IllegalStateException
+   *   if the delta file for the current version is not found after backfilling.
+   */
+  def ensureCommitFilesBackfilled(): Unit = {
+    val commitStore = commitStoreOpt.getOrElse {
+      return
+    }
+    val minUnbackfilledVersion = DeltaCommitFileProvider(this).minUnbackfilledVersion
+    if (minUnbackfilledVersion <= version) {
+      val hadoopConf = deltaLog.newDeltaHadoopConf()
+      commitStore.backfillToVersion(
+        deltaLog.store,
+        hadoopConf,
+        deltaLog.logPath,
+        startVersion = minUnbackfilledVersion,
+        endVersion = Some(version))
+      val fs = deltaLog.logPath.getFileSystem(hadoopConf)
+      val expectedBackfilledDeltaFile = FileNames.deltaFile(deltaLog.logPath, version)
+      if (!fs.exists(expectedBackfilledDeltaFile)) {
+        throw new IllegalStateException("Backfilling of commit files failed. " +
+          s"Expected delta file $expectedBackfilledDeltaFile not found.")
+      }
+    }
+  }
+
   protected def emptyDF: DataFrame =
     spark.createDataFrame(spark.sparkContext.emptyRDD[Row], logSchema)
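
Note: concretely, suppose the snapshot is at version 18 and uuids contains entries
for 17 and 18. Then minUnbackfilledVersion is 17, the guard 17 <= 18 passes, the
commit store backfills the range [17, 18], and the method verifies that the
backfilled file for version 18 exists. A hedged usage sketch (assuming `deltaLog`
points at a managed-commit table):

    val snapshot = deltaLog.update()
    // After this returns, every version <= snapshot.version has a backfilled
    // delta file in _delta_log, so a subsequent checkpoint cannot create a gap.
    snapshot.ensureCommitFilesBackfilled()
    deltaLog.checkpoint()
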
" + + s"Expected delta file $expectedBackfilledDeltaFile not found.") + } + } + } + protected def emptyDF: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], logSchema) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala index 2f0cf9c51c7..a995123f411 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala @@ -107,13 +107,16 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { protected def generateUUID(): String = UUID.randomUUID().toString - /** Backfills all un-backfilled commits */ - protected def backfillToVersion( + override def backfillToVersion( logStore: LogStore, hadoopConf: Configuration, - logPath: Path): Unit = { - getCommits(logPath, startVersion = 0).commits.foreach { commit => - backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus) + logPath: Path, + startVersion: Long = 0, + endVersionOpt: Option[Long] = None): Unit = { + getCommits(logPath, startVersion, endVersionOpt) + .commits + .foreach { commit => + backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala index 93d30428048..cee421eb0da 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala @@ -18,13 +18,11 @@ package org.apache.spark.sql.delta.managedcommit import scala.collection.mutable -import org.apache.spark.sql.delta.{DeltaConfigs, InitialSnapshot, ManagedCommitTableFeature, SerializableFileStatus, SnapshotDescriptor} -import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol} +import org.apache.spark.sql.delta.{DeltaConfigs, ManagedCommitTableFeature, SnapshotDescriptor} +import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol} import org.apache.spark.sql.delta.storage.LogStore import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} - -import org.apache.spark.internal.Logging +import org.apache.hadoop.fs.{FileStatus, Path} /** Representation of a commit file */ case class Commit( @@ -92,9 +90,22 @@ trait CommitStore { * tracked by [[CommitStore]]. */ def getCommits( - logPath: Path, - startVersion: Long, - endVersion: Option[Long] = None): GetCommitsResponse + logPath: Path, + startVersion: Long, + endVersion: Option[Long] = None): GetCommitsResponse + + /** + * API to ask the Commit-Owner to backfill all commits >= 'startVersion' and <= `endVersion`. + * + * If this API returns successfully, that means the backfill must have been completed, although + * the Commit-Owner may not be aware of it yet. 
+ */ + def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + startVersion: Long, + endVersion: Option[Long]): Unit } /** A builder interface for CommitStore */ @@ -146,7 +157,7 @@ object CommitStoreProvider { nameToBuilderMapping.retain((k, _) => initialCommitStoreBuilderNames.contains(k)) } - val initialCommitStoreBuilders = Seq[CommitStoreBuilder]( + private val initialCommitStoreBuilders = Seq[CommitStoreBuilder]( // Any new commit-store builder will be registered here. ) initialCommitStoreBuilders.foreach(registerBuilder) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala index 11a373f9938..4fc9e6ea9c9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala @@ -20,13 +20,35 @@ import org.apache.spark.sql.delta.Snapshot import org.apache.spark.sql.delta.util.FileNames._ import org.apache.hadoop.fs.Path -case class DeltaCommitFileProvider(logPath: String, maxVersion: Long, uuids: Map[Long, String]) { +/** + * Provides access to resolve Delta commit files names based on the commit-version. + * + * This class is part of the changes introduced to accommodate the adoption of managed-commits in + * Delta Lake. Previously, certain code paths assumed the existence of delta files for a specific + * version at a predictable path `_delta_log/$version.json`. With managed-commits, delta files may + * alternatively be located at `_delta_log/_commits/$version.$uuid.json`. DeltaCommitFileProvider + * attempts to locate the correct delta files from the Snapshot's LogSegment. + * + * @param logPath The path to the Delta table log directory. + * @param maxVersionInclusive The maximum version of the Delta table (inclusive). + * @param uuids A map of version numbers to their corresponding UUIDs. + */ +case class DeltaCommitFileProvider( + logPath: String, + maxVersionInclusive: Long, + uuids: Map[Long, String]) { // Ensure the Path object is reused across Delta Files but not stored as part of the object state // since it is not serializable. @transient lazy val resolvedPath: Path = new Path(logPath) + lazy val minUnbackfilledVersion: Long = + if (uuids.keys.isEmpty) { + maxVersionInclusive + 1 + } else { + uuids.keys.min + } def deltaFile(version: Long): Path = { - if (version > maxVersion) { + if (version > maxVersionInclusive) { throw new IllegalStateException("Cannot resolve Delta table at version $version as the " + "state is currently at version $maxVersion. The requested version may be incorrect or " + "the state may be outdated. 
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index 4c0b02404d6..52ca5f7c9c9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -25,10 +25,12 @@ import scala.concurrent.duration._
 import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord}
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite
+import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LocalLogStore
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
+import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
 import org.apache.spark.sql.delta.util.FileNames
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
@@ -45,7 +47,8 @@ class CheckpointsSuite
   extends QueryTest
   with SharedSparkSession
   with DeltaCheckpointTestUtils
-  with DeltaSQLCommandTest {
+  with DeltaSQLCommandTest
+  with ManagedCommitBaseSuite {

   def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = {
     for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) {
@@ -404,7 +407,10 @@ class CheckpointsSuite
-      // CDC should exist in the log as seen through getChanges, but it shouldn't be in the
-      // snapshots and the checkpoint file shouldn't have a CDC column.
+      // CDC should exist in the commit file for version 1, but it shouldn't be in the
+      // snapshots and the checkpoint file shouldn't have a CDC column.
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
-      assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile]))
+      val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot)
+        .deltaFile(version = 1)
+      val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf())
+      assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile]))
       assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
       deltaLog.checkpoint()
       val checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1)
@@ -458,7 +464,10 @@ class CheckpointsSuite
-      // CDC should exist in the log as seen through getChanges, but it shouldn't be in the
-      // snapshots and the checkpoint file shouldn't have a CDC column.
+      // CDC should exist in the commit file for version 1, but it shouldn't be in the
+      // snapshots and the checkpoint file shouldn't have a CDC column.
       val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
-      assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile]))
+      val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot)
+        .deltaFile(version = 1)
+      val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf())
+      assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile]))
       assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null })
       deltaLog.checkpoint()
       var sidecarCheckpointFiles = getV2CheckpointProvider(deltaLog).sidecarFileStatuses
@@ -977,3 +986,15 @@ class FakeGCSFileSystem extends RawLocalFileSystem {
   }
 }

+class ManagedCommitBatch1BackfillCheckpointsSuite extends CheckpointsSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class ManagedCommitBatch2BackfillCheckpointsSuite extends CheckpointsSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
+}
+
+class ManagedCommitBatch20BackfillCheckpointsSuite extends CheckpointsSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
+}
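
Note: both test changes above replace getChanges with the same pattern: resolve
the commit file through DeltaCommitFileProvider (with managed commits,
_delta_log/00001.json may not have been backfilled yet) and parse its actions. As
a reusable sketch (an illustrative helper, not part of the patch):

    def actionsForVersion(deltaLog: DeltaLog, version: Long): Seq[Action] = {
      val provider = DeltaCommitFileProvider(deltaLog.update())
      deltaLog.store
        .read(provider.deltaFile(version), deltaLog.newDeltaHadoopConf())
        .map(Action.fromJson)
    }

    // e.g. assert(actionsForVersion(deltaLog, 1).exists(_.isInstanceOf[AddCDCFile]))
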
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
index 548905b1ff5..5a341445ceb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
@@ -18,10 +18,11 @@ package org.apache.spark.sql.delta

 import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
 import org.apache.spark.sql.delta.actions._
+import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
 import org.apache.hadoop.fs.Path

 import org.apache.spark.sql._
@@ -32,7 +33,8 @@ import org.apache.spark.sql.test.SharedSparkSession
 class DeltaLogMinorCompactionSuite extends QueryTest
   with SharedSparkSession
   with DeltaSQLCommandTest
-  with DeltaSQLTestUtils {
+  with DeltaSQLTestUtils
+  with ManagedCommitBaseSuite {

   /** Helper method to do minor compaction of [[DeltaLog]] from [startVersion, endVersion] */
   private def minorCompactDeltaLog(
@@ -40,6 +42,14 @@ class DeltaLogMinorCompactionSuite extends QueryTest
       startVersion: Long,
       endVersion: Long): Unit = {
     val deltaLog = DeltaLog.forTable(spark, tablePath)
+    deltaLog.update().commitStoreOpt.foreach { commitStore =>
+      commitStore.backfillToVersion(
+        deltaLog.store,
+        deltaLog.newDeltaHadoopConf(),
+        deltaLog.logPath,
+        startVersion = 0,
+        endVersion = Some(endVersion))
+    }
     val logReplay = new InMemoryLogReplay(
       minFileRetentionTimestamp = 0,
       minSetTransactionRetentionTimestamp = None)
@@ -65,7 +75,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
       numRemoves: Int = 0,
       numMetadata: Int = 0): Unit = {
     assert(log.update().version === version)
-    val filePath = FileNames.deltaFile(log.logPath, version)
+    val filePath = DeltaCommitFileProvider(log.update()).deltaFile(version)
     val actions = log.store.read(filePath, log.newDeltaHadoopConf()).map(Action.fromJson)
     assert(actions.head.isInstanceOf[CommitInfo])
     assert(actions.tail.count(_.isInstanceOf[AddFile]) === numAdds)
@@ -435,3 +445,15 @@ class DeltaLogMinorCompactionSuite extends QueryTest
   }
 }

+class ManagedCommitBatchBackfill1DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class ManagedCommitBatchBackfill2DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
+}
+
+class ManagedCommitBatchBackfill20DeltaLogMinorCompactionSuite
  extends DeltaLogMinorCompactionSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
+}
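
Note: ManagedCommitBaseSuite itself is not shown in this patch. Presumably it
looks roughly like the sketch below; the trait body here is an assumption for
illustration, not the actual test utility:

    // Assumed shape: None leaves managed commits disabled, so the base suite runs
    // unchanged; Some(n) routes commits through a store that backfills every n-th
    // commit, so the same tests run against partially backfilled logs.
    trait ManagedCommitBaseSuite extends SharedSparkSession {
      val managedCommitBackfillBatchSize: Option[Int] = None
      def managedCommitsEnabledInTests: Boolean = managedCommitBackfillBatchSize.nonEmpty
    }
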
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
index 4ad0c521b29..8e4c8910f6c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
@@ -478,6 +478,12 @@ class OptimisticTransactionSuite
         tablePath: Path,
         startVersion: Long,
         endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
+      override def backfillToVersion(
+          logStore: LogStore,
+          hadoopConf: Configuration,
+          logPath: Path,
+          startVersion: Long,
+          endVersion: Option[Long]): Unit = {}
     }
   }
 }
@@ -524,6 +530,12 @@ class OptimisticTransactionSuite
         tablePath: Path,
         startVersion: Long,
         endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
+      override def backfillToVersion(
+          logStore: LogStore,
+          hadoopConf: Configuration,
+          logPath: Path,
+          startVersion: Long,
+          endVersion: Option[Long]): Unit = {}
     }
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala
index e5b8aba6b97..37ae4a75e0d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala
@@ -45,6 +45,13 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark
       logPath: Path,
       startVersion: Long,
       endVersion: Option[Long] = None): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)
+
+    override def backfillToVersion(
+        logStore: LogStore,
+        hadoopConf: Configuration,
+        logPath: Path,
+        startVersion: Long,
+        endVersion: Option[Long]): Unit = {}
   }

   class TestCommitStore1 extends TestCommitStoreBase
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala
index b9ecf052011..65134a41b0c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala
@@ -510,4 +510,23 @@ class ManagedCommitSuite
       checkAnswer(sql(s"SELECT * FROM delta.`$tablePath`"), Seq(Row(1), Row(2), Row(3), Row(4)))
     }
   }
+
+  testWithDifferentBackfillInterval("ensure backfilling commit files works as expected") { _ =>
+    withTempDir { tempDir =>
+      val tablePath = tempDir.getAbsolutePath
+
+      // Add 10 commits (versions 0 through 9) to the table.
+      Seq(1).toDF().write.format("delta").mode("overwrite").save(tablePath)
+      (2 to 10).foreach { i =>
+        Seq(i).toDF().write.format("delta").mode("append").save(tablePath)
+      }
+      val log = DeltaLog.forTable(spark, tablePath)
+      val snapshot = log.update()
+      snapshot.ensureCommitFilesBackfilled()
+
+      val commitFiles = log.listFrom(0).filter(FileNames.isDeltaFile).map(_.getPath)
+      val backfilledCommitFiles = (0 to 9).map(version => FileNames.deltaFile(log.logPath, version))
+      assert(commitFiles.toSeq == backfilledCommitFiles)
+    }
+  }
 }
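
Note: whichever backfill interval testWithDifferentBackfillInterval runs with, the
post-condition checked above is the same: after ensureCommitFilesBackfilled, the
backfilled log must be gap-free. The same invariant as a standalone helper
(illustrative, not part of the patch):

    def assertNoLogGaps(log: DeltaLog, maxVersion: Long): Unit = {
      val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
      (0L to maxVersion).map(v => FileNames.deltaFile(log.logPath, v)).foreach { path =>
        assert(fs.exists(path), s"missing backfilled commit file: $path")
      }
    }
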
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala
index b99d2cdc3ca..c109b17b449 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala
@@ -121,6 +121,15 @@ class TrackingCommitStore(delegatingCommitStore: InMemoryCommitStore) extends Co
     delegatingCommitStore.getCommits(logPath, startVersion, endVersion)
   }

+  override def backfillToVersion(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      logPath: Path,
+      startVersion: Long,
+      endVersion: Option[Long]): Unit = {
+    delegatingCommitStore.backfillToVersion(
+      logStore, hadoopConf, logPath, startVersion, endVersion)
+  }
+
   def registerTable(
       logPath: Path,
       maxCommitVersion: Long): Unit = {