From 76b1b991e729cf24cc8c899614119561a7946346 Mon Sep 17 00:00:00 2001
From: Sumeet Varma
Date: Fri, 29 Mar 2024 12:34:10 -0700
Subject: [PATCH] Handle conflicting CommitFailedException in commitLarge

commitLarge only translated FileAlreadyExistsException into a concurrent-write
error. Treat a CommitFailedException whose conflict flag is set the same way,
and include the managed-commit details (conflict / retryable) in the
"delta.commitLarge.failure" event. Throwables that match neither the conflict
case nor NonFatal are rethrown unchanged.

CommitFailedException becomes a case class so that it can be pattern matched.

---
 .../sql/delta/OptimisticTransaction.scala     | 65 +++++++++++-------
 .../sql/delta/managedcommit/CommitStore.scala |  4 +-
 .../managedcommit/InMemoryCommitStore.scala   |  2 +-
 .../delta/OptimisticTransactionSuite.scala    | 64 ++++++++++++++++++-
 .../OptimisticTransactionSuiteBase.scala      |  1 +
 5 files changed, 106 insertions(+), 30 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index b9bae649dcc..0126f817e77 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1219,6 +1219,21 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     assert(!committed, "Transaction already committed.")
     commitStartNano = System.nanoTime()
     val attemptVersion = getFirstAttemptVersion
+
+    def recordCommitLargeFailure(ex: Throwable, op: DeltaOperations.Operation): Unit = {
+      val managedCommitExceptionOpt = ex match {
+        case e: CommitFailedException => Some(e)
+        case _ => None
+      }
+      val data = Map(
+        "exception" -> Utils.exceptionString(ex),
+        "operation" -> op.name,
+        "fromManagedCommit" -> managedCommitExceptionOpt.isDefined,
+        "fromManagedCommitConflict" -> managedCommitExceptionOpt.map(_.conflict).getOrElse(""),
+        "fromManagedCommitRetryable" -> managedCommitExceptionOpt.map(_.retryable).getOrElse(""))
+      recordDeltaEvent(deltaLog, "delta.commitLarge.failure", data = data)
+    }
+
     try {
       val tags = Map.empty[String, String]
       val commitInfo = CommitInfo(
@@ -1365,32 +1380,32 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)
       (attemptVersion, postCommitSnapshot)
     } catch {
-      case e: java.nio.file.FileAlreadyExistsException =>
-        recordDeltaEvent(
-          deltaLog,
-          "delta.commitLarge.failure",
-          data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
-        // Actions of a commit which went in before ours.
-        // Requires updating deltaLog to retrieve these actions, as another writer may have used
-        // CommitStore for writing.
-        val logs = deltaLog.store.readAsIterator(
-          DeltaCommitFileProvider(deltaLog.update()).deltaFile(attemptVersion),
-          deltaLog.newDeltaHadoopConf())
-        try {
-          val winningCommitActions = logs.map(Action.fromJson)
-          val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }
-            .map(ci => ci.copy(version = Some(attemptVersion)))
-          throw DeltaErrors.concurrentWriteException(commitInfo)
-        } finally {
-          logs.close()
+      case e: Throwable =>
+        e match {
+          case _: FileAlreadyExistsException | CommitFailedException(_, true, _) =>
+            recordCommitLargeFailure(e, op)
+            // Actions of a commit which went in before ours.
+            // Requires updating deltaLog to retrieve these actions, as another writer may have used
+            // CommitStore for writing.
+            val logs = deltaLog.store.readAsIterator(
+              DeltaCommitFileProvider(deltaLog.update()).deltaFile(attemptVersion),
+              deltaLog.newDeltaHadoopConf())
+            try {
+              val winningCommitActions = logs.map(Action.fromJson)
+              val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }
+                .map(ci => ci.copy(version = Some(attemptVersion)))
+              throw DeltaErrors.concurrentWriteException(commitInfo)
+            } finally {
+              logs.close()
+            }
+          case NonFatal(_) =>
+            recordCommitLargeFailure(e, op)
+            throw e
+          case _ =>
+            // Fatal errors (e.g. InterruptedException, OutOfMemoryError) match neither case above;
+            // rethrow them unchanged instead of letting the match fail with a MatchError.
+            throw e
         }
-
-      case NonFatal(e) =>
-        recordDeltaEvent(
-          deltaLog,
-          "delta.commitLarge.failure",
-          data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
-        throw e
     }
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala
index 93d30428048..287c48f6a26 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala
@@ -40,8 +40,8 @@ case class Commit(
  * | yes | no | transient error (e.g. network hiccup) |
  * | yes | yes | physical conflict (allowed to rebase and retry) |
  */
-class CommitFailedException(
-    val retryable: Boolean, val conflict: Boolean, message: String) extends Exception(message)
+case class CommitFailedException(
+    retryable: Boolean, conflict: Boolean, message: String) extends Exception(message)
 
 /** Response container for [[CommitStore.commit]] API */
 case class CommitResponse(commit: Commit)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala
index 1e873d11602..8055a5c37f6 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala
@@ -81,7 +81,7 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC
       val tableData = perTableMap.get(logPath)
       val expectedVersion = tableData.maxCommitVersion + 1
       if (commitVersion != expectedVersion) {
-        throw new CommitFailedException(
+        throw CommitFailedException(
           retryable = commitVersion < expectedVersion,
           conflict = commitVersion < expectedVersion,
           s"Commit version $commitVersion is not valid. Expected version: $expectedVersion.")
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
index 4ad0c521b29..523490100c0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
@@ -22,7 +22,7 @@ import java.nio.file.FileAlreadyExistsException
 import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
 import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
 import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, Metadata, Protocol, RemoveFile, SetTransaction}
-import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, UpdatedActions}
+import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, InMemoryCommitStore, UpdatedActions}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -266,6 +266,11 @@ class OptimisticTransactionSuite
     actions = Seq(
       AddFile("b", Map.empty, 1, 1, dataChange = true)))
 
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    CommitStoreProvider.clearNonDefaultBuilders()
+  }
+
   test("initial commit without metadata should fail") {
     withTempDir { tempDir =>
       val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
@@ -468,7 +473,7 @@ class OptimisticTransactionSuite
           actions: Iterator[String],
           updatedActions: UpdatedActions): CommitResponse = {
         commitAttempts += 1
-        throw new CommitFailedException(
+        throw CommitFailedException(
           retryable = true,
           conflict = commitAttempts > initialNonConflictErrors &&
             commitAttempts <= (initialNonConflictErrors + initialConflictErrors),
@@ -806,4 +811,59 @@ class OptimisticTransactionSuite
       }
     }
   }
+
+  BOOLEAN_DOMAIN.foreach { conflict =>
+    test(s"commitLarge should handle Commit Failed Exception with conflict: $conflict") {
+      withTempDir { tempDir =>
+        val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+        val commitStoreName = "retryable-conflict-commit-store"
+        class RetryableConflictCommitStore extends InMemoryCommitStore(batchSize = 5) {
+          override def commit(
+              logStore: LogStore,
+              hadoopConf: Configuration,
+              tablePath: Path,
+              commitVersion: Long,
+              actions: Iterator[String],
+              updatedActions: UpdatedActions): CommitResponse = {
+            if (updatedActions.commitInfo.operation == DeltaOperations.OP_RESTORE) {
+              deltaLog.startTransaction().commit(addB :: Nil, ManualUpdate)
+              throw CommitFailedException(retryable = true, conflict, message = "")
+            }
+            super.commit(logStore, hadoopConf, tablePath, commitVersion, actions, updatedActions)
+          }
+        }
+        object RetryableConflictCommitStoreBuilder extends CommitStoreBuilder {
+          lazy val commitStore = new RetryableConflictCommitStore()
+          override def name: String = commitStoreName
+          override def build(conf: Map[String, String]): CommitStore = commitStore
+        }
+        CommitStoreProvider.registerBuilder(RetryableConflictCommitStoreBuilder)
+        val conf = Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> commitStoreName)
+        deltaLog.startTransaction().commit(Seq(Metadata(configuration = conf)), ManualUpdate)
+        RetryableConflictCommitStoreBuilder.commitStore.registerTable(
+          logPath = deltaLog.logPath, maxCommitVersion = 0)
+        deltaLog.startTransaction().commit(addA :: Nil, ManualUpdate)
+        // commitLarge must fail because of a conflicting commit at version-2.
+        val e = intercept[Exception] {
+          deltaLog.startTransaction().commitLarge(
+            spark,
+            nonProtocolMetadataActions = (addB :: Nil).iterator,
+            newProtocolOpt = None,
+            op = DeltaOperations.Restore(Some(0), None),
+            context = Map.empty,
+            metrics = Map.empty)
+        }
+        if (conflict) {
+          assert(e.isInstanceOf[ConcurrentWriteException])
+          assert(
+            e.getMessage.contains(
+              "A concurrent transaction has written new data since the current transaction " +
+                s"read the table. Please try the operation again"))
+        } else {
+          assert(e.isInstanceOf[CommitFailedException])
+        }
+        assert(deltaLog.update().version == 2)
+      }
+    }
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala
index 05e92508e46..f772351ffe3 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala
@@ -31,6 +31,7 @@ import org.apache.spark.util.Utils
 trait OptimisticTransactionSuiteBase
   extends QueryTest
   with SharedSparkSession
+  with DeltaTestUtilsBase
   with DeletionVectorsTestUtils {