Skip to content

Commit

Permalink
[Spark] Add TransactionExecutionObserver (delta-io#2816)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

1. Add an observer field to `OptimisticTransaction` which can be set
via a thread-local to a custom instrumentation class. Instrumentation
methods are invoked when creating a new transaction, for
`prepareCommit`, for `doCommit`, and for completion of a transaction
(failed or successful).
2. The default observer simply performs no-ops.
3. Added a testing `PhaseLocking` observer implementation that allows
not only observing the transaction, but also blocking its thread until
it is unblocked by another thread. This allows fine control of how the
transaction progresses, which is needed for some testing scenarios.

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?
Added UTs.
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?
No.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
  • Loading branch information
longvu-db authored Apr 2, 2024
1 parent c88db07 commit 01c0ef9
Show file tree
Hide file tree
Showing 7 changed files with 496 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ class DeltaLog private(
def startTransaction(
catalogTableOpt: Option[CatalogTable],
snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = {
new OptimisticTransaction(this, catalogTableOpt, snapshotOpt)
TransactionExecutionObserver.threadObserver.get().startingTransaction {
new OptimisticTransaction(this, catalogTableOpt, snapshotOpt)
}
}

/** Legacy/compat overload that does not require catalog table information. Avoid prod use. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
/** Tracks if this transaction has already committed. */
protected var committed = false

/** Contains the execution instrumentation set via thread-local. No-op by default. */
protected[delta] var executionObserver: TransactionExecutionObserver =
TransactionExecutionObserver.threadObserver.get()

/**
* Stores the updated metadata (if any) that will result from this txn.
*
Expand Down Expand Up @@ -1080,7 +1084,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val finalActions = checkForSetTransactionConflictAndDedup(actions ++ this.actions.toSeq)

// Try to commit at the next version.
val preparedActions = prepareCommit(finalActions, op)
val preparedActions =
executionObserver.preparingCommit {
prepareCommit(finalActions, op)
}

// Find the isolation level to use for this commit
val isolationLevelToUse = getIsolationLevelToUse(preparedActions, op)
Expand Down Expand Up @@ -1163,6 +1170,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
return None
}

// Try to commit at the next version.
executionObserver.beginDoCommit()

val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) =
doCommitRetryIteratively(
firstAttemptVersion, currentTransactionInfo, isolationLevelToUse)
Expand All @@ -1171,15 +1181,18 @@ trait OptimisticTransactionImpl extends TransactionalWrite
} catch {
case e: DeltaConcurrentModificationException =>
recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
executionObserver.transactionAborted()
throw e
case NonFatal(e) =>
recordDeltaEvent(
deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
executionObserver.transactionAborted()
throw e
}

runPostCommitHooks(version, postCommitSnapshot, actualCommittedActions)

executionObserver.transactionCommitted()
Some(version)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

/**
* Track different stages of the execution of a transaction.
*
* This is mostly meant for test instrumentation.
*
* The default is a no-op implementation.
*
* Two kinds of hooks are declared here: wrapping hooks, which receive the instrumented code
* as a by-name argument and return its result, and plain notification hooks, which take no
* arguments and return `Unit`.
*/
trait TransactionExecutionObserver {
/*
* This hook is called outside the transaction object,
* since it wraps the transaction's creation.
*/

/**
* Wraps transaction creation.
*
* @param f by-name expression that creates the transaction
* @return the transaction that is handed to the caller that started the transaction
*/
def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction

/*
* The remaining hooks are called from within the transaction object.
*/

/**
* Wraps `prepareCommit`.
*
* @param f by-name expression that performs the commit preparation
* @tparam T result type of the wrapped preparation code
* @return the value the transaction continues with as the result of the preparation
*/
def preparingCommit[T](f: => T): T

/*
* The next three methods are before/after-style hooks instead of wrapping hooks like above,
* because the commit code is large and in a try-catch block,
* making wrapping impractical.
*/

/** Called before the first `doCommit` attempt. */
def beginDoCommit(): Unit

/** Called once a commit succeeded. */
def transactionCommitted(): Unit

/**
* Called once the transaction failed.
*
* *Note:* It can happen that [[transactionAborted()]] is called
* without [[beginDoCommit()]] being called first.
* This occurs when there is an Exception thrown during the transaction's body.
*/
def transactionAborted(): Unit
}

object TransactionExecutionObserver {
  /**
   * Per-thread observer slot read by [[DeltaLog]] and [[OptimisticTransaction]].
   * A thread that has never set a value sees [[NoOpTransactionExecutionObserver]].
   */
  val threadObserver: ThreadLocal[TransactionExecutionObserver] =
    ThreadLocal.withInitial(() => NoOpTransactionExecutionObserver)

  /**
   * Runs `thunk` with `observer` installed as the current thread's observer, so that
   * transactions created and completed inside `thunk` are instrumented by it. The observer
   * that was installed before the call is put back afterwards, even if `thunk` throws.
   *
   * *Note 1:* Transactions that already exist and are merely captured by `thunk` are
   *           not affected.
   * *Note 2:* Do not let transactions created inside `thunk` escape through its result.
   * *Note 3:* Do not start threads that create new transactions inside `thunk`;
   *           the observer is not propagated to child threads automatically.
   *
   * For more flexible usage of [[TransactionExecutionObserver]], call
   * `TransactionExecutionObserver.threadObserver.set()` directly instead.
   *
   * @param observer the observer to install for the duration of `thunk`
   * @param thunk the code to run while `observer` is installed
   * @return the result of evaluating `thunk`
   */
  def withObserver[T](observer: TransactionExecutionObserver)(thunk: => T): T = {
    // Remember what was installed before, so nested usages unwind correctly.
    val previous = threadObserver.get()
    threadObserver.set(observer)
    try thunk
    finally threadObserver.set(previous)
  }
}

/**
 * Default observer: adds no instrumentation at all.
 *
 * The wrapping hooks simply evaluate the code they are given and return its result;
 * the notification hooks do nothing.
 */
object NoOpTransactionExecutionObserver extends TransactionExecutionObserver {
  /** Evaluates `f` and returns the transaction it creates. */
  override def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction = {
    f
  }

  /** Evaluates `f` and returns its result. */
  override def preparingCommit[T](f: => T): T = {
    f
  }

  /** Does nothing. */
  override def beginDoCommit(): Unit = {}

  /** Does nothing. */
  override def transactionCommitted(): Unit = {}

  /** Does nothing. */
  override def transactionAborted(): Unit = {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.fuzzer

/**
 * An ExecutionPhaseLock keeps multiple transactions moving in a pre-selected
 * lock-step sequence.
 *
 * Passing a phase has three steps: wait on the `entryBarrier`, run the code that belongs
 * to the phase, and finally unblock the `exitBarrier`.
 *
 * @param name human readable name for debugging
 * @param entryBarrier barrier that is waited on before the phase may start
 * @param exitBarrier barrier that is unblocked when the phase is left
 */
case class ExecutionPhaseLock(
    name: String,
    entryBarrier: AtomicBarrier = new AtomicBarrier(),
    exitBarrier: AtomicBarrier = new AtomicBarrier()) {

  /** Whether the entry barrier currently reports the `Passed` state. */
  def hasEntered: Boolean = {
    val entryState = entryBarrier.load()
    entryState == AtomicBarrier.State.Passed
  }

  /** Whether the exit barrier currently reports either the `Unblocked` or the `Passed` state. */
  def hasLeft: Boolean = {
    val exitState = exitBarrier.load()
    val unblocked = exitState == AtomicBarrier.State.Unblocked
    unblocked || exitState == AtomicBarrier.State.Passed
  }

  /** Blocks at this point until the phase has been entered. */
  def waitToEnter(): Unit = {
    entryBarrier.waitToPass()
  }

  /** Unblock the next dependent phase. */
  def leave(): Unit = {
    exitBarrier.unblock()
  }

  /**
   * Wait to enter this phase, evaluate `f`, and leave the phase before handing back
   * the result. The phase is left even when `f` throws.
   *
   * @param f the code that belongs to this phase
   * @return the result of evaluating `f`
   */
  def execute[T](f: => T): T = {
    waitToEnter()
    try f
    finally leave()
  }

  /**
   * For a phase in which nothing needs to be done:
   * enter it and leave again immediately.
   */
  def passThrough(): Unit = execute(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.fuzzer

/**
* Bundles the [[ExecutionPhaseLock]]s of a single transaction, one lock per phase.
*
* @param initialPhase lock for the initial phase of the transaction
* @param preparePhase lock for the prepare phase of the transaction
* @param commitPhase lock for the commit phase of the transaction
*/
case class OptimisticTransactionPhases(
initialPhase: ExecutionPhaseLock,
preparePhase: ExecutionPhaseLock,
commitPhase: ExecutionPhaseLock)

object OptimisticTransactionPhases {

  /** Common prefix of all phase labels. */
  private final val PREFIX = "TXN_"

  /** Phase labels; each is appended to the transaction name to form a lock name. */
  final val INITIAL_PHASE_LABEL = PREFIX + "INIT"
  final val PREPARE_PHASE_LABEL = PREFIX + "PREPARE"
  final val COMMIT_PHASE_LABEL = PREFIX + "COMMIT"

  /**
   * Builds a fresh set of phase locks for the transaction called `txnName`.
   * Each lock is named `<txnName>-<phase label>`.
   *
   * @param txnName name of the transaction, used as the prefix of every lock name
   * @return phases whose locks are newly created
   */
  def forName(txnName: String): OptimisticTransactionPhases = {
    // One new lock per phase, named after the transaction and the phase.
    def lockFor(phaseLabel: String): ExecutionPhaseLock =
      ExecutionPhaseLock(s"$txnName-$phaseLabel")

    OptimisticTransactionPhases(
      initialPhase = lockFor(INITIAL_PHASE_LABEL),
      preparePhase = lockFor(PREPARE_PHASE_LABEL),
      commitPhase = lockFor(COMMIT_PHASE_LABEL))
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.fuzzer

import org.apache.spark.sql.delta.{OptimisticTransaction, TransactionExecutionObserver}

/**
 * A [[TransactionExecutionObserver]] that maps every hook onto the matching
 * [[ExecutionPhaseLock]] in `phases`, so that the progress of the observed transaction
 * is governed by those locks.
 *
 * @param phases the phase locks of the observed transaction
 */
private[delta] class PhaseLockingTransactionExecutionObserver(
    val phases: OptimisticTransactionPhases)
  extends TransactionExecutionObserver {

  /** Creates the transaction inside the initial phase. */
  override def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction = {
    phases.initialPhase.execute(f)
  }

  /** Runs the commit preparation inside the prepare phase. */
  override def preparingCommit[T](f: => T): T = {
    phases.preparePhase.execute(f)
  }

  /** Waits to enter the commit phase; the phase is left by one of the two hooks below. */
  override def beginDoCommit(): Unit = {
    phases.commitPhase.waitToEnter()
  }

  /** Leaves the commit phase after a successful commit. */
  override def transactionCommitted(): Unit = {
    phases.commitPhase.leave()
  }

  /** Leaves the commit phase after a failure, entering it first if that has not happened. */
  override def transactionAborted(): Unit = {
    val commitPhase = phases.commitPhase
    if (commitPhase.hasEntered) {
      commitPhase.leave()
    } else {
      // An Exception thrown earlier may mean that `beginDoCommit` was never called,
      // so the phase still has to be entered before it can be left.
      commitPhase.passThrough()
    }
  }
}
Loading

0 comments on commit 01c0ef9

Please sign in to comment.