diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index b2c13e42230..e00898f9600 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1950,6 +1950,17 @@ trait OptimisticTransactionImpl extends TransactionalWrite logPath: Path, startVersion: Long, endVersion: Option[Long]): Unit = {} + + /** + * FileSystemBasedCommitStore is supposed to be treated as a singleton object for a Delta Log + * and is equal to all other instances of FileSystemBasedCommitStore for the same Delta Log. + */ + override def semanticEquals(other: CommitStore): Boolean = { + other match { + case fsCommitStore: FileSystemBasedCommitStore => fsCommitStore.deltaLog == deltaLog + case _ => false + } + } } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index 933499a1041..866824b0a6e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -1010,7 +1010,8 @@ trait SnapshotManagement { self: DeltaLog => // If the commit store has changed, we need to again invoke updateSnapshot so that we // could get the latest commits from the new commit store. We need to do it only once as // the delta spec mandates the commit which changes the commit owner to be backfilled. - if (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != initialCommitStore) { + if (newSnapshot.version >= 0 && + !CommitStore.semanticEquals(newSnapshot.commitStoreOpt, initialCommitStore)) { val segmentOpt = createLogSegment(newSnapshot) newSnapshot = getSnapshotForLogSegmentInternal( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala index 41920688bf0..5d3a180bdff 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala @@ -106,6 +106,28 @@ trait CommitStore { logPath: Path, startVersion: Long, endVersion: Option[Long]): Unit + + /** + * Determines whether this CommitStore is semantically equal to another CommitStore. + * + * Semantic equality is determined by each CommitStore implementation based on whether the two + * instances can be used interchangeably when invoking any of the CommitStore APIs, such as + * `commit`, `getCommits`, etc. For e.g., both the instances might be pointing to the same + * underlying endpoint. + */ + def semanticEquals(other: CommitStore): Boolean +} + +object CommitStore { + def semanticEquals( + commitStore1Opt: Option[CommitStore], + commitStore2Opt: Option[CommitStore]): Boolean = { + (commitStore1Opt, commitStore2Opt) match { + case (Some(commitStore1), Some(commitStore2)) => commitStore1.semanticEquals(commitStore2) + case (None, None) => true + case _ => false + } + } } /** A builder interface for CommitStore */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala index 8055a5c37f6..b61eec9c3ab 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala @@ -132,6 +132,8 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC throw new IllegalStateException(s"Table $logPath already exists in the commit store.") } } + + override def semanticEquals(other: CommitStore): Boolean = this == other } /** diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala index f744f71e64f..c41cc52515c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala @@ -491,6 +491,7 @@ class OptimisticTransactionSuite logPath: Path, startVersion: Long, endVersion: Option[Long]): Unit = {} + override def semanticEquals(other: CommitStore): Boolean = this == other } } } @@ -543,6 +544,7 @@ class OptimisticTransactionSuite logPath: Path, startVersion: Long, endVersion: Option[Long]): Unit = {} + override def semanticEquals(other: CommitStore): Boolean = this == other } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala index 37ae4a75e0d..535f7c28c5e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala @@ -52,6 +52,8 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark logPath: Path, startVersion: Long, endVersion: Option[Long]): Unit = {} + + override def semanticEquals(other: CommitStore): Boolean = this == other } class TestCommitStore1 extends TestCommitStoreBase @@ -187,4 +189,29 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark assert(getWriterFeatures(deltaLog).contains(ManagedCommitTableFeature.name)) } } + + test("Semantic Equality works as expected on CommitStores") { + class TestCommitStore(val key: String) extends TestCommitStoreBase { + override def semanticEquals(other: CommitStore): Boolean = + other.isInstanceOf[TestCommitStore] && other.asInstanceOf[TestCommitStore].key == key + } + object Builder1 extends CommitStoreBuilder { + override def build(conf: Map[String, String]): CommitStore = { + new TestCommitStore(conf("key")) + } + override def name: String = "cs-name" + } + CommitStoreProvider.registerBuilder(Builder1) + + // Different CommitStores with same keys should be semantically equal. + val obj1 = CommitStoreProvider.getCommitStore("cs-name", Map("key" -> "url1")) + val obj2 = CommitStoreProvider.getCommitStore("cs-name", Map("key" -> "url1")) + assert(obj1 != obj2) + assert(obj1.semanticEquals(obj2)) + + // Different CommitStores with different keys should be semantically unequal. + val obj3 = CommitStoreProvider.getCommitStore("cs-name", Map("key" -> "url2")) + assert(obj1 != obj3) + assert(!obj1.semanticEquals(obj3)) + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala index 7fde82b7d57..a414a459a49 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala @@ -143,6 +143,8 @@ class TrackingCommitStore(delegatingCommitStore: InMemoryCommitStore) extends Co delegatingCommitStore.backfillToVersion(logStore, hadoopConf, logPath, startVersion, endVersion) } + override def semanticEquals(other: CommitStore): Boolean = this == other + def registerTable( logPath: Path, maxCommitVersion: Long): Unit = {