Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed May 9, 2024
1 parent 8a56bdc commit 35ffb6e
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,12 @@ trait SnapshotManagement { self: DeltaLog =>
versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing
}

val initialMaxDeltaVersionSeen = maxDeltaVersionSeen
val additionalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
if (requiresAdditionalListing()) {
recordDeltaEvent(this, "delta.listDeltaAndCheckpointFiles.requiresAdditionalFsListing")
listFromFileSystemInternal(
startVersion = maxDeltaVersionSeen + 1, versionToLoad, includeMinorCompactions)
startVersion = initialMaxDeltaVersionSeen + 1, versionToLoad, includeMinorCompactions)
} else {
None
}
Expand Down Expand Up @@ -252,7 +253,14 @@ trait SnapshotManagement { self: DeltaLog =>

val finalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
(initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match {
case (Some(initial), Some(additional)) => Some(initial ++ additional)
case (Some(initial), Some(additional)) =>
// Filter the initial list to exclude files with versions beyond
// `initialMaxDeltaVersionSeen` to prevent duplicating non-delta files with
// higher versions in the combined list. Ideally we shouldn't need this, but we are
// being defensive here in case the log has missing files.
// E.g. initial = [0.json, 1.json, 2.checkpoint], initialMaxDeltaVersionSeen = 1,
// additional = [2.checkpoint], final = [0.json, 1.json, 2.checkpoint]
Some(initial.takeWhile(_._3 <= initialMaxDeltaVersionSeen) ++ additional)
case (Some(initial), None) => Some(initial)
case (None, Some(additional)) => Some(additional)
case _ => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ class CheckpointsSuite

// Delete the commit files 0-9, so that we are forced to read the checkpoint file
val logPath = new Path(new File(target, "_delta_log").getAbsolutePath)
for (i <- 0 to 10) {
for (i <- 0 to 9) {
val file = new File(FileNames.unsafeDeltaFile(logPath, version = i).toString)
file.delete()
}
Expand Down Expand Up @@ -818,6 +818,7 @@ class CheckpointsSuite
private def writeAllActionsInV2Manifest(
snapshot: Snapshot,
v2CheckpointFormat: V2Checkpoint.Format): Path = {
snapshot.ensureCommitFilesBackfilled()
val checkpointMetadata = CheckpointMetadata(version = snapshot.version)
val actionsDS = snapshot.stateDS
.where("checkpointMetadata is null and " +
Expand Down Expand Up @@ -1062,15 +1063,15 @@ class FakeGCSFileSystemValidatingCommits extends FakeGCSFileSystemValidatingChec
override protected def shouldValidateFilePattern(f: Path): Boolean = f.getName.contains(".json")
}

class ManagedCommitBatch1BackFillCheckpointsSuite extends CheckpointsSuite {
class CheckpointsWithManagedCommitBatch1Suite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

class ManagedCommitBatch2BackFillCheckpointsSuite extends CheckpointsSuite {
class CheckpointsWithManagedCommitBatch2Suite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

class ManagedCommitBatch20BackFillCheckpointsSuite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
class CheckpointsWithManagedCommitBatch100Suite extends CheckpointsSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(100)
}

Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ trait DeltaTimeTravelTests extends QueryTest
)
}


verifyLogging(2L, 0L, "version", "dfReader") {
checkAnswer(
spark.read.format("delta").option("versionAsOf", "0")
Expand Down Expand Up @@ -371,7 +370,11 @@ trait DeltaTimeTravelTests extends QueryTest
val e2 = intercept[AnalysisException] {
sql(s"select count(*) from ${versionAsOf(tblName, 0)}").collect()
}
assert(e2.getMessage.contains("No recreatable commits found at"))
if (managedCommitBackfillBatchSize.exists(_ > 2)) {
assert(e2.getMessage.contains("No commits found at"))
} else {
assert(e2.getMessage.contains("No recreatable commits found at"))
}
}
}
}
Expand Down Expand Up @@ -664,6 +667,14 @@ class DeltaHistoryManagerSuite extends DeltaHistoryManagerBase {
}
}

class ManagedCommitFill1DeltaHistoryManagerSuite extends DeltaHistoryManagerSuite {
class DeltaHistoryManagerWithManagedCommitBatch1Suite extends DeltaHistoryManagerSuite {
override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

/**
 * Variant of [[DeltaHistoryManagerSuite]] that overrides
 * `managedCommitBackfillBatchSize` to `Some(2)`.
 */
class DeltaHistoryManagerWithManagedCommitBatch2Suite extends DeltaHistoryManagerSuite {
override def managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

/**
 * Variant of [[DeltaHistoryManagerSuite]] that overrides
 * `managedCommitBackfillBatchSize` to `Some(100)`.
 */
class DeltaHistoryManagerWithManagedCommitBatch100Suite extends DeltaHistoryManagerSuite {
override def managedCommitBackfillBatchSize: Option[Int] = Some(100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -440,15 +440,15 @@ class DeltaLogMinorCompactionSuite extends QueryTest
}
}

class ManagedCommitBatchBackfill1DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
class DeltaLogMinorCompactionWithManagedCommitBatch1Suite extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

class ManagedCommitBatchBackFill2DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
class DeltaLogMinorCompactionWithManagedCommitBatch2Suite extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

class ManagedCommitBatchBackFill20DeltaLogMinorCompactionSuite
class DeltaLogMinorCompactionWithManagedCommitBatch100Suite
extends DeltaLogMinorCompactionSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
override val managedCommitBackfillBatchSize: Option[Int] = Some(100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,6 @@ class DeltaTimeTravelSuite extends QueryTest
}
}

class ManagedCommitFill1DeltaTimeTravelSuite extends DeltaTimeTravelSuite {
class DeltaTimeTravelWithManagedCommitBatch1Suite extends DeltaTimeTravelSuite {
override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.FileNotFoundException

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
import org.apache.spark.sql.delta.managedcommit.ManagedCommitTestUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

Expand All @@ -33,6 +34,7 @@ import org.apache.spark.util.Utils

trait DescribeDeltaDetailSuiteBase extends QueryTest
with SharedSparkSession
with ManagedCommitTestUtils
with DeltaTestUtilsForTempViews {

import testImplicits._
Expand Down Expand Up @@ -211,7 +213,8 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
}
}

test("delta table: describe detail always run on the latest snapshot") {
testWithDifferentBackfillIntervalOptional(
"delta table: describe detail always run on the latest snapshot") { batchSizeOpt =>
val tableName = "tbl_name_on_latest_snapshot"
withTable(tableName) {
val tempDir = Utils.createTempDir().toString
Expand All @@ -229,8 +232,14 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
metadata.configuration ++ Map("foo" -> "bar")
)
txn.commit(newMetadata :: Nil, DeltaOperations.ManualUpdate)
val managedCommitProperties = batchSizeOpt.map(_ =>
Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> "tracking-in-memory",
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.key -> "{\"randomConf\":\"randomConfValue\"}",
DeltaConfigs.MANAGED_COMMIT_TABLE_CONF.key -> "{}",
DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true"))
.getOrElse(Map.empty)
checkResult(sql(s"DESCRIBE DETAIL $tableName"),
Seq(Map("foo" -> "bar")),
Seq(Map("foo" -> "bar") ++ managedCommitProperties),
Seq("properties")
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.storage.LogStore.logStoreClassConfKey
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, JsonUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
Expand All @@ -45,7 +45,7 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.storage.StorageLevel

class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession
with DeltaSQLCommandTest {
with DeltaSQLCommandTest with ManagedCommitBaseSuite {


/**
Expand Down Expand Up @@ -304,6 +304,12 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
// Delete delta files
new File(tempDir, "_delta_log").listFiles().filter(_.getName.endsWith(".json"))
.foreach(_.delete())
if (managedCommitsEnabledInTests) {
new File(new File(tempDir, "_delta_log"), "_commits")
.listFiles()
.filter(_.getName.endsWith(".json"))
.foreach(_.delete())
}
makeCorruptCheckpointFile(path, checkpointVersion = 0, shouldBeEmpty = false)
val e = intercept[IllegalStateException] {
staleLog.update()
Expand Down Expand Up @@ -459,10 +465,34 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
val oldLogSegment = log.snapshot.logSegment
spark.range(10).write.format("delta").save(path)
val newLogSegment = log.snapshot.logSegment
assert(log.getLogSegmentAfterCommit(None, oldLogSegment.checkpointProvider) === newLogSegment)
assert(log.getLogSegmentAfterCommit(
log.snapshot.tableCommitOwnerClientOpt,
oldLogSegment.checkpointProvider) === newLogSegment)
spark.range(10).write.format("delta").mode("append").save(path)
assert(log.getLogSegmentAfterCommit(None, oldLogSegment.checkpointProvider)
=== log.snapshot.logSegment)
val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
val commitFileProvider = DeltaCommitFileProvider(log.snapshot)
intercept[IllegalArgumentException] {
val commitFile = fs.getFileStatus(commitFileProvider.deltaFile(1))
val commit = Commit(
version = 1,
fileStatus = commitFile,
commitTimestamp = 0)
// Version exists, but not contiguous with old logSegment
log.getLogSegmentAfterCommit(1, None, oldLogSegment, commit, None, EmptyCheckpointProvider)
}
intercept[IllegalArgumentException] {
val commitFile = fs.getFileStatus(commitFileProvider.deltaFile(0))
val commit = Commit(
version = 0,
fileStatus = commitFile,
commitTimestamp = 0)

// Version exists, but newLogSegment already contains it
log.getLogSegmentAfterCommit(0, None, newLogSegment, commit, None, EmptyCheckpointProvider)
}
assert(log.getLogSegmentAfterCommit(
log.snapshot.tableCommitOwnerClientOpt,
oldLogSegment.checkpointProvider) === log.snapshot.logSegment)
}
}

Expand All @@ -488,6 +518,18 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
}
}

/**
 * Variant of [[SnapshotManagementSuite]] that overrides
 * `managedCommitBackfillBatchSize` to `Some(1)`.
 */
class SnapshotManagementWithManagedCommitBatch1Suite extends SnapshotManagementSuite {
override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
}

/**
 * Variant of [[SnapshotManagementSuite]] that overrides
 * `managedCommitBackfillBatchSize` to `Some(2)`.
 */
class SnapshotManagementWithManagedCommitBatch2Suite extends SnapshotManagementSuite {
override def managedCommitBackfillBatchSize: Option[Int] = Some(2)
}

/**
 * Variant of [[SnapshotManagementSuite]] that overrides
 * `managedCommitBackfillBatchSize` to `Some(100)`.
 */
class SnapshotManagementWithManagedCommitBatch100Suite extends SnapshotManagementSuite {
override def managedCommitBackfillBatchSize: Option[Int] = Some(100)
}

class CountDownLatchLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
extends LocalLogStore(sparkConf, hadoopConf) {
override def listFrom(path: Path, hadoopConf: Configuration): Iterator[FileStatus] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ trait ManagedCommitTestUtils
test(s"$testName [Backfill batch size: None]") {
f(None)
}
val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
withSQLConf(
DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> "in-memory",
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey -> managedCommitOwnerJson) {
testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
withSQLConf(
DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> "tracking-in-memory",
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey ->
managedCommitOwnerJson) {
f(Some(backfillBatchSize))
}
}
Expand Down

0 comments on commit 35ffb6e

Please sign in to comment.