Parallel calls

sumeet-db committed May 3, 2024
1 parent b472d4e commit 3883545

Showing 8 changed files with 136 additions and 62 deletions.
@@ -459,10 +459,14 @@ trait SnapshotManagement { self: DeltaLog =>
         .collect { case cp if cp.version == ci.version => cp }
         .getOrElse(ci.getCheckpointProvider(this, checkpoints, lastCheckpointInfo))
     }
-    // In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
-    // they may just be before the checkpoint version unless we have a bug in log cleanup.
-    if (deltas.isEmpty) {
-      throw new IllegalStateException(s"Could not find any delta files for version $newVersion")
+    // If there's a valid checkpoint, `deltas` should contain the checkpoint version unless we
+    // have a bug in log cleanup.
+    newCheckpoint.map(_.version).foreach { cpVersion =>
+      deltas.filterNot(isUnbackfilledDeltaFile).find(deltaVersion(_) == cpVersion).orElse(
+        throw new IllegalStateException(
+          s"Could not find any backfilled delta files for version $cpVersion even though there " +
+            "is a checkpoint at this version.")
+      )
     }
     if (versionToLoad.exists(_ != newVersion)) {
       throwNonExistentVersionError(versionToLoad.get)
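Note: the new guard replaces the bare `deltas.isEmpty` check with a sharper invariant — whenever a checkpoint is selected at version V, a backfilled delta file for V must also appear in the listing. A minimal, self-contained sketch of that logic (simplified types; `DeltaFile` stands in for the real `FileStatus` plumbing and the `FileNames` helpers):

case class DeltaFile(version: Long, backfilled: Boolean)

def requireBackfilledDeltaAtCheckpoint(
    deltas: Seq[DeltaFile],
    newCheckpointVersion: Option[Long]): Unit = {
  newCheckpointVersion.foreach { cpVersion =>
    // Mirrors deltas.filterNot(isUnbackfilledDeltaFile).find(deltaVersion(_) == cpVersion)
    val found = deltas.filter(_.backfilled).exists(_.version == cpVersion)
    if (!found) {
      throw new IllegalStateException(
        s"Could not find any backfilled delta files for version $cpVersion even though " +
          "there is a checkpoint at this version.")
    }
  }
}

// e.g. (script style): passes, since a backfilled delta exists at the checkpoint version.
requireBackfilledDeltaAtCheckpoint(
  Seq(DeltaFile(10, backfilled = true), DeltaFile(11, backfilled = false)),
  newCheckpointVersion = Some(10))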
@@ -581,7 +581,7 @@ class CheckpointsSuite

     // Delete the commit files 0-9, so that we are forced to read the checkpoint file
     val logPath = new Path(new File(target, "_delta_log").getAbsolutePath)
-    for (i <- 0 to 10) {
+    for (i <- 0 to 9) {
       val file = new File(FileNames.unsafeDeltaFile(logPath, version = i).toString)
       file.delete()
     }
@@ -1062,15 +1062,15 @@ class FakeGCSFileSystemValidatingCommits extends FakeGCSFileSystemValidatingChec
   override protected def shouldValidateFilePattern(f: Path): Boolean = f.getName.contains(".json")
 }

-class ManagedCommitBatch1BackFillCheckpointsSuite extends CheckpointsSuite {
+class CheckpointsWithManagedCommitBatch1Suite extends CheckpointsSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }

-class ManagedCommitBatch2BackFillCheckpointsSuite extends CheckpointsSuite {
+class CheckpointsWithManagedCommitBatch2Suite extends CheckpointsSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
 }

-class ManagedCommitBatch20BackFillCheckpointsSuite extends CheckpointsSuite {
-  override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
+class CheckpointsWithManagedCommitBatch100Suite extends CheckpointsSuite {
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(100)
 }
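Note: most of this commit follows one renaming convention — a concrete `<Base>WithManagedCommitBatchN` suite reruns the entire parent suite with a fixed backfill batch size. A hedged sketch of the shape involved (assumed trait layout; the real `ManagedCommitBaseSuite` lives in the Delta test utilities and wires the batch size into table properties):

// Assumed shape of the shared test trait (illustrative, not the exact source).
trait ManagedCommitBaseSuite {
  // None => managed commits disabled; Some(n) => backfill every n commits.
  def managedCommitBackfillBatchSize: Option[Int] = None
  def managedCommitsEnabledInTests: Boolean = managedCommitBackfillBatchSize.nonEmpty
}

// The parent suite holds all test bodies; subclasses only pin the batch size,
// so every test runs once per configuration.
class ExampleSuite extends ManagedCommitBaseSuite { /* shared tests */ }

class ExampleWithManagedCommitBatch100Suite extends ExampleSuite {
  override def managedCommitBackfillBatchSize: Option[Int] = Some(100)
}

Batch size 1 backfills after every commit, 2 after every other commit, while 100 leaves most commits unbackfilled for the duration of a test — presumably why the third variant moves from 20 to 100, keeping the unbackfilled code paths exercised even in longer tests.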

@@ -664,6 +664,14 @@ class DeltaHistoryManagerSuite extends DeltaHistoryManagerBase {
   }
 }

-class ManagedCommitFill1DeltaHistoryManagerSuite extends DeltaHistoryManagerSuite {
+class DeltaHistoryManagerWithManagedCommitBatch1Suite extends DeltaHistoryManagerSuite {
   override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }
+
+class DeltaHistoryManagerWithManagedCommitBatch2Suite extends DeltaHistoryManagerSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(2)
+}
+
+class DeltaHistoryManagerWithManagedCommitBatch100Suite extends DeltaHistoryManagerSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(100)
+}
@@ -440,15 +440,15 @@ class DeltaLogMinorCompactionSuite extends QueryTest
   }
 }

-class ManagedCommitBatchBackfill1DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
+class DeltaLogMinorCompactionWithManagedCommitBatch1Suite extends DeltaLogMinorCompactionSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }

-class ManagedCommitBatchBackFill2DeltaLogMinorCompactionSuite extends DeltaLogMinorCompactionSuite {
+class DeltaLogMinorCompactionWithManagedCommitBatch2Suite extends DeltaLogMinorCompactionSuite {
   override val managedCommitBackfillBatchSize: Option[Int] = Some(2)
 }

-class ManagedCommitBatchBackFill20DeltaLogMinorCompactionSuite
+class DeltaLogMinorCompactionWithManagedCommitBatch100Suite
   extends DeltaLogMinorCompactionSuite {
-  override val managedCommitBackfillBatchSize: Option[Int] = Some(20)
+  override val managedCommitBackfillBatchSize: Option[Int] = Some(100)
 }
@@ -795,6 +795,6 @@ class DeltaTimeTravelSuite extends QueryTest
   }
 }

-class ManagedCommitFill1DeltaTimeTravelSuite extends DeltaTimeTravelSuite {
+class DeltaTimeTravelWithManagedCommitBatch1Suite extends DeltaTimeTravelSuite {
   override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
 }
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException

 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
+import org.apache.spark.sql.delta.managedcommit.ManagedCommitTestUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

@@ -33,6 +34,7 @@ import org.apache.spark.util.Utils

 trait DescribeDeltaDetailSuiteBase extends QueryTest
   with SharedSparkSession
+  with ManagedCommitTestUtils
   with DeltaTestUtilsForTempViews {

   import testImplicits._
@@ -211,7 +213,8 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
     }
   }

-  test("delta table: describe detail always run on the latest snapshot") {
+  testWithDifferentBackfillIntervalOptional(
+    "delta table: describe detail always run on the latest snapshot") { batchSizeOpt =>
     val tableName = "tbl_name_on_latest_snapshot"
     withTable(tableName) {
       val tempDir = Utils.createTempDir().toString
@@ -229,8 +232,13 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
         metadata.configuration ++ Map("foo" -> "bar")
       )
       txn.commit(newMetadata :: Nil, DeltaOperations.ManualUpdate)
+      val managedCommitProperties = batchSizeOpt.map(_ =>
+        Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> "tracking-in-memory",
+          DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.key -> "{\"randomConf\":\"randomConfValue\"}",
+          DeltaConfigs.MANAGED_COMMIT_TABLE_CONF.key -> "{}"))
+        .getOrElse(Map.empty)
       checkResult(sql(s"DESCRIBE DETAIL $tableName"),
-        Seq(Map("foo" -> "bar")),
+        Seq(Map("foo" -> "bar") ++ managedCommitProperties),
         Seq("properties")
       )
     }
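Note: in prose, when a generated test runs with managed commits enabled (`batchSizeOpt` defined), the harness's commit-owner table properties surface in `DESCRIBE DETAIL`, so the expected map must include them. A hedged sketch of the assembly (the key strings below are illustrative stand-ins for the real `DeltaConfigs.*.key` values):

// Stand-in keys; the real ones come from DeltaConfigs.MANAGED_COMMIT_*.key.
def expectedProperties(batchSizeOpt: Option[Int]): Map[String, String] = {
  val managedCommitProperties = batchSizeOpt
    .map { _ =>
      Map(
        "delta.managedCommit.ownerName" -> "tracking-in-memory",  // assumed key
        "delta.managedCommit.ownerConf" -> """{"randomConf":"randomConfValue"}""",
        "delta.managedCommit.tableConf" -> "{}")
    }
    .getOrElse(Map.empty[String, String])
  Map("foo" -> "bar") ++ managedCommitProperties
}

assert(expectedProperties(None) == Map("foo" -> "bar"))
assert(expectedProperties(Some(2)).size == 4) // "foo" plus the three managed-commit keys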
@@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.storage.LogStore.logStoreClassConfKey
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
+import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, JsonUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.fs.Path
@@ -45,7 +45,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.storage.StorageLevel

 class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession
-  with DeltaSQLCommandTest {
+  with DeltaSQLCommandTest with ManagedCommitBaseSuite {


   /**
@@ -235,45 +235,56 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
       makeCorruptCheckpointFile(path, checkpointVersion = 1,
         shouldBeEmpty = testEmptyCheckpoint, multipart = multipart)

-      // The code paths are different, but the error and message end up being the same:
-      //
-      // testEmptyCheckpoint = true:
-      // - checkpoint 1 is NOT in the list result.
-      // - fallback to load version 0 using checkpoint 0
-      // - fail to read checkpoint 0
-      // - cannot find log file 0 so throw the above checkpoint 0 read failure
-      //
-      // testEmptyCheckpoint = false:
-      // - checkpoint 1 is in the list result.
-      // - Snapshot creation triggers state reconstruction
-      // - fail to read protocol+metadata from checkpoint 1
-      // - fallback to load version 0 using checkpoint 0
-      // - fail to read checkpoint 0
-      // - cannot find log file 0 so throw the original checkpoint 1 read failure
-      val e = intercept[SparkException] { staleLog.update() }
-      val version = if (testEmptyCheckpoint) 0 else 1
-      assert(e.getMessage.contains(f"$version%020d.checkpoint") &&
-        e.getMessage.contains(SHOULD_NOT_RECOVER_CHECKPOINT_ERROR_MSG))
+      val e = intercept[Exception] { staleLog.update() }
+      if (testEmptyCheckpoint) {
+        // - checkpoint 1 is NOT in the list result.
+        // - fallback to load version 0 using checkpoint 0
+        // - cannot find log file 0 so throw the log file not found failure.
+        assert(
+          e.isInstanceOf[IllegalStateException] &&
+            e.getMessage.contains("Could not find any backfilled delta files for version 0 " +
+              "even though there is a checkpoint at this version."))
+      } else {
+        // - checkpoint 1 is in the list result.
+        // - Snapshot creation triggers state reconstruction
+        // - fail to read protocol+metadata from checkpoint 1
+        // - fallback to load version 0 using checkpoint 0
+        // - cannot find log file 0 so throw the original checkpoint 1 read failure
+        assert(e.isInstanceOf[SparkException] && e.getMessage.contains("0001.checkpoint") &&
+          e.getMessage.contains(SHOULD_NOT_RECOVER_CHECKPOINT_ERROR_MSG))
+      }
       }
     }
   }

test("should throw a clear exception when checkpoint exists but its corresponding delta file " +
"doesn't exist") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
val staleLog = DeltaLog.forTable(spark, path)
DeltaLog.clearCache()
BOOLEAN_DOMAIN.foreach { deleteUnbackfilledDeltas =>
test(
"should throw a clear exception when checkpoint exists but its corresponding delta file " +
s"doesn't exist, deleteUnbackfilledDeltas: $deleteUnbackfilledDeltas") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
val staleLog = DeltaLog.forTable(spark, path)
DeltaLog.clearCache()

spark.range(10).write.format("delta").save(path)
DeltaLog.forTable(spark, path).checkpoint()
// Delete delta files
new File(tempDir, "_delta_log").listFiles().filter(_.getName.endsWith(".json"))
.foreach(_.delete())
val e = intercept[IllegalStateException] {
staleLog.update()
spark.range(10).write.format("delta").save(path)
spark.range(10).write.format("delta").mode("append").save(path)
DeltaLog.forTable(spark, path).checkpoint()
// Delete delta file at version 1
new File(tempDir, "_delta_log")
.listFiles()
.filter(_.getName.endsWith("1.json"))
.foreach(_.delete())
if (deleteUnbackfilledDeltas) {
new File(new File(tempDir, "_delta_log"), "_commits")
.listFiles()
.filter(_.getName.endsWith("1.json"))
.foreach(_.delete())
}
val e = intercept[IllegalStateException] {
staleLog.update()
}
assert(e.getMessage.contains("Could not find any backfilled delta files for version 1"))
}
assert(e.getMessage.contains("Could not find any delta files for version 0"))
}
}
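Note: for readers outside the Delta test codebase, `BOOLEAN_DOMAIN` is the suites' usual `Seq(true, false)` domain for parameterized tests, so the block above registers two independently named tests. A tiny stand-alone sketch of the idiom (with `println` standing in for ScalaTest's `test(...)` registration):

val BOOLEAN_DOMAIN: Seq[Boolean] = Seq(true, false)

BOOLEAN_DOMAIN.foreach { deleteUnbackfilledDeltas =>
  // The real suite calls test(name) { ... } here, one registration per flag value.
  println(s"register: checkpoint exists but delta file doesn't exist, " +
    s"deleteUnbackfilledDeltas: $deleteUnbackfilledDeltas")
}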

@@ -304,11 +315,17 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
       // Delete delta files
       new File(tempDir, "_delta_log").listFiles().filter(_.getName.endsWith(".json"))
         .foreach(_.delete())
+      if (managedCommitsEnabledInTests) {
+        new File(new File(tempDir, "_delta_log"), "_commits")
+          .listFiles()
+          .filter(_.getName.endsWith(".json"))
+          .foreach(_.delete())
+      }
       makeCorruptCheckpointFile(path, checkpointVersion = 0, shouldBeEmpty = false)
       val e = intercept[IllegalStateException] {
         staleLog.update()
       }
-      assert(e.getMessage.contains("Could not find any delta files for version 0"))
+      assert(e.getMessage.contains("Could not find any backfilled delta files for version 0"))
     }
   }

@@ -459,10 +476,34 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
       val oldLogSegment = log.snapshot.logSegment
       spark.range(10).write.format("delta").save(path)
       val newLogSegment = log.snapshot.logSegment
-      assert(log.getLogSegmentAfterCommit(None, oldLogSegment.checkpointProvider) === newLogSegment)
+      assert(log.getLogSegmentAfterCommit(
+        log.snapshot.tableCommitOwnerClientOpt,
+        oldLogSegment.checkpointProvider) === newLogSegment)
       spark.range(10).write.format("delta").mode("append").save(path)
-      assert(log.getLogSegmentAfterCommit(None, oldLogSegment.checkpointProvider)
-        === log.snapshot.logSegment)
+      val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
+      val commitFileProvider = DeltaCommitFileProvider(log.snapshot)
+      intercept[IllegalArgumentException] {
+        val commitFile = fs.getFileStatus(commitFileProvider.deltaFile(1))
+        val commit = Commit(
+          version = 1,
+          fileStatus = commitFile,
+          commitTimestamp = 0)
+        // Version exists, but not contiguous with old logSegment
+        log.getLogSegmentAfterCommit(1, None, oldLogSegment, commit, None, EmptyCheckpointProvider)
+      }
+      intercept[IllegalArgumentException] {
+        val commitFile = fs.getFileStatus(commitFileProvider.deltaFile(0))
+        val commit = Commit(
+          version = 0,
+          fileStatus = commitFile,
+          commitTimestamp = 0)
+
+        // Version exists, but newLogSegment already contains it
+        log.getLogSegmentAfterCommit(0, None, newLogSegment, commit, None, EmptyCheckpointProvider)
+      }
+      assert(log.getLogSegmentAfterCommit(
+        log.snapshot.tableCommitOwnerClientOpt,
+        oldLogSegment.checkpointProvider) === log.snapshot.logSegment)
     }
   }
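Note: the two `intercept[IllegalArgumentException]` cases pin down the same precondition from both sides — a commit may only extend a LogSegment it is contiguous with, and must not already be contained in it. A simplified stand-alone model of that validation (stand-in types, not the real `SnapshotManagement` signature):

final case class LogSegmentModel(version: Long) // stand-in for LogSegment

// A commit at version v can extend a segment at version v - 1, nothing else.
def validateCommitExtendsSegment(segment: LogSegmentModel, commitVersion: Long): Unit = {
  require(commitVersion > segment.version,
    s"commit $commitVersion is already contained in segment at ${segment.version}")
  require(commitVersion == segment.version + 1,
    s"commit $commitVersion is not contiguous with segment at ${segment.version}")
}

validateCommitExtendsSegment(LogSegmentModel(0), 1)      // ok
// validateCommitExtendsSegment(LogSegmentModel(-1), 1) -> throws: not contiguous
// validateCommitExtendsSegment(LogSegmentModel(1), 1)  -> throws: already contained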

@@ -488,6 +529,18 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
   }
 }

+class SnapshotManagementWithManagedCommitBatch1Suite extends SnapshotManagementSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class SnapshotManagementWithManagedCommitBatch2Suite extends SnapshotManagementSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(2)
+}
+
+class SnapshotManagementWithManagedCommitBatch100Suite extends SnapshotManagementSuite {
+  override def managedCommitBackfillBatchSize: Option[Int] = Some(100)
+}
+
 class CountDownLatchLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
   extends LocalLogStore(sparkConf, hadoopConf) {
   override def listFrom(path: Path, hadoopConf: Configuration): Iterator[FileStatus] = {
@@ -77,12 +77,13 @@ trait ManagedCommitTestUtils
test(s"$testName [Backfill batch size: None]") {
f(None)
}
val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
withSQLConf(
DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> "in-memory",
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey -> managedCommitOwnerJson) {
testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
withSQLConf(
DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> "tracking-in-memory",
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey ->
managedCommitOwnerJson) {
f(Some(backfillBatchSize))
}
}
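Note: this last hunk appears to be a scoping fix. `withSQLConf` only applies while the enclosed code executes, and ScalaTest's `test(...)` registers bodies to run later, so wrapping the registration call left the conf inactive when test bodies actually ran; moving `withSQLConf` inside the per-batch-size closure re-establishes it at execution time. A minimal stand-alone illustration of the registration-vs-execution pitfall (toy `withConf`/`test` stand-ins, not the Spark test API):

object ConfScopeDemo extends App {
  var conf: Map[String, String] = Map.empty
  val registered = scala.collection.mutable.Buffer[() => Unit]()

  def withConf[T](kv: (String, String))(body: => T): T = {
    val saved = conf; conf += kv
    try body finally conf = saved
  }
  // Like ScalaTest's test(...): records the body now, runs it later.
  def test(name: String)(body: => Unit): Unit = registered += (() => body)

  // Broken shape: conf is set only while tests are *registered*.
  withConf("owner" -> "tracking-in-memory") {
    test("outer-scoped") { println(s"outer-scoped sees: ${conf.get("owner")}") }
  }
  // Fixed shape: conf is re-established when the body *runs*.
  test("inner-scoped") {
    withConf("owner" -> "tracking-in-memory") {
      println(s"inner-scoped sees: ${conf.get("owner")}")
    }
  }

  registered.foreach(_()) // prints None for the first test, Some(...) for the second
}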
