diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
index fd0ba01eb91..c4eb1855fab 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
@@ -483,97 +483,3 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
     }
   }
 }
-
-class SemaphoreReleasingLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
-  extends LocalLogStore(sparkConf, hadoopConf) {
-  val listFromCalledSemaphore = new Semaphore(0)
-  override def listFrom(path: Path, hadoopConf: Configuration): Iterator[FileStatus] = {
-    val files = super.listFrom(path, hadoopConf).toSeq
-    listFromCalledSemaphore.release()
-    files.iterator
-  }
-}
-
-class SnapshotManagementParallelListingSuite extends QueryTest with SharedSparkSession {
-
-  override protected def sparkConf: SparkConf =
-    super.sparkConf.set(logStoreClassConfKey, classOf[SemaphoreReleasingLogStore].getName)
-
-  protected def store: SemaphoreReleasingLogStore =
-    spark.sharedState.logStore.asInstanceOf[SemaphoreReleasingLogStore]
-
-  Seq(true, false).foreach { includeGapAtTheEnd =>
-    test(s"concurrent backfills are properly reconciled with gapAtTheEnd: $includeGapAtTheEnd") {
-      val commitStoreName = "awaiting-commit-store"
-      // We defer backfills so that we can manually trigger concurrent backfills for versions
-      // [3, 5] during CommitStore.getCommits, after the LogStore listing has completed.
-      // Depending on the value of includeGapAtTheEnd, we also write versions [6, 8] to the log.
-      val deferredBackfills: mutable.Map[Long, () => Unit] = mutable.Map.empty
-      var beginConcurrentBackfills = false
-
-      case class ConcurrentBackfillCommitStore() extends InMemoryCommitStore(batchSize = 5) {
-        override def getCommits(
-            logPath: Path,
-            startVersion: Long,
-            endVersion: Option[Long]): GetCommitsResponse = {
-          if (beginConcurrentBackfills) {
-            // Wait on the semaphore to ensure the file-system listFrom call completes, then
-            // backfill commits [3, 5] to create a perceived gap between file-system deltas and
-            // commit-store commits, and verify that the reconciliation logic works as expected.
-            store.listFromCalledSemaphore.acquire()
-            logInfo(s"Finishing pending backfills concurrently: ${deferredBackfills.keySet}")
-            deferredBackfills.keys.foreach(version => deferredBackfills(version)())
-          }
-          super.getCommits(logPath, startVersion, endVersion)
-        }
-        override def backfill(
-            logStore: LogStore,
-            hadoopConf: Configuration,
-            logPath: Path,
-            version: Long,
-            fileStatus: FileStatus): Unit = {
-          if (version <= 2) {
-            super.backfill(logStore, hadoopConf, logPath, version, fileStatus)
-          } else {
-            deferredBackfills(version) =
-              () => super.backfill(logStore, hadoopConf, logPath, version, fileStatus)
-          }
-        }
-      }
-
-      object ConcurrentBackfillCommitStoreBuilder extends CommitStoreBuilder {
-        private lazy val concurrentBackfillCommitStore = ConcurrentBackfillCommitStore()
-        override def name: String = commitStoreName
-        override def build(conf: Map[String, String]): CommitStore = concurrentBackfillCommitStore
-      }
-
-      CommitStoreProvider.clearNonDefaultBuilders()
-      CommitStoreProvider.registerBuilder(ConcurrentBackfillCommitStoreBuilder)
-
-      withSQLConf(
-          MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> commitStoreName,
-          DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS.key -> "false") {
-        withTempDir { tempDir =>
-          val path = tempDir.getCanonicalPath
-          val dataPath = new Path(path)
-          spark.range(10).write.format("delta").save(path)
-          (1 to 5).foreach(_ => spark.range(10).write.format("delta").mode("append").save(path))
-          if (!includeGapAtTheEnd) {
-            (6 to 8).foreach(_ => spark.range(10).write.format("delta").mode("append").save(path))
-          }
-
-          DeltaLog.invalidateCache(spark, dataPath)
-          beginConcurrentBackfills = true
-          store.listFromCalledSemaphore.drainPermits()
-          val snapshot = DeltaLog.forTable(spark, dataPath).update()
-
-          // Everything up to version 5 must be backfilled.
-          snapshot.logSegment.deltas.zipWithIndex.foreach { case (delta, index) =>
-            val condition = delta.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
-            if (index <= 5) assert(!condition, index) else assert(condition, index)
-          }
-        }
-      }
-    }
-  }
-}
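
Note (supplementary, not part of the diff): the removed test deterministically interleaves
two threads through a java.util.concurrent.Semaphore initialized to zero permits: listFrom
releases a permit once the listing snapshot is taken, and getCommits blocks on acquire()
until then, so the deferred backfills always land between the listing and the commit-store
read. Below is a minimal standalone sketch of that handoff pattern; all names here
(listFrom, getCommits, deferred, SemaphoreHandoffSketch) are illustrative stand-ins, not
the Delta APIs.

import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer

object SemaphoreHandoffSketch extends App {
  val listFromCalled = new Semaphore(0)
  val backfilled = ArrayBuffer.empty[Long]

  // Stand-in for LogStore.listFrom: take the listing, then signal completion.
  def listFrom(): Seq[String] = {
    val files = Seq("001.json", "002.json")
    listFromCalled.release() // one permit per completed listing
    files
  }

  // Stand-in for CommitStore.getCommits: block until a listing has completed,
  // then run the deferred backfills so they race in right after the listing.
  def getCommits(deferred: Map[Long, () => Unit]): Unit = {
    listFromCalled.acquire()
    deferred.keys.toSeq.sorted.foreach(v => deferred(v)())
  }

  val deferred: Map[Long, () => Unit] =
    (3L to 5L).map(v => v -> (() => { backfilled += v; () })).toMap

  val commitThread = new Thread(() => getCommits(deferred))
  commitThread.start()    // blocks on acquire(): no permit has been released yet

  val listed = listFrom() // releases the permit, unblocking commitThread
  commitThread.join()     // join() makes the backfilled buffer safely visible

  println(s"listed=$listed, backfilled after listing: $backfilled")
}

A Semaphore(0), unlike a one-shot CountDownLatch, stays reusable across repeated
listings because accumulated permits can be cleared, which is why the removed test
calls drainPermits() before triggering the update.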