diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 1f581cc96f7..e59c61f350c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -2106,7 +2106,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) @@ -2115,8 +2115,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = {} + lastKnownBackfilledVersion: Option[Long], + endVersion: Long): Unit = {} /** * [[FileSystemBasedCommitOwnerClient]] is supposed to be treated as a singleton object for a diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 064f65d5e9b..3804b81e485 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -488,7 +488,8 @@ class Snapshot( if (minUnbackfilledVersion <= version) { val hadoopConf = deltaLog.newDeltaHadoopConf() tableCommitOwnerClient.backfillToVersion( - startVersion = minUnbackfilledVersion, endVersion = Some(version)) + lastKnownBackfilledVersion = Some(minUnbackfilledVersion - 1), + endVersion = version) val fs = deltaLog.logPath.getFileSystem(hadoopConf) val expectedBackfilledDeltaFile = FileNames.unsafeDeltaFile(deltaLog.logPath, version) if (!fs.exists(expectedBackfilledDeltaFile)) { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index 25e9be5668d..5eb347e0b11 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -163,7 +163,7 @@ trait SnapshotManagement { self: DeltaLog => val threadPool = SnapshotManagement.commitOwnerGetCommitsThreadPool def getCommitsTask(async: Boolean): GetCommitsResponse = { recordFrameProfile("DeltaLog", s"CommitOwnerClient.getCommits.async=$async") { - tableCommitOwnerClient.getCommits(startVersion, endVersion = versionToLoad) + tableCommitOwnerClient.getCommits(Some(startVersion), endVersion = versionToLoad) } } val unbackfilledCommitsResponseFuture = diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala index 74698c32410..da23e858ce1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala @@ -72,7 +72,13 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L // Backfill until `commitVersion - 1` logInfo(s"Making sure commits are backfilled until $commitVersion version for" + s" table ${tablePath.toString}") - backfillToVersion(logStore, hadoopConf, logPath, managedCommitTableConf) + backfillToVersion( + logStore, + hadoopConf, + logPath, + managedCommitTableConf, + lastKnownBackfilledVersionOpt = None, + endVersion = commitVersion - 1) } // Write new commit file in _commits directory @@ -103,7 +109,13 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L } else if (commitVersion % batchSize == 0 || mcToFsConversion) { logInfo(s"Making sure commits are backfilled till $commitVersion version for" + s"table ${tablePath.toString}") - backfillToVersion(logStore, hadoopConf, logPath, managedCommitTableConf) + backfillToVersion( + logStore, + hadoopConf, + logPath, + managedCommitTableConf, + lastKnownBackfilledVersionOpt = None, + endVersion = commitVersion) } logInfo(s"Commit $commitVersion done successfully on table $tablePath") commitResponse @@ -124,9 +136,15 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long = 0, - endVersionOpt: Option[Long] = None): Unit = { - getCommits(logPath, managedCommitTableConf, startVersion, endVersionOpt) + lastKnownBackfilledVersionOpt: Option[Long] = None, + endVersion: Long): Unit = { + // Confirm the last backfilled version by checking the backfilled delta file's existence. + val validLastKnownBackfilledVersionOpt = lastKnownBackfilledVersionOpt.filter { version => + val fs = logPath.getFileSystem(hadoopConf) + fs.exists(FileNames.unsafeDeltaFile(logPath, version)) + } + val startVersionOpt = validLastKnownBackfilledVersionOpt.map(_ + 1) + getCommits(logPath, managedCommitTableConf, startVersionOpt, Some(endVersion)) .commits .foreach { commit => backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala index de5a605daa4..c273636f10c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala @@ -121,22 +121,28 @@ trait CommitOwnerClient { def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long] = None, endVersion: Option[Long] = None): GetCommitsResponse /** - * API to ask the Commit-Owner to backfill all commits >= 'startVersion' and <= `endVersion`. + * API to ask the Commit-Owner to backfill all commits > `lastKnownBackfilledVersion` and + * <= `endVersion`. * * If this API returns successfully, that means the backfill must have been completed, although * the Commit-Owner may not be aware of it yet. + * + * @param lastKnownBackfilledVersion The last known version that was backfilled by Commit-Owner + * before this API was called. If it's None or invalid, then the + * Commit-Owner should backfill from the beginning of the table. + * @param endVersion The version till which the Commit-Owner should backfill. */ def backfillToVersion( logStore: LogStore, hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit + lastKnownBackfilledVersion: Option[Long], + endVersion: Long): Unit /** * Determines whether this [[CommitOwnerClient]] is semantically equal to another diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala index cc3b94eb7f4..4655506b7ac 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwner.scala @@ -139,14 +139,16 @@ class InMemoryCommitOwner(val batchSize: Long) override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = { withReadLock[GetCommitsResponse](logPath) { val tableData = perTableMap.get(logPath) + val effectiveStartVersion = startVersion.getOrElse(0L) // Calculate the end version for the range, or use the last key if endVersion is not provided - val effectiveEndVersion = - endVersion.getOrElse(tableData.commitsMap.lastOption.map(_._1).getOrElse(startVersion)) - val commitsInRange = tableData.commitsMap.range(startVersion, effectiveEndVersion + 1) + val effectiveEndVersion = endVersion.getOrElse( + tableData.commitsMap.lastOption.map(_._1).getOrElse(effectiveStartVersion)) + val commitsInRange = tableData.commitsMap.range( + effectiveStartVersion, effectiveEndVersion + 1) GetCommitsResponse(commitsInRange.values.toSeq, tableData.lastRatifiedCommitVersion) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala index b48782a318f..cf295be308a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/TableCommitOwnerClient.scala @@ -49,16 +49,16 @@ case class TableCommitOwnerClient( } def getCommits( - startVersion: Long, + startVersion: Option[Long] = None, endVersion: Option[Long] = None): GetCommitsResponse = { commitOwnerClient.getCommits(logPath, tableConf, startVersion, endVersion) } def backfillToVersion( - startVersion: Long, - endVersion: Option[Long]): Unit = { + lastKnownBackfilledVersion: Option[Long], + endVersion: Long): Unit = { commitOwnerClient.backfillToVersion( - logStore, hadoopConf, logPath, tableConf, startVersion, endVersion) + logStore, hadoopConf, logPath, tableConf, lastKnownBackfilledVersion, endVersion) } /** diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala index 8e31eb282b9..8127700c018 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala @@ -43,7 +43,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest endVersion: Long): Unit = { val deltaLog = DeltaLog.forTable(spark, tablePath) deltaLog.update().tableCommitOwnerClientOpt.foreach { tableCommitOwnerClient => - tableCommitOwnerClient.backfillToVersion(startVersion = 0, Some(endVersion)) + tableCommitOwnerClient.backfillToVersion(lastKnownBackfilledVersion = None, endVersion) } val logReplay = new InMemoryLogReplay( minFileRetentionTimestamp = 0, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index ab2819dfe11..4a1c0dc06da 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -976,7 +976,7 @@ class InCommitTimestampWithManagedCommitSuite val commitFileProvider = DeltaCommitFileProvider(deltaLog.update()) val unbackfilledCommits = tableCommitOwnerClient - .getCommits(1) + .getCommits(Some(1)) .commits .map { commit => DeltaHistoryManager.Commit(commit.version, commit.commitTimestamp)} val commits = (Seq(commit0) ++ unbackfilledCommits).toList diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index d1f798b107a..2aef02989a2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -510,7 +510,7 @@ case class ConcurrentBackfillCommitOwnerClient( override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = { if (ConcurrentBackfillCommitOwnerClient.beginConcurrentBackfills) { CountDownLatchLogStore.listFromCalled.await() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala index 21ebe7e3af8..0197efcefdd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClientSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSparkSession class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession with DeltaSQLCommandTest { - trait TestCommitOwnerClientBase extends CommitOwnerClient { + private trait TestCommitOwnerClientBase extends CommitOwnerClient { override def commit( logStore: LogStore, hadoopConf: Configuration, @@ -45,7 +45,7 @@ class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with Share override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long] = None): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1) override def backfillToVersion( @@ -53,14 +53,14 @@ class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with Share hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = {} + lastKnownBackfilledVersion: Option[Long], + endVersion: Long): Unit = {} override def semanticEquals(other: CommitOwnerClient): Boolean = this == other } - class TestCommitOwnerClient1 extends TestCommitOwnerClientBase - class TestCommitOwnerClient2 extends TestCommitOwnerClientBase + private class TestCommitOwnerClient1 extends TestCommitOwnerClientBase + private class TestCommitOwnerClient2 extends TestCommitOwnerClientBase override def beforeEach(): Unit = { super.beforeEach() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala index f93d3a83878..d19dc1979cd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitOwnerSuite.scala @@ -159,7 +159,7 @@ class InMemoryCommitOwnerSuite extends QueryTest val tcs = TableCommitOwnerClient(cs, log, Map.empty[String, String]) cs.registerTable(logPath, currentVersion = -1L, Metadata(), Protocol(1, 1)) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1)) // Commit 0 must be done by file-system val e = intercept[CommitFailedException] { commit(version = 0, timestamp = 0, tcs) } @@ -167,28 +167,28 @@ class InMemoryCommitOwnerSuite extends QueryTest store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false) // Commit 0 doesn't go through commit-owner. So commit-owner is not aware of it in getCommits // response. - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1)) assertBackfilled(0, logPath, Some(0)) val c1 = commit(1, 1, tcs) val c2 = commit(2, 2, tcs) - assert(tcs.getCommits(0).commits.takeRight(2) == Seq(c1, c2)) + assert(tcs.getCommits().commits.takeRight(2) == Seq(c1, c2)) // All 3 commits are backfilled since batchSize == 3 val c3 = commit(3, 3, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 3)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 3)) (1 to 3).foreach(i => assertBackfilled(i, logPath, Some(i))) // Test that startVersion and endVersion are respected in getCommits val c4 = commit(4, 4, tcs) val c5 = commit(5, 5, tcs) - assert(tcs.getCommits(4) == GetCommitsResponse(Seq(c4, c5), 5)) - assert(tcs.getCommits(4, Some(4)) == GetCommitsResponse(Seq(c4), 5)) - assert(tcs.getCommits(5) == GetCommitsResponse(Seq(c5), 5)) + assert(tcs.getCommits(Some(4)) == GetCommitsResponse(Seq(c4, c5), 5)) + assert(tcs.getCommits(Some(4), Some(4)) == GetCommitsResponse(Seq(c4), 5)) + assert(tcs.getCommits(Some(5)) == GetCommitsResponse(Seq(c5), 5)) // Commit [4, 6] are backfilled since batchSize == 3 val c6 = commit(6, 6, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 6)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 6)) (4 to 6).foreach(i => assertBackfilled(i, logPath, Some(i))) assertInvariants(logPath, tcs.commitOwnerClient.asInstanceOf[InMemoryCommitOwner]) } @@ -205,13 +205,13 @@ class InMemoryCommitOwnerSuite extends QueryTest val e = intercept[CommitFailedException] { commit(version = 0, timestamp = 0, tcs) } assert(e.getMessage === "Commit version 0 must go via filesystem.") store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1)) assertBackfilled(version = 0, logPath, Some(0L)) // Test that all commits are immediately backfilled (1 to 3).foreach { version => commit(version, version, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, version)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, version)) assertBackfilled(version, logPath, Some(version)) } @@ -248,7 +248,7 @@ class InMemoryCommitOwnerSuite extends QueryTest // Verify that the conflict-checker still works even when everything has been backfilled commit(5, 5, tcs) - assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 5)) + assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 5)) assertCommitFail(5, 6, retryable = true, commit(5, 5, tcs)) assertCommitFail(7, 6, retryable = false, commit(7, 7, tcs)) @@ -306,7 +306,7 @@ class InMemoryCommitOwnerSuite extends QueryTest override def run(): Unit = { var currentWriterCommits = 0 while (currentWriterCommits < numberOfCommitsPerWriter) { - val nextVersion = tcs.getCommits(0).latestTableVersion + 1 + val nextVersion = tcs.getCommits().latestTableVersion + 1 try { val currentTimestamp = runningTimestamp.getAndIncrement() val commitResponse = commit(nextVersion, currentTimestamp, tcs) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala index bc9c32b4a63..24166e52fbc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala @@ -347,7 +347,7 @@ class ManagedCommitSuite override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = { if (failAttempts.contains(numGetCommitsCalled + 1)) { numGetCommitsCalled += 1 @@ -645,7 +645,7 @@ class ManagedCommitSuite override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long]): GetCommitsResponse = { assert(managedCommitTableConf === tableConf) super.getCommits(logPath, managedCommitTableConf, startVersion, endVersion) @@ -669,11 +669,16 @@ class ManagedCommitSuite hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersionOpt: Option[Long]): Unit = { + lastKnownBackfilledVersionOpt: Option[Long], + endVersion: Long): Unit = { assert(managedCommitTableConf === tableConf) super.backfillToVersion( - logStore, hadoopConf, logPath, managedCommitTableConf, startVersion, endVersionOpt) + logStore, + hadoopConf, + logPath, + managedCommitTableConf, + lastKnownBackfilledVersionOpt, + endVersion) } } ) @@ -947,8 +952,8 @@ class ManagedCommitSuite hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = { } + startVersion: Option[Long], + endVersion: Long): Unit = { } }) CommitOwnerProvider.clearNonDefaultBuilders() val builder = diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala index 1a1d9a0d9bd..74bd858754e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala @@ -171,7 +171,7 @@ class TrackingCommitOwnerClient(delegatingCommitOwnerClient: InMemoryCommitOwner override def getCommits( logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, + startVersion: Option[Long], endVersion: Option[Long] = None): GetCommitsResponse = recordOperation("getCommits") { delegatingCommitOwnerClient.getCommits( logPath, managedCommitTableConf, startVersion, endVersion) @@ -182,10 +182,10 @@ class TrackingCommitOwnerClient(delegatingCommitOwnerClient: InMemoryCommitOwner hadoopConf: Configuration, logPath: Path, managedCommitTableConf: Map[String, String], - startVersion: Long, - endVersion: Option[Long]): Unit = recordOperation("backfillToVersion") { + lastKnownBackfilledVersion: Option[Long], + endVersion: Long): Unit = recordOperation("backfillToVersion") { delegatingCommitOwnerClient.backfillToVersion( - logStore, hadoopConf, logPath, managedCommitTableConf, startVersion, endVersion) + logStore, hadoopConf, logPath, managedCommitTableConf, lastKnownBackfilledVersion, endVersion) } override def semanticEquals(other: CommitOwnerClient): Boolean = this == other