From c88db07a60e7ed6987dc9c956989ceaaedfe8458 Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Tue, 2 Apr 2024 11:51:52 -0700 Subject: [PATCH] [Spark] Backfill commit files before checkpointing or minor compaction in managed-commits (#2823) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description With managed-commit, commit files are not guaranteed to be present in the _delta_log directory at the time of checkpointing or minor compactions. While it is possible to compute a checkpoint file without backfilling, writing the checkpoint file in the log directory before backfilling the relevant commits will leave gaps in the directory structure. This can cause issues for readers that are not communicating with the CommitStore. To address this problem, we now backfill commit files up to the committedVersion before performing a checkpoint or minor compaction operation ## How was this patch tested? UTs ## Does this PR introduce _any_ user-facing changes? No --- .../apache/spark/sql/delta/Checkpoints.scala | 19 +++++++++-- .../sql/delta/OptimisticTransaction.scala | 7 ++++ .../org/apache/spark/sql/delta/Snapshot.scala | 32 +++++++++++++++++++ .../AbstractBatchBackfillingCommitStore.scala | 13 +++++--- .../sql/delta/managedcommit/CommitStore.scala | 29 +++++++++++------ .../delta/util/DeltaCommitFileProvider.scala | 26 +++++++++++++-- .../spark/sql/delta/CheckpointsSuite.scala | 27 ++++++++++++++-- .../delta/DeltaLogMinorCompactionSuite.scala | 28 ++++++++++++++-- .../delta/OptimisticTransactionSuite.scala | 12 +++++++ .../managedcommit/CommitStoreSuite.scala | 7 ++++ .../managedcommit/ManagedCommitSuite.scala | 19 +++++++++++ .../ManagedCommitTestUtils.scala | 9 ++++++ 12 files changed, 204 insertions(+), 24 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala index 5f5c7bd9f97..73be0288c7b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.delta.util.DeltaFileOperations +import org.apache.spark.sql.delta.util.FileNames import org.apache.spark.sql.delta.util.FileNames._ import org.apache.spark.sql.delta.util.JsonUtils import org.apache.hadoop.conf.Configuration @@ -344,8 +345,22 @@ trait Checkpoints extends DeltaLogging { } } - protected def writeCheckpointFiles( - snapshotToCheckpoint: Snapshot): LastCheckpointInfo = { + protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): LastCheckpointInfo = { + // With Managed-Commits, commit files are not guaranteed to be backfilled immediately in the + // _delta_log dir. While it is possible to compute a checkpoint file without backfilling, + // writing the checkpoint file in the log directory before backfilling the relevant commits + // will leave gaps in the dir structure. This can cause issues for readers that are not + // communicating with the CommitStore. 
+ // + // Sample directory structure with a gap if we don't backfill commit files: + // _delta_log/ + // _commits/ + // 00017.$uuid.json + // 00018.$uuid.json + // 00015.json + // 00016.json + // 00018.checkpoint.parquet + snapshotToCheckpoint.ensureCommitFilesBackfilled() Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 0126f817e77..7211af71e34 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1929,6 +1929,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite override def getCommits( logPath: Path, startVersion: Long, endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) + + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + startVersion: Long, + endVersion: Option[Long]): Unit = {} } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index e863cbd4bfe..f92666223a2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -467,6 +467,38 @@ class Snapshot( def redactedPath: String = Utils.redact(spark.sessionState.conf.stringRedactionPattern, path.toUri.toString) + /** + * Ensures that commit files are backfilled up to the current version in the snapshot. + * + * This method checks if there are any un-backfilled versions up to the current version and + * triggers the backfilling process using the commit store. It verifies that the delta file for + * the current version exists after the backfilling process. + * + * @throws IllegalStateException + * if the delta file for the current version is not found after backfilling. + */ + def ensureCommitFilesBackfilled(): Unit = { + val commitStore = commitStoreOpt.getOrElse { + return + } + val minUnbackfilledVersion = DeltaCommitFileProvider(this).minUnbackfilledVersion + if (minUnbackfilledVersion <= version) { + val hadoopConf = deltaLog.newDeltaHadoopConf() + commitStore.backfillToVersion( + deltaLog.store, + hadoopConf, + deltaLog.logPath, + startVersion = minUnbackfilledVersion, + endVersion = Some(version)) + val fs = deltaLog.logPath.getFileSystem(hadoopConf) + val expectedBackfilledDeltaFile = FileNames.deltaFile(deltaLog.logPath, version) + if (!fs.exists(expectedBackfilledDeltaFile)) { + throw new IllegalStateException("Backfilling of commit files failed. 
" + + s"Expected delta file $expectedBackfilledDeltaFile not found.") + } + } + } + protected def emptyDF: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], logSchema) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala index 2f0cf9c51c7..a995123f411 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala @@ -107,13 +107,16 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { protected def generateUUID(): String = UUID.randomUUID().toString - /** Backfills all un-backfilled commits */ - protected def backfillToVersion( + override def backfillToVersion( logStore: LogStore, hadoopConf: Configuration, - logPath: Path): Unit = { - getCommits(logPath, startVersion = 0).commits.foreach { commit => - backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus) + logPath: Path, + startVersion: Long = 0, + endVersionOpt: Option[Long] = None): Unit = { + getCommits(logPath, startVersion, endVersionOpt) + .commits + .foreach { commit => + backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala index 287c48f6a26..41920688bf0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala @@ -18,13 +18,11 @@ package org.apache.spark.sql.delta.managedcommit import scala.collection.mutable -import org.apache.spark.sql.delta.{DeltaConfigs, InitialSnapshot, ManagedCommitTableFeature, SerializableFileStatus, SnapshotDescriptor} -import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol} +import org.apache.spark.sql.delta.{DeltaConfigs, ManagedCommitTableFeature, SnapshotDescriptor} +import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol} import org.apache.spark.sql.delta.storage.LogStore import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} - -import org.apache.spark.internal.Logging +import org.apache.hadoop.fs.{FileStatus, Path} /** Representation of a commit file */ case class Commit( @@ -92,9 +90,22 @@ trait CommitStore { * tracked by [[CommitStore]]. */ def getCommits( - logPath: Path, - startVersion: Long, - endVersion: Option[Long] = None): GetCommitsResponse + logPath: Path, + startVersion: Long, + endVersion: Option[Long] = None): GetCommitsResponse + + /** + * API to ask the Commit-Owner to backfill all commits >= 'startVersion' and <= `endVersion`. + * + * If this API returns successfully, that means the backfill must have been completed, although + * the Commit-Owner may not be aware of it yet. 
+ */ + def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + startVersion: Long, + endVersion: Option[Long]): Unit } /** A builder interface for CommitStore */ @@ -146,7 +157,7 @@ object CommitStoreProvider { nameToBuilderMapping.retain((k, _) => initialCommitStoreBuilderNames.contains(k)) } - val initialCommitStoreBuilders = Seq[CommitStoreBuilder]( + private val initialCommitStoreBuilders = Seq[CommitStoreBuilder]( // Any new commit-store builder will be registered here. ) initialCommitStoreBuilders.foreach(registerBuilder) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala index 11a373f9938..4fc9e6ea9c9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala @@ -20,13 +20,35 @@ import org.apache.spark.sql.delta.Snapshot import org.apache.spark.sql.delta.util.FileNames._ import org.apache.hadoop.fs.Path -case class DeltaCommitFileProvider(logPath: String, maxVersion: Long, uuids: Map[Long, String]) { +/** + * Provides access to resolve Delta commit files names based on the commit-version. + * + * This class is part of the changes introduced to accommodate the adoption of managed-commits in + * Delta Lake. Previously, certain code paths assumed the existence of delta files for a specific + * version at a predictable path `_delta_log/$version.json`. With managed-commits, delta files may + * alternatively be located at `_delta_log/_commits/$version.$uuid.json`. DeltaCommitFileProvider + * attempts to locate the correct delta files from the Snapshot's LogSegment. + * + * @param logPath The path to the Delta table log directory. + * @param maxVersionInclusive The maximum version of the Delta table (inclusive). + * @param uuids A map of version numbers to their corresponding UUIDs. + */ +case class DeltaCommitFileProvider( + logPath: String, + maxVersionInclusive: Long, + uuids: Map[Long, String]) { // Ensure the Path object is reused across Delta Files but not stored as part of the object state // since it is not serializable. @transient lazy val resolvedPath: Path = new Path(logPath) + lazy val minUnbackfilledVersion: Long = + if (uuids.keys.isEmpty) { + maxVersionInclusive + 1 + } else { + uuids.keys.min + } def deltaFile(version: Long): Path = { - if (version > maxVersion) { + if (version > maxVersionInclusive) { throw new IllegalStateException("Cannot resolve Delta table at version $version as the " + "state is currently at version $maxVersion. The requested version may be incorrect or " + "the state may be outdated. 
Please verify the requested version, update the state if " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala index 4c0b02404d6..52ca5f7c9c9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala @@ -25,10 +25,12 @@ import scala.concurrent.duration._ import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord} import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite +import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.storage.LocalLogStore import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.delta.util.DeltaCommitFileProvider import org.apache.spark.sql.delta.util.FileNames import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration @@ -45,7 +47,8 @@ class CheckpointsSuite extends QueryTest with SharedSparkSession with DeltaCheckpointTestUtils - with DeltaSQLCommandTest { + with DeltaSQLCommandTest + with ManagedCommitBaseSuite { def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = { for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) { @@ -404,7 +407,10 @@ class CheckpointsSuite // CDC should exist in the log as seen through getChanges, but it shouldn't be in the // snapshots and the checkpoint file shouldn't have a CDC column. val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) - assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile])) + val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot) + .deltaFile(version = 1) + val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf()) + assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile])) assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null }) deltaLog.checkpoint() val checkpointFile = FileNames.checkpointFileSingular(deltaLog.logPath, 1) @@ -458,7 +464,10 @@ class CheckpointsSuite // CDC should exist in the log as seen through getChanges, but it shouldn't be in the // snapshots and the checkpoint file shouldn't have a CDC column. 
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) - assert(deltaLog.getChanges(1).next()._2.exists(_.isInstanceOf[AddCDCFile])) + val deltaPath = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot) + .deltaFile(version = 1) + val deltaFileContent = deltaLog.store.read(deltaPath, deltaLog.newDeltaHadoopConf()) + assert(deltaFileContent.map(Action.fromJson).exists(_.isInstanceOf[AddCDCFile])) assert(deltaLog.snapshot.stateDS.collect().forall { sa => sa.cdc == null }) deltaLog.checkpoint() var sidecarCheckpointFiles = getV2CheckpointProvider(deltaLog).sidecarFileStatuses @@ -977,3 +986,15 @@ class FakeGCSFileSystem extends RawLocalFileSystem { } } +class ManagedCommitBatch1BackFillCheckpointsSuite extends CheckpointsSuite { + override val managedCommitBackfillBatchSize: Option[Int] = Some(1) +} + +class ManagedCommitBatch2BackFillCheckpointsSuite extends CheckpointsSuite { + override val managedCommitBackfillBatchSize: Option[Int] = Some(2) +} + +class ManagedCommitBatch20BackFillCheckpointsSuite extends CheckpointsSuite { + override val managedCommitBackfillBatchSize: Option[Int] = Some(20) +} + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala index 548905b1ff5..5a341445ceb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala @@ -18,10 +18,11 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate import org.apache.spark.sql.delta.actions._ +import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils -import org.apache.spark.sql.delta.util.FileNames +import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames} import org.apache.hadoop.fs.Path import org.apache.spark.sql._ @@ -32,7 +33,8 @@ import org.apache.spark.sql.test.SharedSparkSession class DeltaLogMinorCompactionSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest - with DeltaSQLTestUtils { + with DeltaSQLTestUtils + with ManagedCommitBaseSuite { /** Helper method to do minor compaction of [[DeltaLog]] from [startVersion, endVersion] */ private def minorCompactDeltaLog( @@ -40,6 +42,14 @@ class DeltaLogMinorCompactionSuite extends QueryTest startVersion: Long, endVersion: Long): Unit = { val deltaLog = DeltaLog.forTable(spark, tablePath) + deltaLog.update().commitStoreOpt.foreach { commitStore => + commitStore.backfillToVersion( + deltaLog.store, + deltaLog.newDeltaHadoopConf(), + deltaLog.logPath, + startVersion = 0, + Some(endVersion)) + } val logReplay = new InMemoryLogReplay( minFileRetentionTimestamp = 0, minSetTransactionRetentionTimestamp = None) @@ -65,7 +75,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest numRemoves: Int = 0, numMetadata: Int = 0): Unit = { assert(log.update().version === version) - val filePath = FileNames.deltaFile(log.logPath, version) + val filePath = DeltaCommitFileProvider(log.update()).deltaFile(version) val actions = log.store.read(filePath, log.newDeltaHadoopConf()).map(Action.fromJson) assert(actions.head.isInstanceOf[CommitInfo]) assert(actions.tail.count(_.isInstanceOf[AddFile]) === numAdds) @@ -435,3 +445,15 @@ class DeltaLogMinorCompactionSuite 
extends QueryTest } } +class ManagedCommitBatchBackfill1DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite { + override val managedCommitBackfillBatchSize: Option[Int] = Some(1) +} + +class ManagedCommitBatchBackFill2DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite { + override val managedCommitBackfillBatchSize: Option[Int] = Some(2) +} + +class ManagedCommitBatchBackFill20DeltaLogMinorCompactionSuite + extends DeltaLogMinorCompactionSuite { + override val managedCommitBackfillBatchSize: Option[Int] = Some(20) +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala index 91ddac80323..da8afa1178e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala @@ -485,6 +485,12 @@ class OptimisticTransactionSuite tablePath: Path, startVersion: Long, endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + startVersion: Long, + endVersion: Option[Long]): Unit = {} } } } @@ -531,6 +537,12 @@ class OptimisticTransactionSuite tablePath: Path, startVersion: Long, endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + startVersion: Long, + endVersion: Option[Long]): Unit = {} } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala index e5b8aba6b97..37ae4a75e0d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala @@ -45,6 +45,13 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark logPath: Path, startVersion: Long, endVersion: Option[Long] = None): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) + + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + startVersion: Long, + endVersion: Option[Long]): Unit = {} } class TestCommitStore1 extends TestCommitStoreBase diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala index b9ecf052011..65134a41b0c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala @@ -510,4 +510,23 @@ class ManagedCommitSuite checkAnswer(sql(s"SELECT * FROM delta.`$tablePath`"), Seq(Row(1), Row(2), Row(3), Row(4))) } } + + testWithDifferentBackfillInterval("ensure backfills commit files works as expected") { _ => + withTempDir { tempDir => + val tablePath = tempDir.getAbsolutePath + + // Add 10 commits to the table + Seq(1).toDF().write.format("delta").mode("overwrite").save(tablePath) + 2 to 10 foreach { i => + Seq(i).toDF().write.format("delta").mode("append").save(tablePath) + } + val log = DeltaLog.forTable(spark, tablePath) + val snapshot = log.update() + snapshot.ensureCommitFilesBackfilled() + + val commitFiles = 
log.listFrom(0).filter(FileNames.isDeltaFile).map(_.getPath) + val backfilledCommitFiles = (0 to 9).map(version => FileNames.deltaFile(log.logPath, version)) + assert(commitFiles.toSeq == backfilledCommitFiles) + } + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala index b99d2cdc3ca..c109b17b449 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala @@ -121,6 +121,15 @@ class TrackingCommitStore(delegatingCommitStore: InMemoryCommitStore) extends Co delegatingCommitStore.getCommits(logPath, startVersion, endVersion) } + override def backfillToVersion( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + startVersion: Long, + endVersion: Option[Long]): Unit = { + delegatingCommitStore.backfillToVersion(logStore, hadoopConf, logPath, startVersion, endVersion) + } + def registerTable( logPath: Path, maxCommitVersion: Long): Unit = {
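
For reference, a short caller-side sketch of how the backfill guarantee added in this patch surfaces through the public `DeltaLog`/`Snapshot` APIs. It assumes an active SparkSession `spark` and a hypothetical managed-commit table path; it is an illustration, not code from this diff:

```scala
import org.apache.spark.sql.delta.DeltaLog

val deltaLog = DeltaLog.forTable(spark, "/tmp/managed-commit-table")  // hypothetical path

// With this patch, checkpointing first backfills every un-backfilled commit
// (via Snapshot.ensureCommitFilesBackfilled) and only then writes the checkpoint,
// so _delta_log/ has no gaps below the checkpoint version.
deltaLog.checkpoint()

// The same guarantee can be requested directly on a snapshot, e.g. before a
// log minor compaction; it is a no-op for tables without a commit store.
deltaLog.update().ensureCommitFilesBackfilled()
```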
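
Continuing the sketch above, this is how code paths can resolve commit files through `DeltaCommitFileProvider` after this change (the variable names are illustrative):

```scala
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider

val provider = DeltaCommitFileProvider(deltaLog.update())

// Resolves to _delta_log/$version.json for backfilled commits, or to
// _delta_log/_commits/$version.$uuid.json for commits tracked only by the commit-owner.
// Versions above maxVersionInclusive throw an IllegalStateException.
val commitPath = provider.deltaFile(provider.maxVersionInclusive)

// First version not yet present at _delta_log/$version.json; ensureCommitFilesBackfilled()
// asks the CommitStore to backfill only when this is <= the snapshot version.
val firstMissing = provider.minUnbackfilledVersion
```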
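
On the implementer side, a minimal sketch of how a `CommitStore` could satisfy the new `backfillToVersion` contract. The trait name and body below are illustrative assumptions, not part of this diff, and they omit error handling and the already-backfilled checks that a real implementation such as `AbstractBatchBackfillingCommitStore` needs:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.managedcommit.CommitStore
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.FileNames

// Illustrative mixin: copy each tracked, un-backfilled commit file to its
// canonical _delta_log/$version.json location.
trait ExampleBackfillingCommitStore extends CommitStore {
  override def backfillToVersion(
      logStore: LogStore,
      hadoopConf: Configuration,
      logPath: Path,
      startVersion: Long,
      endVersion: Option[Long]): Unit = {
    getCommits(logPath, startVersion, endVersion).commits.foreach { commit =>
      val target = FileNames.deltaFile(logPath, commit.version)
      // Rewrite the UUID-named commit file at the path that readers which never
      // contact the CommitStore expect to find.
      val actions = logStore.read(commit.fileStatus.getPath, hadoopConf)
      logStore.write(target, actions.iterator, false, hadoopConf)  // overwrite = false
    }
  }
}
```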