Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Apr 15, 2024
1 parent 9bae749 commit a553cf5
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1950,6 +1950,17 @@ trait OptimisticTransactionImpl extends TransactionalWrite
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}

/**
* FileSystemBasedCommitStore is supposed to be treated as a singleton object for a Delta Log
* and is equal to all other instances of FileSystemBasedCommitStore for the same Delta Log.
*/
override def semanticEquals(other: CommitStore): Boolean = {
other match {
case fsCommitStore: FileSystemBasedCommitStore => fsCommitStore.deltaLog == deltaLog
case _ => false
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,8 @@ trait SnapshotManagement { self: DeltaLog =>
// If the commit store has changed, we need to again invoke updateSnapshot so that we
// could get the latest commits from the new commit store. We need to do it only once as
// the delta spec mandates the commit which changes the commit owner to be backfilled.
if (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != initialCommitStore) {
if (newSnapshot.version >= 0 &&
!CommitStore.semanticEquals(newSnapshot.commitStoreOpt, initialCommitStore)) {
val segmentOpt = createLogSegment(newSnapshot)
newSnapshot =
getSnapshotForLogSegmentInternal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ trait CommitStore {
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit

/**
* Determines whether this CommitStore is semantically equal to another CommitStore.
*
* Semantic equality is determined by each CommitStore implementation based on whether the two
* instances can be used interchangeably when invoking any of the CommitStore APIs, such as
* `commit`, `getCommits`, etc. For e.g., both the instances might be pointing to the same
* underlying endpoint.
*/
def semanticEquals(other: CommitStore): Boolean
}

object CommitStore {
def semanticEquals(
commitStore1Opt: Option[CommitStore],
commitStore2Opt: Option[CommitStore]): Boolean = {
(commitStore1Opt, commitStore2Opt) match {
case (Some(commitStore1), Some(commitStore2)) => commitStore1.semanticEquals(commitStore2)
case (None, None) => true
case _ => false
}
}
}

/** A builder interface for CommitStore */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC
throw new IllegalStateException(s"Table $logPath already exists in the commit store.")
}
}

override def semanticEquals(other: CommitStore): Boolean = this == other
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ class OptimisticTransactionSuite
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
override def semanticEquals(other: CommitStore): Boolean = this == other
}
}
}
Expand Down Expand Up @@ -543,6 +544,7 @@ class OptimisticTransactionSuite
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}
override def semanticEquals(other: CommitStore): Boolean = this == other
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): Unit = {}

override def semanticEquals(other: CommitStore): Boolean = this == other
}

class TestCommitStore1 extends TestCommitStoreBase
Expand Down Expand Up @@ -187,4 +189,29 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark
assert(getWriterFeatures(deltaLog).contains(ManagedCommitTableFeature.name))
}
}

test("Semantic Equality works as expected on CommitStores") {
class TestCommitStore(val key: String) extends TestCommitStoreBase {
override def semanticEquals(other: CommitStore): Boolean =
other.isInstanceOf[TestCommitStore] && other.asInstanceOf[TestCommitStore].key == key
}
object Builder1 extends CommitStoreBuilder {
override def build(conf: Map[String, String]): CommitStore = {
new TestCommitStore(conf("key"))
}
override def name: String = "cs-name"
}
CommitStoreProvider.registerBuilder(Builder1)

// Different CommitStores with same keys should be semantically equal.
val obj1 = CommitStoreProvider.getCommitStore("cs-name", Map("key" -> "url1"))
val obj2 = CommitStoreProvider.getCommitStore("cs-name", Map("key" -> "url1"))
assert(obj1 != obj2)
assert(obj1.semanticEquals(obj2))

// Different CommitStores with different keys should be semantically unequal.
val obj3 = CommitStoreProvider.getCommitStore("cs-name", Map("key" -> "url2"))
assert(obj1 != obj3)
assert(!obj1.semanticEquals(obj3))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ class TrackingCommitStore(delegatingCommitStore: InMemoryCommitStore) extends Co
delegatingCommitStore.backfillToVersion(logStore, hadoopConf, logPath, startVersion, endVersion)
}

override def semanticEquals(other: CommitStore): Boolean = this == other

def registerTable(
logPath: Path,
maxCommitVersion: Long): Unit = {
Expand Down

0 comments on commit a553cf5

Please sign in to comment.