Skip to content

Commit

Permalink
[Spark] Extend Fuzz test For Managed Commit (delta-io#3049)
Browse files Browse the repository at this point in the history
This PR extends the fuzz test to cover managed commit features. Specifically,
it adds a new event phase inside the commit operation, so that we can
capture the backfill as a separate operation. As a result, multiple
commits can go through before a backfill happens, and managed commit is
expected to handle these various situations and still return the
correct output.

## How was this patch tested?
Existing fuzz tests should naturally use the extended backfill phases.
  • Loading branch information
junlee-db authored May 9, 2024
1 parent 40b8f97 commit 8a56bdc
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1427,8 +1427,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
val updatedActions = UpdatedActions(
commitInfo, metadata, protocol, snapshot.metadata, snapshot.protocol)
val commitResponse =
val commitResponse = TransactionExecutionObserver.withObserver(executionObserver) {
effectiveTableCommitOwnerClient.commit(attemptVersion, jsonActions, updatedActions)
}
// TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is
// merged.
// If the metadata didn't change, `newMetadata` is empty, and we can re-use the old id.
Expand Down Expand Up @@ -2094,9 +2095,12 @@ trait OptimisticTransactionImpl extends TransactionalWrite
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
// Get thread local observer for Fuzz testing purpose.
val executionObserver = TransactionExecutionObserver.threadObserver.get()
val commitFile = util.FileNames.unsafeDeltaFile(logPath, commitVersion)
val commitFileStatus =
doCommit(logStore, hadoopConf, logPath, commitFile, commitVersion, actions)
executionObserver.beginBackfill()
// TODO(managed-commits): Integrate with ICT and pass the correct commitTimestamp
CommitResponse(Commit(
commitVersion,
Expand Down Expand Up @@ -2174,7 +2178,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
): Commit = {
val updatedActions =
currentTransactionInfo.getUpdatedActions(snapshot.metadata, snapshot.protocol)
val commitResponse = tableCommitOwnerClient.commit(attemptVersion, jsonActions, updatedActions)
val commitResponse = TransactionExecutionObserver.withObserver(executionObserver) {
tableCommitOwnerClient.commit(attemptVersion, jsonActions, updatedActions)
}
// TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is
// merged.
val commitTimestamp = commitResponse.getCommit.getFileStatus.getModificationTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ trait TransactionExecutionObserver
/** Called before the first `doCommit` attempt. */
def beginDoCommit(): Unit

/** Called after publishing the commit file but before the `backfill` attempt. */
def beginBackfill(): Unit

/** Called once a commit succeeded. */
def transactionCommitted(): Unit

Expand Down Expand Up @@ -120,6 +123,8 @@ object NoOpTransactionExecutionObserver extends TransactionExecutionObserver {

override def beginDoCommit(): Unit = ()

override def beginBackfill(): Unit = ()

override def transactionCommitted(): Unit = ()

override def transactionAborted(): Unit = ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.sql.delta.fuzzer
case class OptimisticTransactionPhases(
initialPhase: ExecutionPhaseLock,
preparePhase: ExecutionPhaseLock,
commitPhase: ExecutionPhaseLock)
commitPhase: ExecutionPhaseLock,
backfillPhase: ExecutionPhaseLock)

object OptimisticTransactionPhases {

Expand All @@ -28,6 +29,7 @@ object OptimisticTransactionPhases {
final val INITIAL_PHASE_LABEL = PREFIX + "INIT"
final val PREPARE_PHASE_LABEL = PREFIX + "PREPARE"
final val COMMIT_PHASE_LABEL = PREFIX + "COMMIT"
final val BACKFILL_PHASE_LABEL = PREFIX + "BACKFILL"

def forName(txnName: String): OptimisticTransactionPhases = {

Expand All @@ -37,6 +39,7 @@ object OptimisticTransactionPhases {
OptimisticTransactionPhases(
initialPhase = ExecutionPhaseLock(toTxnPhaseLabel(INITIAL_PHASE_LABEL)),
preparePhase = ExecutionPhaseLock(toTxnPhaseLabel(PREPARE_PHASE_LABEL)),
commitPhase = ExecutionPhaseLock(toTxnPhaseLabel(COMMIT_PHASE_LABEL)))
commitPhase = ExecutionPhaseLock(toTxnPhaseLabel(COMMIT_PHASE_LABEL)),
backfillPhase = ExecutionPhaseLock(toTxnPhaseLabel(BACKFILL_PHASE_LABEL)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ private[delta] class PhaseLockingTransactionExecutionObserver(
override val phaseLocks: Seq[ExecutionPhaseLock] = Seq(
phases.initialPhase,
phases.preparePhase,
phases.commitPhase)
phases.commitPhase,
phases.backfillPhase)

/**
* When set to true this observer will automatically update the thread's current observer to
Expand All @@ -42,36 +43,49 @@ private[delta] class PhaseLockingTransactionExecutionObserver(

/** Hands the by-name block `f` to the prepare phase's `execute` and returns its result. */
override def preparingCommit[T](f: => T): T = phases.preparePhase.execute(f)

override def beginDoCommit(): Unit = phases.commitPhase.waitToEnter()
/**
 * Called before the first `doCommit` attempt; calls `waitToEnter()` on the commit phase.
 * NOTE(review): presumably this blocks until the test unblocks the commit phase's entry
 * barrier -- confirm against `ExecutionPhaseLock`.
 */
override def beginDoCommit(): Unit = {
phases.commitPhase.waitToEnter()
}

/**
 * Called after publishing the commit file but before the `backfill` attempt.
 *
 * Leaves the commit phase first and only then waits to enter the backfill phase, so the
 * commit phase is already marked as left while this observer is waiting on the backfill
 * phase.
 */
override def beginBackfill(): Unit = {
phases.commitPhase.leave()
phases.backfillPhase.waitToEnter()
}

override def transactionCommitted(): Unit = {
if (nextObserver.nonEmpty && autoAdvanceNextObserver) {
waitForCommitPhaseAndAdvanceToNextObserver()
} else {
phases.commitPhase.leave()
phases.backfillPhase.leave()
}
}

override def transactionAborted(): Unit = {
if (!phases.commitPhase.hasEntered) {
phases.commitPhase.waitToEnter()
if (!phases.commitPhase.hasLeft) {
if (!phases.commitPhase.hasEntered) {
phases.commitPhase.waitToEnter()
}
phases.commitPhase.leave()
}
if (!phases.backfillPhase.hasEntered) {
phases.backfillPhase.waitToEnter()
}
if (nextObserver.nonEmpty && autoAdvanceNextObserver) {
waitForCommitPhaseAndAdvanceToNextObserver()
} else {
phases.commitPhase.leave()
phases.backfillPhase.leave()
}
}

/*
* Wait for the commit phase to pass but do not unblock it so that callers can write tests
* Wait for the backfill phase to pass but do not unblock it so that callers can write tests
* that capture errors caused by code between the end of the last txn and the start of the
* new txn. After the commit phase is passed, update the thread observer of the thread to
* the next observer.
*/
def waitForCommitPhaseAndAdvanceToNextObserver(): Unit = {
require(nextObserver.nonEmpty)
phases.commitPhase.waitToLeave()
phases.backfillPhase.waitToLeave()
advanceToNextThreadObserver()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.managedcommit
import java.nio.file.FileAlreadyExistsException
import java.util.UUID

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.TransactionExecutionObserver
import org.apache.spark.sql.delta.actions.CommitInfo
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.storage.LogStore
Expand Down Expand Up @@ -62,6 +62,7 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
val executionObserver = TransactionExecutionObserver.threadObserver.get()
val tablePath = ManagedCommitUtils.getTablePath(logPath)
if (commitVersion == 0) {
throw CommitFailedException(
Expand Down Expand Up @@ -99,6 +100,7 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L

val mcToFsConversion = isManagedCommitToFSConversion(commitVersion, updatedActions)
// Backfill if needed
executionObserver.beginBackfill()
if (batchSize <= 1) {
// Always backfill when batch size is configured as 1
backfill(logStore, hadoopConf, logPath, commitVersion, fileStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,21 @@ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSessio
assert(observer.phases.initialPhase.hasLeft)
assert(!observer.phases.preparePhase.hasEntered)
assert(!observer.phases.commitPhase.hasEntered)
assert(!observer.phases.backfillPhase.hasEntered)

// allow things to progress
observer.phases.preparePhase.entryBarrier.unblock()
observer.phases.commitPhase.entryBarrier.unblock()
observer.phases.backfillPhase.entryBarrier.unblock()
val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq
txn.commit(removedFiles, DeltaOperations.ManualUpdate)

assert(observer.phases.preparePhase.hasEntered)
assert(observer.phases.preparePhase.hasLeft)
assert(observer.phases.commitPhase.hasEntered)
assert(observer.phases.commitPhase.hasLeft)
assert(observer.phases.backfillPhase.hasEntered)
assert(observer.phases.backfillPhase.hasLeft)
}
}
val res = spark.read.format("delta").load(tempPath).collect()
Expand Down Expand Up @@ -118,6 +122,10 @@ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSessio
observer.phases.commitPhase.entryBarrier.unblock()
busyWaitFor(observer.phases.commitPhase.hasEntered, timeout)
busyWaitFor(observer.phases.commitPhase.hasLeft, timeout)

observer.phases.backfillPhase.entryBarrier.unblock()
busyWaitFor(observer.phases.backfillPhase.hasEntered, timeout)
busyWaitFor(observer.phases.backfillPhase.hasLeft, timeout)
testThread.join(timeout.toMillis)
assert(!testThread.isAlive) // should have passed the barrier and completed

Expand Down Expand Up @@ -146,6 +154,7 @@ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSessio
// allow things to progress
observer.phases.preparePhase.entryBarrier.unblock()
observer.phases.commitPhase.entryBarrier.unblock()
observer.phases.backfillPhase.entryBarrier.unblock()
val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq
txn.commit(removedFiles, DeltaOperations.ManualUpdate)
}
Expand All @@ -155,6 +164,7 @@ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSessio
// allow things to progress
observer.phases.preparePhase.entryBarrier.unblock()
observer.phases.commitPhase.entryBarrier.unblock()
observer.phases.backfillPhase.entryBarrier.unblock()
val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq
txn.commit(removedFiles, DeltaOperations.ManualUpdate)
}
Expand Down Expand Up @@ -210,11 +220,14 @@ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSessio
observer.phases.preparePhase.entryBarrier.unblock()
busyWaitFor(observer.phases.preparePhase.hasLeft, timeout)
assert(!observer.phases.commitPhase.hasEntered)
assert(!observer.phases.backfillPhase.hasEntered)

assertOperationNotVisible()

observer.phases.commitPhase.entryBarrier.unblock()
busyWaitFor(observer.phases.commitPhase.hasLeft, timeout)
observer.phases.backfillPhase.entryBarrier.unblock()
busyWaitFor(observer.phases.backfillPhase.hasLeft, timeout)
testThread.join(timeout.toMillis)
assert(!testThread.isAlive) // should have passed the barrier and completed

Expand Down Expand Up @@ -243,7 +256,7 @@ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSessio

TransactionExecutionObserver.withObserver(observer) {
deltaLog.withNewTransaction { txn =>
observer.phases.commitPhase.exitBarrier.unblock()
observer.phases.backfillPhase.exitBarrier.unblock()
val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq
txn.commit(removedFiles, DeltaOperations.ManualUpdate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ trait TransactionExecutionTestMixin {
observer.phases.initialPhase.entryBarrier.unblock()
observer.phases.preparePhase.entryBarrier.unblock()
observer.phases.commitPhase.entryBarrier.unblock()
observer.phases.backfillPhase.entryBarrier.unblock()
}

/**
Expand All @@ -145,11 +146,13 @@ trait TransactionExecutionTestMixin {

// B starts and commits
unblockAllPhases(observerB)
busyWaitFor(observerB.phases.commitPhase.hasLeft, timeout)
busyWaitFor(observerB.phases.backfillPhase.hasLeft, timeout)

// A commits
observerA.phases.commitPhase.entryBarrier.unblock()
busyWaitFor(observerA.phases.commitPhase.hasLeft, timeout)
observerA.phases.backfillPhase.entryBarrier.unblock()
busyWaitFor(observerA.phases.backfillPhase.hasLeft, timeout)
}
(usageRecords, futureA, futureB)
}
Expand Down Expand Up @@ -179,15 +182,17 @@ trait TransactionExecutionTestMixin {

// B starts and commits
unblockAllPhases(observerB)
busyWaitFor(observerB.phases.commitPhase.hasLeft, timeout)
busyWaitFor(observerB.phases.backfillPhase.hasLeft, timeout)

// C starts and commits
unblockAllPhases(observerC)
busyWaitFor(observerC.phases.commitPhase.hasLeft, timeout)
busyWaitFor(observerC.phases.backfillPhase.hasLeft, timeout)

// A commits
observerA.phases.commitPhase.entryBarrier.unblock()
busyWaitFor(observerA.phases.commitPhase.hasLeft, timeout)
observerA.phases.backfillPhase.entryBarrier.unblock()
busyWaitFor(observerA.phases.backfillPhase.hasLeft, timeout)
}
(usageRecords, futureA, futureB, futureC)
}
Expand Down
Loading

0 comments on commit 8a56bdc

Please sign in to comment.