Skip to content

Commit

Permalink
Large commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 28, 2024
1 parent f37dc86 commit be0ffcc
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1364,11 +1364,22 @@ trait OptimisticTransactionImpl extends TransactionalWrite
recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)
(attemptVersion, postCommitSnapshot)
} catch {
case e: java.nio.file.FileAlreadyExistsException =>
case ex if ex.isInstanceOf[java.nio.file.FileAlreadyExistsException] |
(ex.isInstanceOf[CommitFailedException] &&
ex.asInstanceOf[CommitFailedException].conflict) =>
val managedCommitExceptionOpt = ex match {
case e : CommitFailedException => Some(e)
case _ => None
}
recordDeltaEvent(
deltaLog,
"delta.commitLarge.failure",
data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
data = Map("exception" -> Utils.exceptionString(ex),
"operation" -> op.name,
"fromManagedCommit" -> managedCommitExceptionOpt.map(true).getOrElse(false),
"fromManagedCommitConflict" -> managedCommitExceptionOpt.map(_.conflict).getOrElse(""),
"fromManagedCommitRetryable" ->
managedCommitExceptionOpt.map(_.retryable).getOrElse("")))
// Actions of a commit which went in before ours.
// Requires updating deltaLog to retrieve these actions, as another writer may have used
// CommitStore for writing.
Expand All @@ -1384,12 +1395,22 @@ trait OptimisticTransactionImpl extends TransactionalWrite
logs.close()
}

case NonFatal(e) =>
case NonFatal(ex) =>
val managedCommitExceptionOpt = ex match {
case e: CommitFailedException => Some(e)
case _ => None
}
recordDeltaEvent(
deltaLog,
"delta.commitLarge.failure",
data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
throw e
deltaLog,
"delta.commitLarge.failure",
data = Map(
"exception" -> Utils.exceptionString(ex),
"operation" -> op.name,
"fromManagedCommit" -> managedCommitExceptionOpt.map(true).getOrElse(false),
"fromManagedCommitConflict" -> managedCommitExceptionOpt.map(_.conflict).getOrElse(""),
"fromManagedCommitRetryable" ->
managedCommitExceptionOpt.map(_.retryable).getOrElse("")))
throw ex
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package org.apache.spark.sql.delta
// scalastyle:off import.ordering.noEmptyLine
import java.nio.file.FileAlreadyExistsException

import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, Metadata, Protocol, RemoveFile, SetTransaction}
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, UpdatedActions}
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, InMemoryCommitStore, UpdatedActions}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
Expand Down Expand Up @@ -266,6 +267,11 @@ class OptimisticTransactionSuite
actions = Seq(
AddFile("b", Map.empty, 1, 1, dataChange = true)))

override def beforeEach(): Unit = {
super.beforeEach()
CommitStoreProvider.clearNonDefaultBuilders()
}

test("initial commit without metadata should fail") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
Expand Down Expand Up @@ -806,4 +812,59 @@ class OptimisticTransactionSuite
}
}
}

BOOLEAN_DOMAIN.foreach { conflict =>
test(s"commitLarge should handle Commit Failed Exception with conflict: $conflict") {
withTempDir { tempDir =>
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
val commitStoreName = "retryable-conflict-commit-store"
class RetryableConflictCommitStore extends InMemoryCommitStore(batchSize = 5) {
override def commit(
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path,
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
if (updatedActions.commitInfo.operation == DeltaOperations.OP_RESTORE) {
deltaLog.startTransaction().commit(addB :: Nil, ManualUpdate)
throw new CommitFailedException(retryable = true, conflict, message = "")
}
super.commit(logStore, hadoopConf, tablePath, commitVersion, actions, updatedActions)
}
}
object RetryableConflictCommitStoreBuilder extends CommitStoreBuilder {
lazy val commitStore = new RetryableConflictCommitStore()
override def name: String = commitStoreName
override def build(conf: Map[String, String]): CommitStore = commitStore
}
CommitStoreProvider.registerBuilder(RetryableConflictCommitStoreBuilder)
val conf = Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> commitStoreName)
deltaLog.startTransaction().commit(Seq(Metadata(configuration = conf)), ManualUpdate)
RetryableConflictCommitStoreBuilder.commitStore.registerTable(
logPath = deltaLog.logPath, maxCommitVersion = 0)
deltaLog.startTransaction().commit(addA :: Nil, ManualUpdate)
// commitLarge must fail because of a conflicting commit at version-2.
val e = intercept[Exception] {
deltaLog.startTransaction().commitLarge(
spark,
nonProtocolMetadataActions = (addB :: Nil).iterator,
newProtocolOpt = None,
op = DeltaOperations.Restore(Some(0), None),
context = Map.empty,
metrics = Map.empty)
}
if (conflict) {
assert(e.isInstanceOf[ConcurrentWriteException])
assert(
e.getMessage.contains(
"A concurrent transaction has written new data since the current transaction " +
s"read the table. Please try the operation again"))
} else {
assert(e.isInstanceOf[CommitFailedException])
}
assert(deltaLog.update().version == 2)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.util.Utils
trait OptimisticTransactionSuiteBase
extends QueryTest
with SharedSparkSession
with DeltaTestUtilsBase
with DeletionVectorsTestUtils {


Expand Down

0 comments on commit be0ffcc

Please sign in to comment.