Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 22, 2024
1 parent c01665b commit 8fe9861
Showing 1 changed file with 0 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -483,97 +483,3 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
}
}
}

/**
 * A [[LocalLogStore]] that releases one permit on [[listFromCalledSemaphore]] every time a
 * `listFrom` call has finished reading its listing, so that tests can block until a file-system
 * listing has completed before taking further action.
 *
 * @param sparkConf  forwarded unchanged to [[LocalLogStore]]
 * @param hadoopConf forwarded unchanged to [[LocalLogStore]]
 */
class SemaphoreReleasingLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
  extends LocalLogStore(sparkConf, hadoopConf) {

  /** Starts with zero permits; one permit is released per completed `listFrom` call. */
  val listFromCalledSemaphore: Semaphore = new Semaphore(0)

  /**
   * Lists files through the parent store, fully materializes the result, releases one permit,
   * and then hands the materialized listing back as an iterator.
   *
   * @param path       forwarded unchanged to the parent `listFrom`
   * @param hadoopConf forwarded unchanged to the parent `listFrom`
   * @return an iterator over the already-materialized listing
   */
  override def listFrom(path: Path, hadoopConf: Configuration): Iterator[FileStatus] = {
    // `Iterator.toSeq` yields a lazy `Stream` on Scala 2.12, which would force only the first
    // element before the permit is released. `toList` is strict on every Scala version, so the
    // underlying listing is guaranteed to be fully consumed before waiters are woken up.
    val files = super.listFrom(path, hadoopConf).toList
    listFromCalledSemaphore.release()
    files.iterator
  }
}

/**
 * Checks that building a snapshot copes with commit-store backfills that happen after the
 * file-system listing has finished but before the commit store answers `getCommits`.
 *
 * The suite configures [[SemaphoreReleasingLogStore]] as the log store so that the test commit
 * store can wait for a `listFrom` call to complete before it runs the backfills it held back.
 */
class SnapshotManagementParallelListingSuite extends QueryTest with SharedSparkSession {

  // Point the log-store class configuration at the semaphore-releasing store.
  override protected def sparkConf: SparkConf = {
    val logStoreClassName = classOf[SemaphoreReleasingLogStore].getName
    super.sparkConf.set(logStoreClassConfKey, logStoreClassName)
  }

  /** The session's shared log store, viewed as the semaphore-releasing implementation. */
  protected def store: SemaphoreReleasingLogStore =
    spark.sharedState.logStore.asInstanceOf[SemaphoreReleasingLogStore]

  for (includeGapAtTheEnd <- Seq(true, false)) {
    test(s"concurrent backfills are properly reconciled with gapAtTheEnd: $includeGapAtTheEnd") {
      val ownerName = "awaiting-commit-store"
      // Backfills for every version above 2 are parked in this map instead of being executed.
      // They are replayed from inside `getCommits`, once the log-store listing has finished,
      // which imitates a backfill racing with snapshot construction. When `includeGapAtTheEnd`
      // is false, three additional commits are written after the first six.
      val deferredBackfills: mutable.Map[Long, () => Unit] = mutable.Map.empty
      var replayArmed = false

      case class ConcurrentBackfillCommitStore() extends InMemoryCommitStore(batchSize = 5) {
        override def getCommits(
            logPath: Path,
            startVersion: Long,
            endVersion: Option[Long]): GetCommitsResponse = {
          if (replayArmed) {
            // Block until a `listFrom` call has released a permit, then run the parked
            // backfills. The files they create were not visible to that listing, so the
            // reconciliation between listed deltas and commit-store commits gets exercised.
            store.listFromCalledSemaphore.acquire()
            logInfo(s"Finishing pending backfills concurrently: ${deferredBackfills.keySet}")
            deferredBackfills.keys.foreach { pendingVersion =>
              val runBackfill = deferredBackfills(pendingVersion)
              runBackfill()
            }
          }
          super.getCommits(logPath, startVersion, endVersion)
        }

        override def backfill(
            logStore: LogStore,
            hadoopConf: Configuration,
            logPath: Path,
            version: Long,
            fileStatus: FileStatus): Unit = {
          if (version > 2) {
            // Park the backfill; it is executed later from `getCommits`.
            deferredBackfills.update(
              version,
              () => super.backfill(logStore, hadoopConf, logPath, version, fileStatus))
          } else {
            // Versions 0 through 2 are backfilled right away.
            super.backfill(logStore, hadoopConf, logPath, version, fileStatus)
          }
        }
      }

      // Builder that always hands out the same commit-store instance under `ownerName`.
      object ConcurrentBackfillCommitStoreBuilder extends CommitStoreBuilder {
        private lazy val sharedCommitStore = ConcurrentBackfillCommitStore()
        override def name: String = ownerName
        override def build(conf: Map[String, String]): CommitStore = sharedCommitStore
      }

      CommitStoreProvider.clearNonDefaultBuilders()
      CommitStoreProvider.registerBuilder(ConcurrentBackfillCommitStoreBuilder)

      withSQLConf(
        MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> ownerName,
        DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS.key -> "false") {
        withTempDir { dir =>
          val tableLocation = dir.getCanonicalPath
          val tablePath = new Path(tableLocation)

          def appendCommit(): Unit =
            spark.range(10).write.format("delta").mode("append").save(tableLocation)

          // One initial write, then five appends; three more appends when no gap is requested.
          spark.range(10).write.format("delta").save(tableLocation)
          val appendCount = if (includeGapAtTheEnd) 5 else 8
          (1 to appendCount).foreach(_ => appendCommit())

          // Drop the cached log, arm the replay, discard permits left over from the writes
          // above, and only then trigger the update under test.
          DeltaLog.invalidateCache(spark, tablePath)
          replayArmed = true
          store.listFromCalledSemaphore.drainPermits()
          val snapshot = DeltaLog.forTable(spark, tablePath).update()

          // Deltas at positions 0 through 5 must not carry a UUID-style file name, while every
          // delta after them must.
          val indexedDeltas = snapshot.logSegment.deltas.zipWithIndex
          indexedDeltas.foreach { case (delta, index) =>
            val condition = delta.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
            if (index > 5) assert(condition, index) else assert(!condition, index)
          }
        }
      }
    }
  }
}

0 comments on commit 8fe9861

Please sign in to comment.