From fecfce0c8550e77c88d5302ee6b8d96e3c7b860b Mon Sep 17 00:00:00 2001
From: Sumeet Varma
Date: Mon, 6 May 2024 20:08:03 -0700
Subject: [PATCH] Parallel calls

---
 .../org/apache/spark/sql/delta/DeltaLog.scala | 12 ++-----
 .../sql/delta/OptimisticTransaction.scala     | 11 +++++--
 .../sql/delta/commands/CloneTableBase.scala   |  2 +-
 .../commands/ConvertToDeltaCommand.scala      |  2 +-
 .../sql/delta/commands/WriteIntoDelta.scala   |  2 +-
 .../io/delta/tables/DeltaTableSuite.scala     |  4 +--
 .../spark/sql/delta/CloneTableSuiteBase.scala |  2 +-
 .../spark/sql/delta/DeltaLogSuite.scala       |  9 ++++--
 .../sql/delta/DeltaProtocolVersionSuite.scala |  6 ++--
 .../sql/delta/DeltaTableFeatureSuite.scala    |  2 +-
 .../sql/delta/DescribeDeltaHistorySuite.scala |  2 +-
 .../delta/OptimisticTransactionSuite.scala    | 31 +++++++++++++++++++
 12 files changed, 59 insertions(+), 26 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index ebdd02166c7..e84933251ae 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -467,8 +467,8 @@ class DeltaLog private(
 
   def isSameLogAs(otherLog: DeltaLog): Boolean = this.compositeId == otherLog.compositeId
 
-  /** Creates the log directory if it does not exist. */
-  def ensureLogDirectoryExist(): Unit = {
+  /** Creates the log directory and commit directory if they do not exist. */
+  def createLogDirectoriesIfNotExists(): Unit = {
     val fs = logPath.getFileSystem(newDeltaHadoopConf())
     def createDirIfNotExists(path: Path): Unit = {
       // Optimistically attempt to create the directory first without checking its existence.
@@ -500,14 +500,6 @@ class DeltaLog private(
     createDirIfNotExists(FileNames.commitDirPath(logPath))
   }
 
-  /**
-   * Create the log directory. Unlike `ensureLogDirectoryExist`, this method doesn't check whether
-   * the log directory exists and it will ignore the return value of `mkdirs`.
-   */
-  def createLogDirectory(): Unit = {
-    logPath.getFileSystem(newDeltaHadoopConf()).mkdirs(logPath)
-  }
-
   /* ------------ *
    | Integration |
    * ------------ */
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 7e2bcb9d16b..27b2e8a314c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1415,7 +1415,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
         .assignIfMissing(protocol, allActions, getFirstAttemptVersion)
 
       if (readVersion < 0) {
-        deltaLog.createLogDirectory()
+        deltaLog.createLogDirectoriesIfNotExists()
       }
       val fsWriteStartNano = System.nanoTime()
       val jsonActions = allActions.map(_.json)
@@ -1681,8 +1681,15 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     // `assertMetadata` call above.
     performCdcColumnMappingCheck(finalActions, op)
 
+    // Ensure the commit directory exists when Managed Commits is enabled on an existing table.
+    lazy val isFsToMcConversion = snapshot.metadata.managedCommitOwnerName.isEmpty &&
+      newMetadata.flatMap(_.managedCommitOwnerName).nonEmpty
+    val shouldCreateLogDirs = snapshot.version == -1 || isFsToMcConversion
+    if (shouldCreateLogDirs) {
+      deltaLog.createLogDirectoriesIfNotExists()
+    }
+
     if (snapshot.version == -1) {
-      deltaLog.ensureLogDirectoryExist()
       // If this is the first commit and no protocol is specified, initialize the protocol version.
       if (!finalActions.exists(_.isInstanceOf[Protocol])) {
         finalActions = protocol +: finalActions
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
index af4d66993cc..92cecfa8964 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
@@ -189,7 +189,7 @@ abstract class CloneTableBase(
     }
 
     if (txn.readVersion < 0) {
-      destinationTable.createLogDirectory()
+      destinationTable.createLogDirectoriesIfNotExists()
     }
 
     val metadataToUpdate = determineTargetMetadata(txn.snapshot, deltaOperation.name)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
index 097d9ae9e99..ab2b08b8770 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
@@ -346,7 +346,7 @@ abstract class ConvertToDeltaCommandBase(
       convertProperties: ConvertTarget,
       targetTable: ConvertTargetTable): Seq[Row] =
     recordDeltaOperation(txn.deltaLog, "delta.convert") {
-      txn.deltaLog.ensureLogDirectoryExist()
+      txn.deltaLog.createLogDirectoriesIfNotExists()
      val targetPath = new Path(convertProperties.targetDir)
      // scalastyle:off deltahadoopconfiguration
      val sessionHadoopConf = spark.sessionState.newHadoopConf()
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
index 9c2505aeed9..c0cfc586a64 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
@@ -223,7 +223,7 @@ case class WriteIntoDelta(
 
     if (txn.readVersion < 0) {
       // Initialize the log path
-      deltaLog.createLogDirectory()
+      deltaLog.createLogDirectoriesIfNotExists()
     }
 
     val (newFiles, addFiles, deletedFiles) = (mode, replaceWhere) match {
diff --git a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
index f61a6788a14..1a542d2b5dd 100644
--- a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
+++ b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -537,7 +537,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
       // create a table with a default Protocol.
       val testSchema = spark.range(1).schema
       val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
-      log.ensureLogDirectoryExist()
+      log.createLogDirectoriesIfNotExists()
       log.store.write(
         FileNames.unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata(schemaString = testSchema.json).json, Protocol(0, 0).json),
@@ -563,7 +563,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
       // create a table with a default Protocol.
       val testSchema = spark.range(1).schema
       val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
-      log.ensureLogDirectoryExist()
+      log.createLogDirectoriesIfNotExists()
       log.store.write(
         FileNames.unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
index a5f7b416a92..e3f4cb15a07 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
@@ -686,7 +686,7 @@ trait CloneTableSuiteBase extends QueryTest
     val log = DeltaLog.forTable(spark, source)
     // make sure to have a dummy schema because we can't have empty schema table by default
     val newSchema = new StructType().add("id", IntegerType, nullable = true)
-    log.ensureLogDirectoryExist()
+    log.createLogDirectoriesIfNotExists()
     log.store.write(
       unsafeDeltaFile(log.logPath, 0),
       Iterator(Metadata(schemaString = newSchema.json).json, sourceProtocol.json),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 93ede5a355c..2444372608d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -681,17 +681,20 @@ class DeltaLogSuite extends QueryTest
 
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val log = DeltaLog.forTable(spark, new Path(path))
-      log.ensureLogDirectoryExist()
+      log.createLogDirectoriesIfNotExists()
       val logPath = log.logPath
       val fs = logPath.getFileSystem(log.newDeltaHadoopConf())
       assert(fs.exists(logPath), "Log path should exist.")
       assert(fs.getFileStatus(logPath).isDirectory, "Log path should be a directory")
+      val commitPath = FileNames.commitDirPath(logPath)
+      assert(fs.exists(commitPath), "Commit path should exist.")
+      assert(fs.getFileStatus(commitPath).isDirectory, "Commit path should be a directory")
     }
   }
 
   test("DeltaLog should throw exception when unable to create log directory " +
-      "with filesystem IO Exception") {
+    "with filesystem IO Exception") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val log = DeltaLog.forTable(spark, new Path(path))
@@ -702,7 +705,7 @@ class DeltaLogSuite extends QueryTest
       fs.create(log.logPath)
 
       val e = intercept[DeltaIOException] {
-        log.ensureLogDirectoryExist()
+        log.createLogDirectoriesIfNotExists()
       }
       checkError(e, "DELTA_CANNOT_CREATE_LOG_PATH")
       e.getCause match {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
index fda89b3a37a..6ff64febaf8 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
@@ -61,7 +61,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       path: File,
       schema: StructType = testTableSchema): DeltaLog = {
     val log = DeltaLog.forTable(spark, path)
-    log.ensureLogDirectoryExist()
+    log.createLogDirectoriesIfNotExists()
     log.store.write(
       unsafeDeltaFile(log.logPath, 0),
       Iterator(Metadata(schemaString = schema.json).json, protocol.json),
@@ -410,7 +410,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
   test("access with protocol too high") {
     withTempDir { path =>
       val log = DeltaLog.forTable(spark, path)
-      log.ensureLogDirectoryExist()
+      log.createLogDirectoriesIfNotExists()
       log.store.write(
         unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata().json, Protocol(Integer.MAX_VALUE, Integer.MAX_VALUE).json),
@@ -1228,7 +1228,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
   test("create a table with no protocol") {
     withTempDir { path =>
       val log = DeltaLog.forTable(spark, path)
-      log.ensureLogDirectoryExist()
+      log.createLogDirectoriesIfNotExists()
       log.store.write(
         unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata().json),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
index f11dcd5195e..e56da1fe7b0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
@@ -48,7 +48,7 @@ class DeltaTableFeatureSuite
       path: File,
       schema: StructType = testTableSchema): DeltaLog = {
     val log = DeltaLog.forTable(spark, path)
-    log.ensureLogDirectoryExist()
+    log.createLogDirectoriesIfNotExists()
     log.store.write(
       unsafeDeltaFile(log.logPath, 0),
       Iterator(Metadata(schemaString = schema.json).json, protocol.json),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
index 44b2bfabc19..a5e4ef662b3 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
@@ -386,7 +386,7 @@ trait DescribeDeltaHistorySuiteBase
     val writerVersion = Action.supportedProtocolVersion().minWriterVersion
     withTempDir { path =>
       val log = DeltaLog.forTable(spark, path)
-      log.ensureLogDirectoryExist()
+      log.createLogDirectoriesIfNotExists()
       log.store.write(
         FileNames.unsafeDeltaFile(log.logPath, 0),
         Iterator(
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
index 27cc0bb4088..bf33b1c9f71 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
@@ -297,6 +297,37 @@ class OptimisticTransactionSuite
     }
   }
 
+  test("enabling Managed Commits on an existing table should create commit dir") {
+    withTempDir { tempDir =>
+      val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
+      val metadata = Metadata()
+      log.startTransaction().commit(Seq(metadata), ManualUpdate)
+      val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
+      val commitDir = FileNames.commitDirPath(log.logPath)
+      // Delete the commit directory.
+      fs.delete(commitDir)
+      assert(!fs.exists(commitDir))
+      // Without a Managed Commits conf, the commit directory should not be recreated.
+      log.startTransaction().commit(Seq(metadata), ManualUpdate)
+      assert(!fs.exists(commitDir))
+      // Enabling Managed Commits on an existing table should create the commit dir.
+      CommitOwnerProvider.registerBuilder(InMemoryCommitOwnerBuilder(3))
+      val newMetadata = metadata.copy(configuration =
+        (metadata.configuration ++
+          Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> "in-memory")).toMap)
+      log.startTransaction().commit(Seq(newMetadata), ManualUpdate)
+      assert(fs.exists(commitDir))
+      log.update().ensureCommitFilesBackfilled()
+      // Managed Commits is now already enabled, so this commit is not a conversion and the
+      // commit directory is not recreated; deleting it corrupts the table and the commit fails.
+      fs.delete(commitDir)
+      assert(!fs.exists(commitDir))
+      intercept[java.io.FileNotFoundException] {
+        log.startTransaction().commit(Seq(newMetadata), ManualUpdate)
+      }
+    }
+  }
+
   test("AddFile with different partition schema compared to metadata should fail") {
     withTempDir { tempDir =>
       val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
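
The behavioral core of this patch is the `shouldCreateLogDirs` decision added to
OptimisticTransaction.scala: directories are created only for a brand-new table, or
when a commit converts a file-system (FS) table to Managed Commits (MC). The
standalone Scala sketch below distills that decision so it can be read and run in
isolation; `TableState` and its fields are hypothetical stand-ins for Delta's
`Snapshot`/`Metadata` types, not actual Delta APIs.

// Minimal runnable sketch of the directory-creation decision; illustration only.
object CommitDirDecisionSketch {
  final case class TableState(
      version: Long,                      // -1 means the table does not exist yet
      currentOwner: Option[String],       // managedCommitOwnerName in the read snapshot
      newOwner: Option[Option[String]])   // Some(owner) iff this commit updates the metadata

  // Mirrors `shouldCreateLogDirs` in the patch: create directories for a brand-new table,
  // or when this commit takes a table with no commit owner and assigns one (FS -> MC).
  def shouldCreateLogDirs(s: TableState): Boolean = {
    val isFsToMcConversion = s.currentOwner.isEmpty && s.newOwner.flatten.nonEmpty
    s.version == -1L || isFsToMcConversion
  }

  def main(args: Array[String]): Unit = {
    assert(shouldCreateLogDirs(TableState(-1L, None, None)))                    // new table
    assert(shouldCreateLogDirs(TableState(5L, None, Some(Some("in-memory"))))) // FS -> MC
    assert(!shouldCreateLogDirs(TableState(5L, None, Some(None))))             // stays on FS
    assert(!shouldCreateLogDirs(TableState(5L, Some("in-memory"), None)))      // already MC
  }
}

The `lazy val isFsToMcConversion` in the patch lets the `||` short-circuit skip the
metadata inspection entirely for brand-new tables; the sketch inlines it as a plain
`val` for readability.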