diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index c6f80d76ffd..5456018c5c2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -500,14 +500,6 @@ class DeltaLog private(
     createDirIfNotExists(FileNames.commitDirPath(logPath))
   }
 
-  /**
-   * Create the log directory. Unlike `ensureLogDirectoryExist`, this method doesn't check whether
-   * the log directory exists and it will ignore the return value of `mkdirs`.
-   */
-  def createLogDirectory(): Unit = {
-    logPath.getFileSystem(newDeltaHadoopConf()).mkdirs(logPath)
-  }
-
   /* ------------ *
    |  Integration |
    * ------------ */
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 1f581cc96f7..48b47612d5b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1412,7 +1412,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       .assignIfMissing(protocol, allActions, getFirstAttemptVersion)
 
     if (readVersion < 0) {
-      deltaLog.createLogDirectory()
+      deltaLog.ensureLogDirectoryExist()
     }
     val fsWriteStartNano = System.nanoTime()
     val jsonActions = allActions.map(_.json)
@@ -1694,6 +1694,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
         }
         logWarning("Detected no metadata in initial commit but commit validation was turned off.")
       }
+    } else {
+      // Ensure the commit directory exists if managed commits are being enabled on an existing table.
+      newMetadata.flatMap(_.managedCommitOwnerName).foreach(_ => deltaLog.ensureLogDirectoryExist())
     }
 
     val partitionColumns = metadata.physicalPartitionSchema.fieldNames.toSet
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
index af4d66993cc..2f123a398ee 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
@@ -189,7 +189,7 @@ abstract class CloneTableBase(
     }
 
     if (txn.readVersion < 0) {
-      destinationTable.createLogDirectory()
+      destinationTable.ensureLogDirectoryExist()
     }
 
     val metadataToUpdate = determineTargetMetadata(txn.snapshot, deltaOperation.name)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
index 9c2505aeed9..517c1f5f373 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
@@ -223,7 +223,7 @@ case class WriteIntoDelta(
 
     if (txn.readVersion < 0) {
       // Initialize the log path
-      deltaLog.createLogDirectory()
+      deltaLog.ensureLogDirectoryExist()
     }
 
     val (newFiles, addFiles, deletedFiles) = (mode, replaceWhere) match {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 93ede5a355c..9a5719e33bf 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -687,11 +687,14 @@ class DeltaLogSuite extends QueryTest
      val fs = logPath.getFileSystem(log.newDeltaHadoopConf())
      assert(fs.exists(logPath), "Log path should exist.")
      assert(fs.getFileStatus(logPath).isDirectory, "Log path should be a directory")
+      val commitPath = FileNames.commitDirPath(logPath)
+      assert(fs.exists(commitPath), "Commit path should exist.")
+      assert(fs.getFileStatus(commitPath).isDirectory, "Commit path should be a directory")
     }
   }
 
   test("DeltaLog should throw exception when unable to create log directory " +
-      "with filesystem IO Exception") {
+    "with filesystem IO Exception") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val log = DeltaLog.forTable(spark, new Path(path))
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
index a621b7e854b..86f82fcd5d2 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
@@ -297,6 +297,29 @@ class OptimisticTransactionSuite
     }
   }
 
+  test("enabling Managed Commits on an existing table should create commit dir") {
+    withTempDir { tempDir =>
+      val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
+      val metadata = Metadata()
+      log.startTransaction().commit(Seq(metadata), ManualUpdate)
+      val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
+      val commitDir = FileNames.commitDirPath(log.logPath)
+      // Delete the commit directory.
+      fs.delete(commitDir, true)
+      assert(!fs.exists(commitDir))
+      // With no Managed Commits conf, the commit directory should not be recreated.
+      log.startTransaction().commit(Seq(metadata), ManualUpdate)
+      assert(!fs.exists(commitDir))
+      // Enabling Managed Commits on an existing table should create the commit dir.
+      CommitOwnerProvider.registerBuilder(InMemoryCommitOwnerBuilder(3))
+      val newMetadata = metadata.copy(configuration =
+        (metadata.configuration ++
+          Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> "in-memory")).toMap)
+      log.startTransaction().commit(Seq(newMetadata), ManualUpdate)
+      assert(fs.exists(commitDir))
+    }
+  }
+
   test("AddFile with different partition schema compared to metadata should fail") {
     withTempDir { tempDir =>
       val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
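Reviewer note: for context, below is a minimal standalone sketch of the contract callers now rely on after this change. It is not the actual DeltaLog implementation: the IOException wrapping and the literal "_commits" subdirectory name are assumptions for illustration (in the real code the commit path comes from FileNames.commitDirPath, and the helper lives inside DeltaLog). The point is that ensureLogDirectoryExist, unlike the removed createLogDirectory, also creates the commit directory and checks the result of mkdirs instead of discarding it.

```scala
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object LogDirSketch {
  // Sketch: ensure both the _delta_log directory and its commit subdirectory
  // exist, failing loudly if the filesystem reports that mkdirs did not succeed.
  def ensureLogDirectoryExist(logPath: Path, conf: Configuration): Unit = {
    val fs: FileSystem = logPath.getFileSystem(conf)
    def createDirIfNotExists(path: Path): Unit = {
      if (!fs.exists(path) && !fs.mkdirs(path)) {
        throw new IOException(s"Cannot create directory at $path")
      }
    }
    createDirIfNotExists(logPath)
    // Hypothetical stand-in for FileNames.commitDirPath(logPath).
    createDirIfNotExists(new Path(logPath, "_commits"))
  }
}
```

This is also why the new `else` branch in OptimisticTransaction can simply call `deltaLog.ensureLogDirectoryExist()` when a managed commit owner is enabled on an existing table: the exists-check makes the call idempotent for the already-present log directory, so only the missing commit subdirectory gets created.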