Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Apr 1, 2024
1 parent 9576d08 commit 8224957
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,21 @@ trait OptimisticTransactionImpl extends TransactionalWrite
assert(!committed, "Transaction already committed.")
commitStartNano = System.nanoTime()
val attemptVersion = getFirstAttemptVersion

def recordCommitLargeFailure(ex: Throwable, op: DeltaOperations.Operation): Unit = {
val managedCommitExceptionOpt = ex match {
case e: CommitFailedException => Some(e)
case _ => None
}
val data = Map(
"exception" -> Utils.exceptionString(ex),
"operation" -> op.name,
"fromManagedCommit" -> managedCommitExceptionOpt.isDefined,
"fromManagedCommitConflict" -> managedCommitExceptionOpt.map(_.conflict).getOrElse(""),
"fromManagedCommitRetryable" -> managedCommitExceptionOpt.map(_.retryable).getOrElse(""))
recordDeltaEvent(deltaLog, "delta.commitLarge.failure", data = data)
}

try {
val tags = Map.empty[String, String]
val commitInfo = CommitInfo(
Expand Down Expand Up @@ -1365,32 +1380,28 @@ trait OptimisticTransactionImpl extends TransactionalWrite
recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)
(attemptVersion, postCommitSnapshot)
} catch {
case e: java.nio.file.FileAlreadyExistsException =>
recordDeltaEvent(
deltaLog,
"delta.commitLarge.failure",
data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
// Actions of a commit which went in before ours.
// Requires updating deltaLog to retrieve these actions, as another writer may have used
// CommitStore for writing.
val logs = deltaLog.store.readAsIterator(
DeltaCommitFileProvider(deltaLog.update()).deltaFile(attemptVersion),
deltaLog.newDeltaHadoopConf())
try {
val winningCommitActions = logs.map(Action.fromJson)
val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }
.map(ci => ci.copy(version = Some(attemptVersion)))
throw DeltaErrors.concurrentWriteException(commitInfo)
} finally {
logs.close()
case e: Throwable =>
e match {
case _: FileAlreadyExistsException | CommitFailedException(_, true, _) =>
recordCommitLargeFailure(e, op)
// Actions of a commit which went in before ours.
// Requires updating deltaLog to retrieve these actions, as another writer may have used
// CommitStore for writing.
val logs = deltaLog.store.readAsIterator(
DeltaCommitFileProvider(deltaLog.update()).deltaFile(attemptVersion),
deltaLog.newDeltaHadoopConf())
try {
val winningCommitActions = logs.map(Action.fromJson)
val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }
.map(ci => ci.copy(version = Some(attemptVersion)))
throw DeltaErrors.concurrentWriteException(commitInfo)
} finally {
logs.close()
}
case NonFatal(_) =>
recordCommitLargeFailure(e, op)
throw e
}

case NonFatal(e) =>
recordDeltaEvent(
deltaLog,
"delta.commitLarge.failure",
data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
throw e
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ case class Commit(
* | yes | no | transient error (e.g. network hiccup) |
* | yes | yes | physical conflict (allowed to rebase and retry) |
*/
class CommitFailedException(
val retryable: Boolean, val conflict: Boolean, message: String) extends Exception(message)
case class CommitFailedException(
retryable: Boolean, conflict: Boolean, message: String) extends Exception(message)

/** Response container for [[CommitStore.commit]] API */
case class CommitResponse(commit: Commit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC
val tableData = perTableMap.get(logPath)
val expectedVersion = tableData.maxCommitVersion + 1
if (commitVersion != expectedVersion) {
throw new CommitFailedException(
throw CommitFailedException(
retryable = commitVersion < expectedVersion,
conflict = commitVersion < expectedVersion,
s"Commit version $commitVersion is not valid. Expected version: $expectedVersion.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ package org.apache.spark.sql.delta
// scalastyle:off import.ordering.noEmptyLine
import java.nio.file.FileAlreadyExistsException

import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, Metadata, Protocol, RemoveFile, SetTransaction}
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, UpdatedActions}
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, InMemoryCommitStore, UpdatedActions}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.util.ManualClock

Expand Down Expand Up @@ -266,6 +268,11 @@ class OptimisticTransactionSuite
actions = Seq(
AddFile("b", Map.empty, 1, 1, dataChange = true)))

override def beforeEach(): Unit = {
super.beforeEach()
CommitStoreProvider.clearNonDefaultBuilders()
}

test("initial commit without metadata should fail") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
Expand Down Expand Up @@ -468,7 +475,7 @@ class OptimisticTransactionSuite
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
commitAttempts += 1
throw new CommitFailedException(
throw CommitFailedException(
retryable = true,
conflict = commitAttempts > initialNonConflictErrors &&
commitAttempts <= (initialNonConflictErrors + initialConflictErrors),
Expand Down Expand Up @@ -806,4 +813,67 @@ class OptimisticTransactionSuite
}
}
}

BOOLEAN_DOMAIN.foreach { conflict =>
test(s"commitLarge should handle Commit Failed Exception with conflict: $conflict") {
withTempDir { tempDir =>
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
val commitStoreName = "retryable-conflict-commit-store"
class RetryableConflictCommitStore extends InMemoryCommitStore(batchSize = 5) {
override def commit(
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path,
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
if (updatedActions.commitInfo.operation == DeltaOperations.OP_RESTORE) {
deltaLog.startTransaction().commit(addB :: Nil, ManualUpdate)
throw CommitFailedException(retryable = true, conflict, message = "")
}
super.commit(logStore, hadoopConf, tablePath, commitVersion, actions, updatedActions)
}
}
object RetryableConflictCommitStoreBuilder extends CommitStoreBuilder {
lazy val commitStore = new RetryableConflictCommitStore()
override def name: String = commitStoreName
override def build(conf: Map[String, String]): CommitStore = commitStore
}
CommitStoreProvider.registerBuilder(RetryableConflictCommitStoreBuilder)
val conf = Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> commitStoreName)
deltaLog.startTransaction().commit(Seq(Metadata(configuration = conf)), ManualUpdate)
RetryableConflictCommitStoreBuilder.commitStore.registerTable(
logPath = deltaLog.logPath, maxCommitVersion = 0)
deltaLog.startTransaction().commit(addA :: Nil, ManualUpdate)
val records = Log4jUsageLogger.track {
// commitLarge must fail because of a conflicting commit at version-2.
val e = intercept[Exception] {
deltaLog.startTransaction().commitLarge(
spark,
nonProtocolMetadataActions = (addB :: Nil).iterator,
newProtocolOpt = None,
op = DeltaOperations.Restore(Some(0), None),
context = Map.empty,
metrics = Map.empty)
}
if (conflict) {
assert(e.isInstanceOf[ConcurrentWriteException])
assert(
e.getMessage.contains(
"A concurrent transaction has written new data since the current transaction " +
s"read the table. Please try the operation again"))
} else {
assert(e.isInstanceOf[CommitFailedException])
}
assert(deltaLog.update().version == 2)
}
val failureRecord = filterUsageRecords(records, "delta.commitLarge.failure")
assert(failureRecord.size == 1)
val data = JsonUtils.fromJson[Map[String, Any]](failureRecord.head.blob)
assert(data("fromManagedCommit") == true)
assert(data("fromManagedCommitConflict") == conflict)
assert(data("fromManagedCommitRetryable") == true)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.util.Utils
trait OptimisticTransactionSuiteBase
extends QueryTest
with SharedSparkSession
with DeltaTestUtilsBase
with DeletionVectorsTestUtils {


Expand Down

0 comments on commit 8224957

Please sign in to comment.