Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
sabir-akhadov committed Feb 26, 2024
1 parent 7d41fb7 commit 09bab44
Showing 1 changed file with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

package org.apache.spark.sql.delta.commands

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaCommitTag
import org.apache.spark.sql.delta.actions.{Action, FileAction}
import org.apache.spark.sql.delta.sources.DeltaSQLConf.DELTA_LAST_COMMIT_VERSION_IN_SESSION

import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.setActiveSession

object DMLUtils {

Expand All @@ -38,3 +44,45 @@ object DMLUtils {
def empty[A <: Action]: TaggedCommitData[A] = TaggedCommitData(Seq.empty[A])
}
}

/**
* Helper trait to run a block of code with a cloned SparkSession and overwrite SQL configs.
*/
trait RunWithClonedSparkSession {

protected val SQLConfigsToOverwrite: Map[ConfigEntry[Boolean], Boolean]

def runWithClonedSparkSession[T](spark: SparkSession)(f: SparkSession => T): T = {
val clonedSession = spark.cloneSession()
SQLConfigsToOverwrite.foreach { case (key, overwriteValue) =>
clonedSession.sessionState.conf.setConf(key, overwriteValue)
}
setActiveSession(clonedSession)
try {
f(clonedSession)
} finally {
copyBackLastCommitVersionFromClonedSession(
originalSession = spark,
clonedSession = clonedSession
)
setActiveSession(spark)
}
}

/**
* Copy last commit version from cloned session in case it was updated. This makes sure the
* caller will see the updated value using the original session. Any concurrent modification in
* the original session would get overwritten. But a similar issue would happen anyways if the
* original session was used directly instead of the clone.
*/
private def copyBackLastCommitVersionFromClonedSession(
originalSession: SparkSession,
clonedSession: SparkSession): Unit = {
val copyConfigs = Set(DELTA_LAST_COMMIT_VERSION_IN_SESSION.key)
copyConfigs.foreach { config =>
clonedSession.conf
.getOption(config)
.foreach(value => originalSession.conf.set(config, value))
}
}
}

0 comments on commit 09bab44

Please sign in to comment.