From cb8e0cf23a5e7d00f3fd20a1e8df390aa2487658 Mon Sep 17 00:00:00 2001 From: Thang Long Vu <107926660+longvu-db@users.noreply.github.com> Date: Tue, 30 Apr 2024 20:31:57 +0200 Subject: [PATCH] [Spark] Add nextObserver and the PhaseLockingExecutionObserver for the Concurrency Testing Framework (#2932) #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Add the `nextObserver` and the `PhaseLockingExecutionObserver` for the Concurrency Testing Framework, as we will need it to write Row Tracking Backfill conflicts with other commands tests when we add Row Tracking Backfill. ## How was this patch tested? Added UTs. ## Does this PR introduce _any_ user-facing changes? No. --- .../delta/TransactionExecutionObserver.scala | 25 +++++++- .../sql/delta/fuzzer/ExecutionPhaseLock.scala | 3 + .../PhaseLockingExecutionObserver.scala | 29 +++++++++ ...eLockingTransactionExecutionObserver.scala | 61 +++++++++++++++++-- .../TransactionExecutionObserverSuite.scala | 52 +++++++++++++++- 5 files changed, 163 insertions(+), 7 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingExecutionObserver.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TransactionExecutionObserver.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TransactionExecutionObserver.scala index b6d845bc714..c3bc74382d1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TransactionExecutionObserver.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TransactionExecutionObserver.scala @@ -16,6 +16,23 @@ package org.apache.spark.sql.delta +trait ChainableExecutionObserver[O] { + /** + * The next txn observer for this thread. + * The next observer is used to test threads that perform multiple transactions, i.e. + * commands that perform multiple commits. + */ + @volatile protected var nextObserver: Option[O] = None + + /** Set the next observer for this thread. */ + def setNextObserver(nextTxnObserver: O): Unit = { + nextObserver = Some(nextTxnObserver) + } + + /** Update the observer of this thread with the next observer. */ + def advanceToNextThreadObserver(): Unit +} + /** * Track different stages of the execution of a transaction. * @@ -23,7 +40,8 @@ package org.apache.spark.sql.delta * * The default is a no-op implementation. */ -trait TransactionExecutionObserver { +trait TransactionExecutionObserver + extends ChainableExecutionObserver[TransactionExecutionObserver] { /* * This is called outside the transaction object, * since it wraps its creation. @@ -59,6 +77,11 @@ trait TransactionExecutionObserver { * This occurs when there is an Exception thrown during the transaction's body. */ def transactionAborted(): Unit + + override def advanceToNextThreadObserver(): Unit = { + TransactionExecutionObserver.threadObserver.set( + nextObserver.getOrElse(NoOpTransactionExecutionObserver)) + } } object TransactionExecutionObserver { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/ExecutionPhaseLock.scala b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/ExecutionPhaseLock.scala index 8f2b166bfd7..fb607ecbf4f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/ExecutionPhaseLock.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/ExecutionPhaseLock.scala @@ -65,4 +65,7 @@ case class ExecutionPhaseLock( waitToEnter() leave() } + + /** Blocks at this point until the phase has been left. */ + def waitToLeave(): Unit = exitBarrier.waitToPass() } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingExecutionObserver.scala b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingExecutionObserver.scala new file mode 100644 index 00000000000..90db165fcdd --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingExecutionObserver.scala @@ -0,0 +1,29 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.fuzzer + +/** + * Trait representing execution observers that rely on phases with entry and exit barriers to + * control the order of execution of the observed code paths. See [[ExecutionPhaseLock]]. + */ +trait PhaseLockingExecutionObserver { + + val phaseLocks: Seq[ExecutionPhaseLock] + + /** Return `true` if we have left all phases, `false` otherwise. */ + def allPhasesHavePassed: Boolean = phaseLocks.forall(_.hasLeft) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingTransactionExecutionObserver.scala b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingTransactionExecutionObserver.scala index bfedfc505b8..f93b843b92b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingTransactionExecutionObserver.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingTransactionExecutionObserver.scala @@ -20,7 +20,22 @@ import org.apache.spark.sql.delta.{OptimisticTransaction, TransactionExecutionOb private[delta] class PhaseLockingTransactionExecutionObserver( val phases: OptimisticTransactionPhases) - extends TransactionExecutionObserver { + extends TransactionExecutionObserver + with PhaseLockingExecutionObserver { + + override val phaseLocks: Seq[ExecutionPhaseLock] = Seq( + phases.initialPhase, + phases.preparePhase, + phases.commitPhase) + + /** + * When set to true this observer will automatically update the thread's current observer to + * the next one. Also, it will not unblock the exit barrier of the commit phase automatically. + * Instead, the caller will have to automatically unblock it. This allows writing tests that + * can capture errors caused by code written between the end of the last txn and the start of + * the next txn. + */ + @volatile protected var autoAdvanceNextObserver: Boolean = false override def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction = phases.initialPhase.execute(f) @@ -29,14 +44,52 @@ private[delta] class PhaseLockingTransactionExecutionObserver( override def beginDoCommit(): Unit = phases.commitPhase.waitToEnter() - override def transactionCommitted(): Unit = phases.commitPhase.leave() + override def transactionCommitted(): Unit = { + if (nextObserver.nonEmpty && autoAdvanceNextObserver) { + waitForCommitPhaseAndAdvanceToNextObserver() + } else { + phases.commitPhase.leave() + } + } override def transactionAborted(): Unit = { if (!phases.commitPhase.hasEntered) { - // If an Exception was thrown earlier we may not have called `beginDoCommit`, yet. - phases.commitPhase.passThrough() + phases.commitPhase.waitToEnter() + } + if (nextObserver.nonEmpty && autoAdvanceNextObserver) { + waitForCommitPhaseAndAdvanceToNextObserver() } else { phases.commitPhase.leave() } } + + /* + * Wait for the commit phase to pass but do not unblock it so that callers can write tests + * that capture errors caused by code between the end of the last txn and the start of the + * new txn. After the commit phase is passed, update the thread observer of the thread to + * the next observer. + */ + def waitForCommitPhaseAndAdvanceToNextObserver(): Unit = { + require(nextObserver.nonEmpty) + phases.commitPhase.waitToLeave() + advanceToNextThreadObserver() + } + + /** + * Set the next observer, which will replace the txn observer on the thread after a successful + * commit. This method only works as expected if we haven't entered the commit phase yet. + * + * Note that when a next observer is set, the caller needs to manually unblock the exit barrier + * of the commit phase. + * + * For example, see [[waitForCommitPhaseAndAdvanceToNextObserver]]. + */ + def setNextObserver( + nextTxnObserver: TransactionExecutionObserver, + autoAdvance: Boolean): Unit = { + setNextObserver(nextTxnObserver) + autoAdvanceNextObserver = autoAdvance + } + + override def advanceToNextThreadObserver(): Unit = super.advanceToNextThreadObserver() } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/concurrency/TransactionExecutionObserverSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/concurrency/TransactionExecutionObserverSuite.scala index 207c18cee90..423a26e7707 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/concurrency/TransactionExecutionObserverSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/concurrency/TransactionExecutionObserverSuite.scala @@ -34,9 +34,10 @@ import org.apache.spark.sql.test.SharedSparkSession */ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest - with PhaseLockingTestMixin { + with PhaseLockingTestMixin + with TransactionExecutionTestMixin { - val timeout: FiniteDuration = 10000.millis + override val timeout: FiniteDuration = 10000.millis test("Phase Locking - sequential") { withTempDir { tempFile => @@ -221,4 +222,51 @@ class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSessio assert(res.isEmpty) } } + + test("Phase Locking - set next observer after commit") { + withTempDir { tempFile => + val tempPath = tempFile.toString + + spark.range(end = 1).write.format("delta").save(tempPath) + + val observer = new PhaseLockingTransactionExecutionObserver( + OptimisticTransactionPhases.forName("test-txn")) + val deltaLog = DeltaLog.forTable(spark, tempPath) + val initialTableVersion = deltaLog.update().version + + // get things started + val replacementObserver = new PhaseLockingTransactionExecutionObserver( + OptimisticTransactionPhases.forName("test-replacement-txn")) + + observer.setNextObserver(replacementObserver, autoAdvance = true) + unblockAllPhases(observer) + + TransactionExecutionObserver.withObserver(observer) { + deltaLog.withNewTransaction { txn => + observer.phases.commitPhase.exitBarrier.unblock() + val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq + txn.commit(removedFiles, DeltaOperations.ManualUpdate) + } + val tableVersionAfterFirstTxn = deltaLog.update().version + assert(tableVersionAfterFirstTxn === initialTableVersion + 1, + "expected a successful commit") + // Check that we cannot re-use the old observer, with unblocks. + assertThrows[IllegalStateTransitionException] { + observer.phases.preparePhase.entryBarrier.unblock() + } + + // Check that we can use the replaced observer to control a subsequent commit on the same + // thread. + val oldMetadata = deltaLog.update().metadata + val newMetadata = oldMetadata.copy(configuration = Map("foo" -> "bar")) + unblockAllPhases(replacementObserver) + deltaLog.withNewTransaction { txn => + txn.commit(Seq(newMetadata), DeltaOperations.ManualUpdate) + } + assert(deltaLog.update().version === tableVersionAfterFirstTxn + 1, + "expected a successful commit") + assert(replacementObserver.allPhasesHavePassed) + } + } + } }