Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 21, 2024
1 parent b88e270 commit 55f848a
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 101 deletions.
174 changes: 103 additions & 71 deletions spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ trait SnapshotManagement { self: DeltaLog =>

@volatile private[delta] var asyncUpdateTask: Future[Unit] = _

/** Use ReentrantLock to allow us to call `lockInterruptibly` */
protected val snapshotLock = new ReentrantLock()

/**
* Cached fileStatus for the latest CRC file seen in the deltaLog.
*/
@volatile protected var lastSeenChecksumFileStatusOpt: Option[FileStatus] = None
@volatile protected var currentSnapshot: CapturedSnapshot = getSnapshotAtInit

/** Use ReentrantLock to allow us to call `lockInterruptibly` */
protected val snapshotLock = new ReentrantLock()

/**
* Run `body` inside `snapshotLock` lock using `lockInterruptibly` so that the thread
* can be interrupted when waiting for the lock.
Expand All @@ -81,20 +81,6 @@ trait SnapshotManagement { self: DeltaLog =>
}
}

/**
* Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog
* initialization, or None if the directory was empty/missing.
*
* @param startingCheckpoint A checkpoint that we can start our listing from
*/
protected def getLogSegmentFrom(
startingCheckpoint: Option[LastCheckpointInfo]): Option[LogSegment] = {
getLogSegmentForVersion(
versionToLoad = None,
lastCheckpointInfo = startingCheckpoint
)
}

/** Get an iterator of files in the _delta_log directory starting with the startVersion. */
private[delta] def listFrom(startVersion: Long): Iterator[FileStatus] = {
store.listFrom(listingPrefix(logPath, startVersion), newDeltaHadoopConf())
Expand Down Expand Up @@ -243,6 +229,7 @@ trait SnapshotManagement { self: DeltaLog =>
versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing
val additionalLogTuplesFromFsListingOpt =
Option.when(needAdditionalFsListing) {
recordDeltaEvent(this, "delta.listDeltaAndCheckpointFiles.needAdditionalFsListing")
listDeltaCompactedDeltaAndCheckpointFilesOpt(
startVersionOverride = maxDeltaVersionSeen + 1)
}.flatten
Expand Down Expand Up @@ -287,11 +274,11 @@ trait SnapshotManagement { self: DeltaLog =>
* @return Some LogSegment to build a Snapshot if files do exist after the given
* startCheckpoint. None, if the directory was missing or empty.
*/
protected def getLogSegmentForVersion(
protected def createLogSegment(
versionToLoad: Option[Long] = None,
oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider] = None,
lastCheckpointInfo: Option[LastCheckpointInfo] = None,
commitStoreOpt: Option[CommitStore] = None): Option[LogSegment] = {
commitStoreOpt: Option[CommitStore] = None,
lastCheckpointInfo: Option[LastCheckpointInfo] = None): Option[LogSegment] = {
// List based on the last known checkpoint version.
// if that is -1, list from version 0L
val lastCheckpointVersion = getCheckpointVersion(lastCheckpointInfo, oldCheckpointProviderOpt)
Expand All @@ -309,6 +296,12 @@ trait SnapshotManagement { self: DeltaLog =>
)
}

private def createLogSegment(previousSnapshot: Snapshot): Option[LogSegment] = {
createLogSegment(
oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
commitStoreOpt = previousSnapshot.commitStoreOpt)
}

/**
* Returns the last known checkpoint version based on [[LastCheckpointInfo]] or
* [[CheckpointProvider]].
Expand Down Expand Up @@ -380,7 +373,7 @@ trait SnapshotManagement { self: DeltaLog =>
// deleting files. Either way, we can't safely continue.
//
// For now, we preserve existing behavior by returning Array.empty, which will trigger a
// recursive call to [[getLogSegmentForVersion]] below.
// recursive call to [[createLogSegment]] below.
Array.empty[FileStatus]
}

Expand All @@ -391,7 +384,7 @@ trait SnapshotManagement { self: DeltaLog =>
} else if (newFiles.isEmpty) {
// The directory may be deleted and recreated and we may have stale state in our DeltaLog
// singleton, so try listing from the first version
return getLogSegmentForVersion(versionToLoad = versionToLoad)
return createLogSegment(versionToLoad = versionToLoad)
}
val (checkpoints, deltasAndCompactedDeltas) = newFiles.partition(isCheckpointFile)
val (deltas, compactedDeltas) = deltasAndCompactedDeltas.partition(isDeltaFile)
Expand Down Expand Up @@ -554,30 +547,20 @@ trait SnapshotManagement { self: DeltaLog =>
* file as a hint on where to start listing the transaction log directory. If the _delta_log
* directory doesn't exist, this method will return an `InitialSnapshot`.
*/
protected def getSnapshotAtInit: CapturedSnapshot = {
protected def getSnapshotAtInit: CapturedSnapshot = withSnapshotLockInterruptibly {
recordFrameProfile("Delta", "SnapshotManagement.getSnapshotAtInit") {
val currentTimestamp = clock.getTimeMillis()
val snapshotInitWallclockTime = clock.getTimeMillis()
val lastCheckpointOpt = readLastCheckpointFile()
createSnapshotAtInitInternal(
initSegment = getLogSegmentFrom(lastCheckpointOpt),
timestamp = currentTimestamp
)
}
}

protected def createSnapshotAtInitInternal(
initSegment: Option[LogSegment],
timestamp: Long): CapturedSnapshot = {
val snapshot = initSegment.map { segment =>
val snapshot = createSnapshot(
initSegment = segment,
checksumOpt = None)
snapshot
}.getOrElse {
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
new InitialSnapshot(logPath, this)
val initialSegmentForNewSnapshot = createLogSegment(
versionToLoad = None,
lastCheckpointInfo = lastCheckpointOpt)
val snapshot = getUpdatedSnapshot(
oldSnapshotOpt = None,
initialSegmentForNewSnapshot = initialSegmentForNewSnapshot,
initialCommitStore = None,
isAsync = false)
CapturedSnapshot(snapshot, snapshotInitWallclockTime)
}
CapturedSnapshot(snapshot, timestamp)
}

/**
Expand Down Expand Up @@ -752,7 +735,7 @@ trait SnapshotManagement { self: DeltaLog =>
* Instead, just do a general update to the latest available version. The racing commits
* can then use the version check short-circuit to avoid constructing a new snapshot.
*/
getLogSegmentForVersion(
createLogSegment(
oldCheckpointProviderOpt = Some(oldCheckpointProvider),
commitStoreOpt = commitStoreOpt
).getOrElse {
Expand Down Expand Up @@ -963,48 +946,95 @@ trait SnapshotManagement { self: DeltaLog =>
*/
protected def updateInternal(isAsync: Boolean): Snapshot =
recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
val updateTimestamp = clock.getTimeMillis()
val updateStartTimeMs = clock.getTimeMillis()
val previousSnapshot = currentSnapshot.snapshot
val segmentOpt = getLogSegmentForVersion(
oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
commitStoreOpt = previousSnapshot.commitStoreOpt)
installLogSegmentInternal(previousSnapshot, segmentOpt, updateTimestamp, isAsync)
val segmentOpt = createLogSegment(previousSnapshot)
val newSnapshot = getUpdatedSnapshot(
oldSnapshotOpt = Some(previousSnapshot),
initialSegmentForNewSnapshot = segmentOpt,
initialCommitStore = previousSnapshot.commitStoreOpt,
isAsync = isAsync)
installSnapshot(newSnapshot, updateStartTimeMs)
}

/**
* Updates and installs a new snapshot in the `currentSnapshot`.
* This method takes care of recursively creating new snapshots if the commit store has changed.
* @param oldSnapshotOpt The previous snapshot, if any.
* @param initialSegmentForNewSnapshot the log segment constructed for the new snapshot
* @param initialCommitStore the Commit Store used for constructing the
* `initialSegmentForNewSnapshot`
* @param isAsync Whether the update is async.
* @return The new snapshot.
*/
protected def getUpdatedSnapshot(
oldSnapshotOpt: Option[Snapshot],
initialSegmentForNewSnapshot: Option[LogSegment],
initialCommitStore: Option[CommitStore],
isAsync: Boolean): Snapshot = {
var commitStoreUsed = initialCommitStore
var newSnapshot = getSnapshotForLogSegmentInternal(
oldSnapshotOpt,
initialSegmentForNewSnapshot,
isAsync
)
// If the commit store has changed, we need to again invoke updateSnapshot so that we
// could get the latest commits from the new commit store. We need to do it only once as
// the delta spec mandates the commit which changes the commit owner to be backfilled.
if (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != commitStoreUsed) {
commitStoreUsed = newSnapshot.commitStoreOpt
val segmentOpt = createLogSegment(newSnapshot)
newSnapshot = getSnapshotForLogSegmentInternal(Some(newSnapshot), segmentOpt, isAsync)
}
newSnapshot
}

/** Install the provided segmentOpt as the currentSnapshot on the cluster */
protected def installLogSegmentInternal(
previousSnapshot: Snapshot,
/** Creates a Snapshot for the given `segmentOpt` */
protected def getSnapshotForLogSegmentInternal(
previousSnapshotOpt: Option[Snapshot],
segmentOpt: Option[LogSegment],
updateTimestamp: Long,
isAsync: Boolean): Snapshot = {
segmentOpt.map { segment =>
if (segment == previousSnapshot.logSegment) {
// If no changes were detected, just refresh the timestamp
val timestampToUse = math.max(updateTimestamp, currentSnapshot.updateTimestamp)
currentSnapshot = currentSnapshot.copy(updateTimestamp = timestampToUse)
if (previousSnapshotOpt.exists(_.logSegment == segment)) {
previousSnapshotOpt.get
} else {
val newSnapshot = createSnapshot(
initSegment = segment,
checksumOpt = None)
logMetadataTableIdChange(previousSnapshot, newSnapshot)
previousSnapshotOpt.foreach(logMetadataTableIdChange(_, newSnapshot))
logInfo(s"Updated snapshot to $newSnapshot")
replaceSnapshot(newSnapshot, updateTimestamp)
newSnapshot
}
}.getOrElse {
logInfo(s"No delta log found for the Delta table at $logPath")
replaceSnapshot(new InitialSnapshot(logPath, this), updateTimestamp)
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
new InitialSnapshot(logPath, this)
}
currentSnapshot.snapshot
}

/** Replace the given snapshot with the provided one. */
protected def replaceSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Unit = {
/** Installs the given `newSnapshot` as the `currentSnapshot` */
protected def installSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Snapshot = {
if (!snapshotLock.isHeldByCurrentThread) {
if (Utils.isTesting) {
throw new RuntimeException("DeltaLog snapshot replaced without taking lock")
}
recordDeltaEvent(this, "delta.update.unsafeReplace")
}
val oldSnapshot = currentSnapshot.snapshot
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
oldSnapshot.uncache()
if (currentSnapshot == null) {
// cold snapshot initialization
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
return newSnapshot
}
val CapturedSnapshot(oldSnapshot, oldTimestamp) = currentSnapshot
if (oldSnapshot eq newSnapshot) {
// Same snapshot as before, so just refresh the timestamp
val timestampToUse = math.max(updateTimestamp, oldTimestamp)
currentSnapshot = CapturedSnapshot(newSnapshot, timestampToUse)
} else {
// Install the new snapshot and uncache the old one
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
oldSnapshot.uncache()
}
newSnapshot
}

/** Log a change in the metadata's table id whenever we install a newer version of a snapshot */
Expand Down Expand Up @@ -1078,8 +1108,7 @@ trait SnapshotManagement { self: DeltaLog =>
committedVersion)
logMetadataTableIdChange(previousSnapshot, newSnapshot)
logInfo(s"Updated snapshot to $newSnapshot")
replaceSnapshot(newSnapshot, updateTimestamp)
currentSnapshot.snapshot
installSnapshot(newSnapshot, updateTimestamp)
}
}

Expand All @@ -1101,7 +1130,7 @@ trait SnapshotManagement { self: DeltaLog =>
// fallback to the other overload.
return getSnapshotAt(version)
}
val segment = getLogSegmentForVersion(
val segment = createLogSegment(
versionToLoad = Some(version),
oldCheckpointProviderOpt = Some(lastCheckpointProvider)
).getOrElse {
Expand Down Expand Up @@ -1129,7 +1158,7 @@ trait SnapshotManagement { self: DeltaLog =>
.collect { case ci if ci.version <= version => ci }
.orElse(findLastCompleteCheckpointBefore(version))
.map(manuallyLoadCheckpoint)
getLogSegmentForVersion(
createLogSegment(
versionToLoad = Some(version),
lastCheckpointInfo = lastCheckpointInfoHint,
commitStoreOpt = current.commitStoreOpt
Expand All @@ -1142,6 +1171,9 @@ trait SnapshotManagement { self: DeltaLog =>
throw DeltaErrors.emptyDirectoryException(logPath.toString)
}
}

// Visible for testing
private[delta] def getCapturedSnapshot(): CapturedSnapshot = currentSnapshot
}

object SnapshotManagement {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
* Commit a given `commitFile` to the table represented by given `logPath` at the
* given `commitVersion`
*/
protected def commitImpl(
private[delta] def commitImpl(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}

class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore {

private[managedcommit] class PerTableData {
private[managedcommit] class PerTableData(var maxCommitVersion: Long = -1) {
// Map from version to Commit data
val commitsMap: mutable.SortedMap[Long, Commit] = mutable.SortedMap.empty
// We maintain maxCommitVersion explicitly since commitsMap might be empty
// if all commits for a table have been backfilled.
var maxCommitVersion: Long = -1
val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
}

Expand Down Expand Up @@ -71,7 +70,7 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC
* @throws CommitFailedException if the commit version is not the expected next version,
* indicating a version conflict.
*/
protected def commitImpl(
private[delta] def commitImpl(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
Expand Down Expand Up @@ -126,6 +125,15 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC
versionsToRemove.foreach(tableData.commitsMap.remove)
}
}

def registerTable(
logPath: Path,
maxCommitVersion: Long): Unit = {
val newPerTableData = new PerTableData(maxCommitVersion)
if (perTableMap.putIfAbsent(logPath, newPerTableData) != null) {
throw new IllegalStateException(s"Table $logPath already exists in the commit store.")
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,13 +936,6 @@ class CheckpointsSuite
assert(filterUsageRecords(usageRecords2, "delta.log.cleanup").size > 0)
}
}

protected def filterUsageRecords(
usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
usageRecords.filter { r =>
r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.concurrent
import scala.reflect.ClassTag
import scala.util.matching.Regex

import com.databricks.spark.util.UsageRecord
import org.apache.spark.sql.delta.DeltaTestUtils.Plans
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
Expand Down Expand Up @@ -155,6 +156,13 @@ trait DeltaTestUtilsBase {
jobs.values.count(_ > 0)
}

/** Filter `usageRecords` by the `opType` tag or field. */
def filterUsageRecords(usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
usageRecords.filter { r =>
r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
}
}

protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
// The expected plan for touched file computation is of the format below.
// The data column should be pruned from both leaves.
Expand Down
Loading

0 comments on commit 55f848a

Please sign in to comment.