diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index bad9bd7edbb..491900cb3d0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1364,11 +1364,22 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)
       (attemptVersion, postCommitSnapshot)
     } catch {
-      case e: java.nio.file.FileAlreadyExistsException =>
+      // A CommitFailedException flagged as a conflict takes the same recovery path as a
+      // commit file that already exists.
+      case ex if ex.isInstanceOf[java.nio.file.FileAlreadyExistsException] ||
+          (ex.isInstanceOf[CommitFailedException] &&
+            ex.asInstanceOf[CommitFailedException].conflict) =>
+        val managedCommitExceptionOpt =
+          Option(ex).collect { case cfe: CommitFailedException => cfe }
         recordDeltaEvent(
           deltaLog,
           "delta.commitLarge.failure",
-          data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
+          data = Map("exception" -> Utils.exceptionString(ex),
+            "operation" -> op.name,
+            "fromManagedCommit" -> managedCommitExceptionOpt.isDefined,
+            "fromManagedCommitConflict" -> managedCommitExceptionOpt.map(_.conflict).getOrElse(""),
+            "fromManagedCommitRetryable" ->
+              managedCommitExceptionOpt.map(_.retryable).getOrElse("")))
         // Actions of a commit which went in before ours.
         // Requires updating deltaLog to retrieve these actions, as another writer may have used
         // CommitStore for writing.
@@ -1384,12 +1395,22 @@ trait OptimisticTransactionImpl extends TransactionalWrite
           logs.close()
         }
 
-      case NonFatal(e) =>
+      // Every other non-fatal failure is logged with the same managed-commit fields as the
+      // conflict case above and then rethrown unchanged.
+      case NonFatal(ex) =>
+        val managedCommitExceptionOpt =
+          Option(ex).collect { case cfe: CommitFailedException => cfe }
         recordDeltaEvent(
           deltaLog,
           "delta.commitLarge.failure",
-          data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
-        throw e
+          data = Map(
+            "exception" -> Utils.exceptionString(ex),
+            "operation" -> op.name,
+            "fromManagedCommit" -> managedCommitExceptionOpt.isDefined,
+            "fromManagedCommitConflict" -> managedCommitExceptionOpt.map(_.conflict).getOrElse(""),
+            "fromManagedCommitRetryable" ->
+              managedCommitExceptionOpt.map(_.retryable).getOrElse("")))
+        throw ex
     }
   }
 
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
index 4ad0c521b29..61ea933a1a5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import java.nio.file.FileAlreadyExistsException
 
+import com.databricks.spark.util.Log4jUsageLogger
 import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
 import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
 import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, Metadata, Protocol, RemoveFile, SetTransaction}
-import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, UpdatedActions}
+import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, InMemoryCommitStore, UpdatedActions}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -266,6 +267,13 @@ class OptimisticTransactionSuite
     actions = Seq(
       AddFile("b", Map.empty, 1, 1, dataChange = true)))
 
+  // Non-default builders are cleared before every test; the commitLarge tests below register
+  // a builder under the same name once per parameter value.
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    CommitStoreProvider.clearNonDefaultBuilders()
+  }
+
   test("initial commit without metadata should fail") {
     withTempDir { tempDir =>
       val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
@@ -806,4 +814,62 @@ class OptimisticTransactionSuite
       }
     }
   }
+
+  BOOLEAN_DOMAIN.foreach { conflict =>
+    test(s"commitLarge should handle Commit Failed Exception with conflict: $conflict") {
+      withTempDir { tempDir =>
+        val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+        val commitStoreName = "retryable-conflict-commit-store"
+        // Commit store that, for RESTORE commits, first lets another transaction commit and
+        // then fails with a CommitFailedException carrying the parameterized conflict flag.
+        class RetryableConflictCommitStore extends InMemoryCommitStore(batchSize = 5) {
+          override def commit(
+              logStore: LogStore,
+              hadoopConf: Configuration,
+              tablePath: Path,
+              commitVersion: Long,
+              actions: Iterator[String],
+              updatedActions: UpdatedActions): CommitResponse = {
+            if (updatedActions.commitInfo.operation == DeltaOperations.OP_RESTORE) {
+              deltaLog.startTransaction().commit(addB :: Nil, ManualUpdate)
+              throw new CommitFailedException(retryable = true, conflict, message = "")
+            }
+            super.commit(logStore, hadoopConf, tablePath, commitVersion, actions, updatedActions)
+          }
+        }
+        object RetryableConflictCommitStoreBuilder extends CommitStoreBuilder {
+          lazy val commitStore = new RetryableConflictCommitStore()
+          override def name: String = commitStoreName
+          override def build(conf: Map[String, String]): CommitStore = commitStore
+        }
+        CommitStoreProvider.registerBuilder(RetryableConflictCommitStoreBuilder)
+        val conf = Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> commitStoreName)
+        deltaLog.startTransaction().commit(Seq(Metadata(configuration = conf)), ManualUpdate)
+        RetryableConflictCommitStoreBuilder.commitStore.registerTable(
+          logPath = deltaLog.logPath, maxCommitVersion = 0)
+        deltaLog.startTransaction().commit(addA :: Nil, ManualUpdate)
+        // commitLarge must fail because of a conflicting commit at version-2.
+        val e = intercept[Exception] {
+          deltaLog.startTransaction().commitLarge(
+            spark,
+            nonProtocolMetadataActions = (addB :: Nil).iterator,
+            newProtocolOpt = None,
+            op = DeltaOperations.Restore(Some(0), None),
+            context = Map.empty,
+            metrics = Map.empty)
+        }
+        if (conflict) {
+          assert(e.isInstanceOf[ConcurrentWriteException])
+          assert(
+            e.getMessage.contains(
+              "A concurrent transaction has written new data since the current transaction " +
+                "read the table. Please try the operation again"))
+        } else {
+          assert(e.isInstanceOf[CommitFailedException])
+        }
+        // Only the conflicting commit landed: the table is still at version 2.
+        assert(deltaLog.update().version == 2)
+      }
+    }
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala
index 05e92508e46..f772351ffe3 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala
@@ -31,6 +31,7 @@ import org.apache.spark.util.Utils
 trait OptimisticTransactionSuiteBase
   extends QueryTest
   with SharedSparkSession
+  with DeltaTestUtilsBase
   with DeletionVectorsTestUtils {
 
 