From 91b74767511e97e75788c00e90a464261c87e1cb Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Thu, 2 May 2024 19:11:35 -0700 Subject: [PATCH] Parallel calls --- .../sql/delta/OptimisticTransaction.scala | 6 +- .../org/apache/spark/sql/delta/Snapshot.scala | 3 +- .../spark/sql/delta/SnapshotManagement.scala | 2 +- ...actBatchBackfillingCommitOwnerClient.scala | 28 ++++++++-- .../managedcommit/CommitOwnerClient.scala | 14 +++-- .../managedcommit/InMemoryCommitOwner.scala | 12 ++-- .../TableCommitOwnerClient.scala | 8 +-- .../delta/DeltaLogMinorCompactionSuite.scala | 2 +- .../sql/delta/InCommitTimestampSuite.scala | 2 +- .../sql/delta/SnapshotManagementSuite.scala | 2 +- .../CommitOwnerClientSuite.scala | 12 ++-- .../InMemoryCommitOwnerSuite.scala | 24 ++++---- .../managedcommit/ManagedCommitSuite.scala | 55 ++++++++++--------- .../ManagedCommitTestUtils.scala | 8 +-- 14 files changed, 104 insertions(+), 74 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 585227f9209..df942fdfced 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -2109,7 +2109,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) @@ -2118,8 +2118,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = {} + version: Long, + lastKnownBackfilledVersion: Option[Long] = None): Unit = {} /** * [[FileSystemBasedCommitOwnerClient]] is supposed to be treated as a singleton 
object for a diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 22bbba660fc..08f296752eb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -488,7 +488,8 @@ class Snapshot( if (minUnbackfilledVersion <= version) { val hadoopConf = deltaLog.newDeltaHadoopConf() tableCommitOwnerClient.backfillToVersion( - startVersion = minUnbackfilledVersion, endVersion = Some(version)) + version, + lastKnownBackfilledVersion = Some(minUnbackfilledVersion - 1)) val fs = deltaLog.logPath.getFileSystem(hadoopConf) val expectedBackfilledDeltaFile = FileNames.unsafeDeltaFile(deltaLog.logPath, version) if (!fs.exists(expectedBackfilledDeltaFile)) { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index 25e9be5668d..5eb347e0b11 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -163,7 +163,7 @@ trait SnapshotManagement { self: DeltaLog => val threadPool = SnapshotManagement.commitOwnerGetCommitsThreadPool def getCommitsTask(async: Boolean): GetCommitsResponse = { recordFrameProfile("DeltaLog", s"CommitOwnerClient.getCommits.async=$async") { - tableCommitOwnerClient.getCommits(startVersion, endVersion = versionToLoad) + tableCommitOwnerClient.getCommits(Some(startVersion), endVersion = versionToLoad) } } val unbackfilledCommitsResponseFuture = diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala index 75932fa71e9..48b313347cf 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala @@ -71,9 +71,14 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L val fs = logPath.getFileSystem(hadoopConf) if (batchSize <= 1) { // Backfill until `commitVersion - 1` - logInfo(s"Making sure commits are backfilled until $commitVersion version for" + + logInfo(s"Making sure commits are backfilled until ${commitVersion - 1} version for" + s" table ${tablePath.toString}") - backfillToVersion(logStore, hadoopConf, logPath, managedCommitTableConf) + backfillToVersion( + logStore, + hadoopConf, + logPath, + managedCommitTableConf, + commitVersion - 1) } // Write new commit file in _commits directory @@ -104,7 +109,12 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L } else if (commitVersion % batchSize == 0 || mcToFsConversion) { logInfo(s"Making sure commits are backfilled till $commitVersion version for" + s"table ${tablePath.toString}") - backfillToVersion(logStore, hadoopConf, logPath, managedCommitTableConf) + backfillToVersion( + logStore, + hadoopConf, + logPath, + managedCommitTableConf, + commitVersion) } logInfo(s"Commit $commitVersion done successfully on table $tablePath") commitResponse @@ -127,9 +137,15 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long = 0, - endVersionOpt: Option[Long] = None): Unit = { - getCommits(logPath, managedCommitTableConf, startVersion, endVersionOpt) + version: Long, + lastKnownBackfilledVersionOpt: Option[Long] = None): Unit = { + // Confirm the last backfilled version by checking the backfilled delta file's existence. 
+ val validLastKnownBackfilledVersionOpt = lastKnownBackfilledVersionOpt.filter { version => + val fs = logPath.getFileSystem(hadoopConf) + fs.exists(FileNames.unsafeDeltaFile(logPath, version)) + } + val startVersionOpt = validLastKnownBackfilledVersionOpt.map(_ + 1) + getCommits(logPath, managedCommitTableConf, startVersionOpt, Some(version)) .commits .foreach { commit => backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala index 9868a2d71ea..f2eb0f37820 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala @@ -119,22 +119,28 @@ trait CommitOwnerClient { def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long] = None, endVersion: Option[Long] = None): GetCommitsResponse /** - * API to ask the Commit-Owner to backfill all commits >= 'startVersion' and <= `endVersion`. + * API to ask the Commit-Owner to backfill all commits > `lastKnownBackfilledVersion` and + * <= `version`. * * If this API returns successfully, that means the backfill must have been completed, although * the Commit-Owner may not be aware of it yet. + * + * @param version The version till which the Commit-Owner should backfill. + * @param lastKnownBackfilledVersion The last known version that was backfilled by Commit-Owner + * before this API was called. If it's None or invalid, then the + * Commit-Owner should backfill from the beginning of the table.
*/ def backfillToVersion( logStore: LogStore, hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit + version: Long, + lastKnownBackfilledVersion: Option[Long]): Unit /** * Determines whether this [[CommitOwnerClient]] is semantically equal to another diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala index c493ab02fed..99427462d2b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala @@ -139,19 +139,21 @@ class InMemoryCommitOwner(val batchSize: Long) override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = { withReadLock[GetCommitsResponse](logPath) { val tableData = perTableMap.get(logPath) + val effectiveStartVersion = startVersion.getOrElse(0L) // Calculate the end version for the range, or use the last key if endVersion is not provided - val effectiveEndVersion = - endVersion.getOrElse(tableData.commitsMap.lastOption.map(_._1).getOrElse(startVersion)) - val commitsInRange = tableData.commitsMap.range(startVersion, effectiveEndVersion + 1) + val effectiveEndVersion = endVersion.getOrElse( + tableData.commitsMap.lastOption.map(_._1).getOrElse(effectiveStartVersion)) + val commitsInRange = tableData.commitsMap.range( + effectiveStartVersion, effectiveEndVersion + 1) GetCommitsResponse(commitsInRange.values.toSeq, tableData.lastRatifiedCommitVersion) } } - override protected[delta] def registerBackfill( + override protected[sql] def registerBackfill( logPath: Path, backfilledVersion: Long): Unit = { withWriteLock(logPath) { diff --git 
a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala index b48782a318f..e3e4ddf4330 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala @@ -49,16 +49,16 @@ case class TableCommitOwnerClient( } def getCommits( - startVersion: Long, + startVersion: Option[Long] = None, endVersion: Option[Long] = None): GetCommitsResponse = { commitOwnerClient.getCommits(logPath, tableConf, startVersion, endVersion) } def backfillToVersion( - startVersion: Long, - endVersion: Option[Long]): Unit = { + version: Long, + lastKnownBackfilledVersion: Option[Long] = None): Unit = { commitOwnerClient.backfillToVersion( - logStore, hadoopConf, logPath, tableConf, startVersion, endVersion) + logStore, hadoopConf, logPath, tableConf, version, lastKnownBackfilledVersion) } /** diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala index 8e31eb282b9..0a028a59af6 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala @@ -43,7 +43,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest endVersion: Long): Unit = { val deltaLog = DeltaLog.forTable(spark, tablePath) deltaLog.update().tableCommitOwnerClientOpt.foreach { tableCommitOwnerClient => - tableCommitOwnerClient.backfillToVersion(startVersion = 0, Some(endVersion)) + tableCommitOwnerClient.backfillToVersion(endVersion) } val logReplay = new InMemoryLogReplay( minFileRetentionTimestamp = 0, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index ab2819dfe11..4a1c0dc06da 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -976,7 +976,7 @@ class InCommitTimestampWithManagedCommitSuite val commitFileProvider = DeltaCommitFileProvider(deltaLog.update()) val unbackfilledCommits = tableCommitOwnerClient - .getCommits(1) + .getCommits(Some(1)) .commits .map { commit => DeltaHistoryManager.Commit(commit.version, commit.commitTimestamp)} val commits = (Seq(commit0) ++ unbackfilledCommits).toList diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index d1f798b107a..2aef02989a2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -510,7 +510,7 @@ case class ConcurrentBackfillCommitOwnerClient( override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = { if (ConcurrentBackfillCommitOwnerClient.beginConcurrentBackfills) { CountDownLatchLogStore.listFromCalled.await() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala index 2751b3be21e..5f4aabb91d5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.test.SharedSparkSession class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession 
with DeltaSQLCommandTest { - trait TestCommitOwnerClientBase extends CommitOwnerClient { + private trait TestCommitOwnerClientBase extends CommitOwnerClient { override def commit( logStore: LogStore, hadoopConf: Configuration, @@ -47,7 +47,7 @@ class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with Share override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long] = None): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) override def backfillToVersion( @@ -55,14 +55,14 @@ class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with Share hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = {} + version: Long, + lastKnownBackfilledVersion: Option[Long]): Unit = {} override def semanticEquals(other: CommitOwnerClient): Boolean = this == other } - class TestCommitOwnerClient1 extends TestCommitOwnerClientBase - class TestCommitOwnerClient2 extends TestCommitOwnerClientBase + private class TestCommitOwnerClient1 extends TestCommitOwnerClientBase + private class TestCommitOwnerClient2 extends TestCommitOwnerClientBase override def beforeEach(): Unit = { super.beforeEach() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala index f93d3a83878..d19dc1979cd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala @@ -159,7 +159,7 @@ class InMemoryCommitOwnerSuite extends QueryTest val tcs = TableCommitOwnerClient(cs, log, Map.empty[String, String]) cs.registerTable(logPath, currentVersion = -1L, Metadata(), Protocol(1, 1)) - assert(tcs.getCommits(0) == 
GetCommitsResponse(Seq.empty, -1)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1)) // Commit 0 must be done by file-system val e = intercept[CommitFailedException] { commit(version = 0, timestamp = 0, tcs) } @@ -167,28 +167,28 @@ class InMemoryCommitOwnerSuite extends QueryTest store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false) // Commit 0 doesn't go through commit-owner. So commit-owner is not aware of it in getCommits // response. - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1)) assertBackfilled(0, logPath, Some(0)) val c1 = commit(1, 1, tcs) val c2 = commit(2, 2, tcs) - assert(tcs.getCommits(0).commits.takeRight(2) == Seq(c1, c2)) + assert(tcs.getCommits().commits.takeRight(2) == Seq(c1, c2)) // All 3 commits are backfilled since batchSize == 3 val c3 = commit(3, 3, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 3)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 3)) (1 to 3).foreach(i => assertBackfilled(i, logPath, Some(i))) // Test that startVersion and endVersion are respected in getCommits val c4 = commit(4, 4, tcs) val c5 = commit(5, 5, tcs) - assert(tcs.getCommits(4) == GetCommitsResponse(Seq(c4, c5), 5)) - assert(tcs.getCommits(4, Some(4)) == GetCommitsResponse(Seq(c4), 5)) - assert(tcs.getCommits(5) == GetCommitsResponse(Seq(c5), 5)) + assert(tcs.getCommits(Some(4)) == GetCommitsResponse(Seq(c4, c5), 5)) + assert(tcs.getCommits(Some(4), Some(4)) == GetCommitsResponse(Seq(c4), 5)) + assert(tcs.getCommits(Some(5)) == GetCommitsResponse(Seq(c5), 5)) // Commit [4, 6] are backfilled since batchSize == 3 val c6 = commit(6, 6, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 6)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 6)) (4 to 6).foreach(i => assertBackfilled(i, logPath, Some(i))) assertInvariants(logPath, tcs.commitOwnerClient.asInstanceOf[InMemoryCommitOwner]) } 
@@ -205,13 +205,13 @@ class InMemoryCommitOwnerSuite extends QueryTest val e = intercept[CommitFailedException] { commit(version = 0, timestamp = 0, tcs) } assert(e.getMessage === "Commit version 0 must go via filesystem.") store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1)) assertBackfilled(version = 0, logPath, Some(0L)) // Test that all commits are immediately backfilled (1 to 3).foreach { version => commit(version, version, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, version)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, version)) assertBackfilled(version, logPath, Some(version)) } @@ -248,7 +248,7 @@ class InMemoryCommitOwnerSuite extends QueryTest // Verify that the conflict-checker still works even when everything has been backfilled commit(5, 5, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 5)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 5)) assertCommitFail(5, 6, retryable = true, commit(5, 5, tcs)) assertCommitFail(7, 6, retryable = false, commit(7, 7, tcs)) @@ -306,7 +306,7 @@ class InMemoryCommitOwnerSuite extends QueryTest override def run(): Unit = { var currentWriterCommits = 0 while (currentWriterCommits < numberOfCommitsPerWriter) { - val nextVersion = tcs.getCommits(0).latestTableVersion + 1 + val nextVersion = tcs.getCommits().latestTableVersion + 1 try { val currentTimestamp = runningTimestamp.getAndIncrement() val commitResponse = commit(nextVersion, currentTimestamp, tcs) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala index 0c3f7f7d6bb..c3ae1a440bf 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala +++ 
b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala @@ -347,7 +347,7 @@ class ManagedCommitSuite override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = { if (failAttempts.contains(numGetCommitsCalled + 1)) { numGetCommitsCalled += 1 @@ -634,46 +634,51 @@ class ManagedCommitSuite val trackingCommitOwnerClient = new TrackingCommitOwnerClient( new InMemoryCommitOwner(batchSize = 10) { override def registerTable( - logPath: Path, - currentVersion: Long, - currentMetadata: AbstractMetadata, - currentProtocol: AbstractProtocol): Map[String, String] = { + logPath: Path, + currentVersion: Long, + currentMetadata: AbstractMetadata, + currentProtocol: AbstractProtocol): Map[String, String] = { super.registerTable(logPath, currentVersion, currentMetadata, currentProtocol) tableConf } override def getCommits( - logPath: Path, - managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): GetCommitsResponse = { + logPath: Path, + managedCommitTableConf: Map[String, String], + startVersion: Option[Long], + endVersion: Option[Long]): GetCommitsResponse = { assert(managedCommitTableConf === tableConf) super.getCommits(logPath, managedCommitTableConf, startVersion, endVersion) } override def commit( - logStore: LogStore, - hadoopConf: Configuration, - logPath: Path, - managedCommitTableConf: Map[String, String], - commitVersion: Long, - actions: Iterator[String], - updatedActions: UpdatedActions): CommitResponse = { + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + managedCommitTableConf: Map[String, String], + commitVersion: Long, + actions: Iterator[String], + updatedActions: UpdatedActions): CommitResponse = { assert(managedCommitTableConf === tableConf) super.commit(logStore, hadoopConf, logPath, managedCommitTableConf, commitVersion, actions, updatedActions) 
} override def backfillToVersion( - logStore: LogStore, - hadoopConf: Configuration, - logPath: Path, - managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersionOpt: Option[Long]): Unit = { + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + managedCommitTableConf: Map[String, String], + version: Long, + lastKnownBackfilledVersionOpt: Option[Long]): Unit = { assert(managedCommitTableConf === tableConf) super.backfillToVersion( - logStore, hadoopConf, logPath, managedCommitTableConf, startVersion, endVersionOpt) + logStore, + hadoopConf, + logPath, + managedCommitTableConf, + version, + lastKnownBackfilledVersionOpt) } } ) @@ -947,8 +952,8 @@ class ManagedCommitSuite hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = { } + version: Long, + lastKnownBackfilledVersionOpt: Option[Long]): Unit = { } }) CommitOwnerProvider.clearNonDefaultBuilders() val builder = diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala index 89a7faa482b..d433a851b55 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala @@ -171,7 +171,7 @@ class TrackingCommitOwnerClient(delegatingCommitOwnerClient: InMemoryCommitOwner override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long] = None): GetCommitsResponse = recordOperation("getCommits") { delegatingCommitOwnerClient.getCommits( logPath, managedCommitTableConf, startVersion, endVersion) @@ -182,10 +182,10 @@ class TrackingCommitOwnerClient(delegatingCommitOwnerClient: InMemoryCommitOwner hadoopConf: Configuration, logPath: 
Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = recordOperation("backfillToVersion") { + version: Long, + lastKnownBackfilledVersion: Option[Long]): Unit = recordOperation("backfillToVersion") { delegatingCommitOwnerClient.backfillToVersion( - logStore, hadoopConf, logPath, managedCommitTableConf, startVersion, endVersion) + logStore, hadoopConf, logPath, managedCommitTableConf, version, lastKnownBackfilledVersion) } override def semanticEquals(other: CommitOwnerClient): Boolean = this == other