diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 04f37ebd23c..69283320819 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -193,7 +193,9 @@ class DeltaLog private( def startTransaction( catalogTableOpt: Option[CatalogTable], snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = { - new OptimisticTransaction(this, catalogTableOpt, snapshotOpt) + TransactionExecutionObserver.threadObserver.get().startingTransaction { + new OptimisticTransaction(this, catalogTableOpt, snapshotOpt) + } } /** Legacy/compat overload that does not require catalog table information. Avoid prod use. */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 7211af71e34..70bb21adc5d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -249,6 +249,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite /** Tracks if this transaction has already committed. */ protected var committed = false + /** Contains the execution instrumentation set via thread-local. No-op by default. */ + protected[delta] var executionObserver: TransactionExecutionObserver = + TransactionExecutionObserver.threadObserver.get() + /** * Stores the updated metadata (if any) that will result from this txn. * @@ -1080,7 +1084,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite val finalActions = checkForSetTransactionConflictAndDedup(actions ++ this.actions.toSeq) // Try to commit at the next version. - val preparedActions = prepareCommit(finalActions, op) + val preparedActions = + executionObserver.preparingCommit { + prepareCommit(finalActions, op) + } // Find the isolation level to use for this commit val isolationLevelToUse = getIsolationLevelToUse(preparedActions, op) @@ -1163,6 +1170,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite return None } + // Try to commit at the next version. + executionObserver.beginDoCommit() + val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) = doCommitRetryIteratively( firstAttemptVersion, currentTransactionInfo, isolationLevelToUse) @@ -1171,15 +1181,18 @@ trait OptimisticTransactionImpl extends TransactionalWrite } catch { case e: DeltaConcurrentModificationException => recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType) + executionObserver.transactionAborted() throw e case NonFatal(e) => recordDeltaEvent( deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e))) + executionObserver.transactionAborted() throw e } runPostCommitHooks(version, postCommitSnapshot, actualCommittedActions) + executionObserver.transactionCommitted() Some(version) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TransactionExecutionObserver.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TransactionExecutionObserver.scala new file mode 100644 index 00000000000..b6d845bc714 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TransactionExecutionObserver.scala @@ -0,0 +1,103 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+/**
+ * Track different stages of the execution of a transaction.
+ *
+ * This is mostly meant for test instrumentation.
+ *
+ * The default is a no-op implementation.
+ */
+trait TransactionExecutionObserver {
+  /*
+   * This is called outside the transaction object,
+   * since it wraps its creation.
+   */
+
+  /** Wraps transaction creation. */
+  def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction
+
+  /*
+   * These are called from within the transaction object.
+   */
+
+  /** Wraps `prepareCommit`. */
+  def preparingCommit[T](f: => T): T
+
+  /*
+   * The next three methods use a before/after style instead of wrapping like the above,
+   * because the commit code is large and sits inside a try-catch block,
+   * which makes wrapping impractical.
+   */
+
+  /** Called before the first `doCommit` attempt. */
+  def beginDoCommit(): Unit
+
+  /** Called once a commit has succeeded. */
+  def transactionCommitted(): Unit
+
+  /**
+   * Called once the transaction has failed.
+   *
+   * *Note:* [[transactionAborted()]] may be called without
+   * [[beginDoCommit()]] having been called first.
+   * This occurs when an exception is thrown during the transaction's body.
+   */
+  def transactionAborted(): Unit
+}
+
+object TransactionExecutionObserver {
+  /** Thread-local observer instance loaded by [[DeltaLog]] and [[OptimisticTransaction]]. */
+  val threadObserver: ThreadLocal[TransactionExecutionObserver] =
+    ThreadLocal.withInitial(() => NoOpTransactionExecutionObserver)
+
+  /**
+   * Instrument all transactions created and completed within `thunk` with `observer`.
+   *
+   * *Note 1:* Closing over existing transactions in `thunk` will have no effect.
+   * *Note 2:* Do not leak transactions created within `thunk` via the return value.
+   * *Note 3:* Do not create threads with new transactions within `thunk`.
+   * The observer information is not copied to child threads automatically.
+   *
+   * If you need more flexible usage of [[TransactionExecutionObserver]],
+   * use `TransactionExecutionObserver.threadObserver.set()` instead.
+   */
+  def withObserver[T](observer: TransactionExecutionObserver)(thunk: => T): T = {
+    val oldObserver = threadObserver.get()
+    threadObserver.set(observer)
+    try {
+      thunk
+    } finally {
+      // reset the thread-local to the previous observer
+      threadObserver.set(oldObserver)
+    }
+  }
+}
+
+/** Default observer does nothing. */
+object NoOpTransactionExecutionObserver extends TransactionExecutionObserver {
+  override def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction = f
+
+  override def preparingCommit[T](f: => T): T = f
+
+  override def beginDoCommit(): Unit = ()
+
+  override def transactionCommitted(): Unit = ()
+
+  override def transactionAborted(): Unit = ()
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/ExecutionPhaseLock.scala b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/ExecutionPhaseLock.scala
new file mode 100644
index 00000000000..8f2b166bfd7
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/ExecutionPhaseLock.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.fuzzer
+
+/**
+ * An ExecutionPhaseLock is an abstraction to keep multiple transactions moving in
+ * a pre-selected lock-step sequence.
+ *
+ * To pass a phase, we first wait on the `entryBarrier`. Once we are allowed to pass,
+ * we execute the code that belongs to this phase, and then we unblock the `exitBarrier`.
+ *
+ * @param name human-readable name for debugging
+ */
+case class ExecutionPhaseLock(
+    name: String,
+    entryBarrier: AtomicBarrier = new AtomicBarrier(),
+    exitBarrier: AtomicBarrier = new AtomicBarrier()) {
+
+  def hasEntered: Boolean = entryBarrier.load() == AtomicBarrier.State.Passed
+
+  def hasLeft: Boolean = {
+    val current = exitBarrier.load()
+    current == AtomicBarrier.State.Unblocked || current == AtomicBarrier.State.Passed
+  }
+
+  /** Blocks until this phase may be entered, then passes the entry barrier. */
+  def waitToEnter(): Unit = entryBarrier.waitToPass()
+
+  /** Unblocks the next dependent phase. */
+  def leave(): Unit = exitBarrier.unblock()
+
+  /**
+   * Wait to enter this phase, then execute `f`, and leave before returning the result of `f`.
+   *
+   * @return the result of evaluating `f`
+   */
+  def execute[T](f: => T): T = {
+    waitToEnter()
+    try {
+      f
+    } finally {
+      leave()
+    }
+  }
+
+  /**
+   * If there is nothing that needs to be done in this phase,
+   * we can leave immediately after entering.
+   */
+  def passThrough(): Unit = {
+    waitToEnter()
+    leave()
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/OptimisticTransactionPhases.scala b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/OptimisticTransactionPhases.scala
new file mode 100644
index 00000000000..2e0cb1d51bb
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/OptimisticTransactionPhases.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.fuzzer
+
+case class OptimisticTransactionPhases(
+    initialPhase: ExecutionPhaseLock,
+    preparePhase: ExecutionPhaseLock,
+    commitPhase: ExecutionPhaseLock)
+
+object OptimisticTransactionPhases {
+
+  private final val PREFIX = "TXN_"
+
+  final val INITIAL_PHASE_LABEL = PREFIX + "INIT"
+  final val PREPARE_PHASE_LABEL = PREFIX + "PREPARE"
+  final val COMMIT_PHASE_LABEL = PREFIX + "COMMIT"
+
+  def forName(txnName: String): OptimisticTransactionPhases = {
+
+    def toTxnPhaseLabel(phaseLabel: String): String =
+      txnName + "-" + phaseLabel
+
+    OptimisticTransactionPhases(
+      initialPhase = ExecutionPhaseLock(toTxnPhaseLabel(INITIAL_PHASE_LABEL)),
+      preparePhase = ExecutionPhaseLock(toTxnPhaseLabel(PREPARE_PHASE_LABEL)),
+      commitPhase = ExecutionPhaseLock(toTxnPhaseLabel(COMMIT_PHASE_LABEL)))
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingTransactionExecutionObserver.scala b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingTransactionExecutionObserver.scala
new file mode 100644
index 00000000000..bfedfc505b8
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/fuzzer/PhaseLockingTransactionExecutionObserver.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.fuzzer
+
+import org.apache.spark.sql.delta.{OptimisticTransaction, TransactionExecutionObserver}
+
+private[delta] class PhaseLockingTransactionExecutionObserver(
+    val phases: OptimisticTransactionPhases)
+  extends TransactionExecutionObserver {
+
+  override def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction =
+    phases.initialPhase.execute(f)
+
+  override def preparingCommit[T](f: => T): T = phases.preparePhase.execute(f)
+
+  override def beginDoCommit(): Unit = phases.commitPhase.waitToEnter()
+
+  override def transactionCommitted(): Unit = phases.commitPhase.leave()
+
+  override def transactionAborted(): Unit = {
+    if (!phases.commitPhase.hasEntered) {
+      // If an exception was thrown earlier, we may not have called `beginDoCommit` yet.
+ phases.commitPhase.passThrough() + } else { + phases.commitPhase.leave() + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/concurrency/TransactionExecutionObserverSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/concurrency/TransactionExecutionObserverSuite.scala new file mode 100644 index 00000000000..207c18cee90 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/concurrency/TransactionExecutionObserverSuite.scala @@ -0,0 +1,224 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.concurrency + +import scala.concurrent.duration._ + +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.fuzzer.{AtomicBarrier, IllegalStateTransitionException, OptimisticTransactionPhases, PhaseLockingTransactionExecutionObserver} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import io.delta.tables.{DeltaTable => IODeltaTable} + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Check that [[TransactionExecutionObserver]] is invoked correctly by transactions + * and commands. + * + * Also check the testing tools that use this API. 
+ */ +class TransactionExecutionObserverSuite extends QueryTest with SharedSparkSession + with DeltaSQLCommandTest + with PhaseLockingTestMixin { + + val timeout: FiniteDuration = 10000.millis + + test("Phase Locking - sequential") { + withTempDir { tempFile => + + val tempPath = tempFile.toString + + spark.range(100).write.format("delta").save(tempPath) + + val observer = new PhaseLockingTransactionExecutionObserver( + OptimisticTransactionPhases.forName("test-txn")) + val deltaLog = DeltaLog.forTable(spark, tempPath) + + // get things started + observer.phases.initialPhase.entryBarrier.unblock() + + assert(!observer.phases.initialPhase.hasEntered) + TransactionExecutionObserver.withObserver(observer) { + deltaLog.withNewTransaction { txn => + assert(observer.phases.initialPhase.hasEntered) + assert(observer.phases.initialPhase.hasLeft) + assert(!observer.phases.preparePhase.hasEntered) + assert(!observer.phases.commitPhase.hasEntered) + + // allow things to progress + observer.phases.preparePhase.entryBarrier.unblock() + observer.phases.commitPhase.entryBarrier.unblock() + val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq + txn.commit(removedFiles, DeltaOperations.ManualUpdate) + + assert(observer.phases.preparePhase.hasEntered) + assert(observer.phases.preparePhase.hasLeft) + assert(observer.phases.commitPhase.hasEntered) + assert(observer.phases.commitPhase.hasLeft) + } + } + val res = spark.read.format("delta").load(tempPath).collect() + assert(res.isEmpty) + } + } + + test("Phase Locking - parallel") { + withTempDir { tempFile => + + val tempPath = tempFile.toString + + spark.range(100).write.format("delta").save(tempPath) + + val observer = new PhaseLockingTransactionExecutionObserver( + OptimisticTransactionPhases.forName("test-txn")) + val deltaLog = DeltaLog.forTable(spark, tempPath) + + val testThread = new Thread(() => { + // make sure the transaction will use our observer + TransactionExecutionObserver.withObserver(observer) { + deltaLog.withNewTransaction { txn => + val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq + txn.commit(removedFiles, DeltaOperations.ManualUpdate) + } + } + }) + testThread.start() + + busyWaitForState( + observer.phases.initialPhase.entryBarrier, AtomicBarrier.State.Requested, timeout) + + // get things started + observer.phases.initialPhase.entryBarrier.unblock() + + busyWaitFor(observer.phases.initialPhase.hasEntered, timeout) + busyWaitFor(observer.phases.initialPhase.hasLeft, timeout) + assert(!observer.phases.preparePhase.hasEntered) + + observer.phases.preparePhase.entryBarrier.unblock() + busyWaitFor(observer.phases.preparePhase.hasEntered, timeout) + busyWaitFor(observer.phases.preparePhase.hasLeft, timeout) + assert(!observer.phases.commitPhase.hasEntered) + + observer.phases.commitPhase.entryBarrier.unblock() + busyWaitFor(observer.phases.commitPhase.hasEntered, timeout) + busyWaitFor(observer.phases.commitPhase.hasLeft, timeout) + testThread.join(timeout.toMillis) + assert(!testThread.isAlive) // should have passed the barrier and completed + + val res = spark.read.format("delta").load(tempPath).collect() + assert(res.isEmpty) + } + } + + test("Phase Locking - no reusing observer") { + withTempDir { tempFile => + + val tempPath = tempFile.toString + + spark.range(100).write.format("delta").save(tempPath) + + val observer = new PhaseLockingTransactionExecutionObserver( + OptimisticTransactionPhases.forName("test-txn")) + val deltaLog = DeltaLog.forTable(spark, tempPath) + + // get things started + 
observer.phases.initialPhase.entryBarrier.unblock() + + assert(!observer.phases.initialPhase.hasEntered) + TransactionExecutionObserver.withObserver(observer) { + deltaLog.withNewTransaction { txn => + // allow things to progress + observer.phases.preparePhase.entryBarrier.unblock() + observer.phases.commitPhase.entryBarrier.unblock() + val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq + txn.commit(removedFiles, DeltaOperations.ManualUpdate) + } + // Check that we fail trying to re-unblock the barrier + assertThrows[IllegalStateTransitionException] { + deltaLog.withNewTransaction { txn => + // allow things to progress + observer.phases.preparePhase.entryBarrier.unblock() + observer.phases.commitPhase.entryBarrier.unblock() + val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq + txn.commit(removedFiles, DeltaOperations.ManualUpdate) + } + } + // Check that we fail just waiting on the passed barrier + assertThrows[IllegalStateTransitionException] { + deltaLog.withNewTransaction { txn => + val removedFiles = txn.snapshot.allFiles.collect().map(_.remove).toSeq + txn.commit(removedFiles, DeltaOperations.ManualUpdate) + } + } + } + val res = spark.read.format("delta").load(tempPath).collect() + assert(res.isEmpty) + } + } + + test("Phase Locking - delete command") { + withTempDir { tempFile => + + val tempPath = tempFile.toString + + spark.range(100).write.format("delta").save(tempPath) + + val observer = new PhaseLockingTransactionExecutionObserver( + OptimisticTransactionPhases.forName("test-txn")) + val deltaLog = DeltaLog.forTable(spark, tempPath) + val deltaTable = IODeltaTable.forPath(spark, tempPath) + + def assertOperationNotVisible(): Unit = + assert(deltaTable.toDF.count() === 100) + + val testThread = new Thread(() => { + // make sure the transaction will use our observer + TransactionExecutionObserver.withObserver(observer) { + deltaTable.delete() + } + }) + testThread.start() + + busyWaitForState( + observer.phases.initialPhase.entryBarrier, AtomicBarrier.State.Requested, timeout) + + assertOperationNotVisible() + + // get things started + observer.phases.initialPhase.entryBarrier.unblock() + + busyWaitFor(observer.phases.initialPhase.hasLeft, timeout) + + assertOperationNotVisible() + + observer.phases.preparePhase.entryBarrier.unblock() + busyWaitFor(observer.phases.preparePhase.hasLeft, timeout) + assert(!observer.phases.commitPhase.hasEntered) + + assertOperationNotVisible() + + observer.phases.commitPhase.entryBarrier.unblock() + busyWaitFor(observer.phases.commitPhase.hasLeft, timeout) + testThread.join(timeout.toMillis) + assert(!testThread.isAlive) // should have passed the barrier and completed + + val res = spark.read.format("delta").load(tempPath).collect() + assert(res.isEmpty) + } + } +}
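
---

Usage sketch (not part of the diff): the snippet below shows how the pieces introduced here compose for single-threaded use. `spark` and `tablePath` are placeholders for an active `SparkSession` and the path of an existing Delta table, and the transaction name `"example-txn"` is arbitrary. Because the transaction runs on the same thread that controls the barriers, every entry barrier must be unblocked up front; otherwise `waitToEnter()` would block the very thread that is supposed to unblock it.

```scala
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, TransactionExecutionObserver}
import org.apache.spark.sql.delta.fuzzer.{OptimisticTransactionPhases, PhaseLockingTransactionExecutionObserver}

val observer = new PhaseLockingTransactionExecutionObserver(
  OptimisticTransactionPhases.forName("example-txn"))

// Single-threaded use: unblock all entry barriers before starting the
// transaction, since no other thread will be around to unblock them later.
observer.phases.initialPhase.entryBarrier.unblock()
observer.phases.preparePhase.entryBarrier.unblock()
observer.phases.commitPhase.entryBarrier.unblock()

TransactionExecutionObserver.withObserver(observer) {
  val deltaLog = DeltaLog.forTable(spark, tablePath) // placeholders
  deltaLog.withNewTransaction { txn =>
    // An empty commit is enough to drive the txn through all three phases.
    txn.commit(Seq.empty, DeltaOperations.ManualUpdate)
  }
}

// The commit phase was entered and left, so the whole sequence completed.
assert(observer.phases.commitPhase.hasLeft)
```

For multi-threaded fuzzing, the "Phase Locking - parallel" test above shows the intended pattern: run the transaction on a worker thread and drive it from the test thread by unblocking one entry barrier at a time, observing table state between phases.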