Skip to content

Commit

Permalink
Bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed May 1, 2024
1 parent 4f39756 commit 29e90c7
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -500,14 +500,6 @@ class DeltaLog private(
createDirIfNotExists(FileNames.commitDirPath(logPath))
}

/**
* Create the log directory. Unlike `ensureLogDirectoryExist`, this method doesn't check whether
* the log directory exists and it will ignore the return value of `mkdirs`.
*/
def createLogDirectory(): Unit = {
logPath.getFileSystem(newDeltaHadoopConf()).mkdirs(logPath)
}

/* ------------ *
| Integration |
* ------------ */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1412,7 +1412,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
.assignIfMissing(protocol, allActions, getFirstAttemptVersion)

if (readVersion < 0) {
deltaLog.createLogDirectory()
deltaLog.ensureLogDirectoryExist()
}
val fsWriteStartNano = System.nanoTime()
val jsonActions = allActions.map(_.json)
Expand Down Expand Up @@ -1694,6 +1694,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
logWarning("Detected no metadata in initial commit but commit validation was turned off.")
}
} else {
// Ensure Commit Directory exists if managed commits is enabled on an existing table.
newMetadata.flatMap(_.managedCommitOwnerName).foreach(_ => deltaLog.ensureLogDirectoryExist())
}

val partitionColumns = metadata.physicalPartitionSchema.fieldNames.toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ abstract class CloneTableBase(
}

if (txn.readVersion < 0) {
destinationTable.createLogDirectory()
destinationTable.ensureLogDirectoryExist()
}

val metadataToUpdate = determineTargetMetadata(txn.snapshot, deltaOperation.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ case class WriteIntoDelta(

if (txn.readVersion < 0) {
// Initialize the log path
deltaLog.createLogDirectory()
deltaLog.ensureLogDirectoryExist()
}

val (newFiles, addFiles, deletedFiles) = (mode, replaceWhere) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,14 @@ class DeltaLogSuite extends QueryTest
val fs = logPath.getFileSystem(log.newDeltaHadoopConf())
assert(fs.exists(logPath), "Log path should exist.")
assert(fs.getFileStatus(logPath).isDirectory, "Log path should be a directory")
val commitPath = FileNames.commitDirPath(logPath)
assert(fs.exists(commitPath), "Commit path should exist.")
assert(fs.getFileStatus(logPath).isDirectory, "Commit path should be a directory")
}
}

test("DeltaLog should throw exception when unable to create log directory " +
"with filesystem IO Exception") {
"with filesystem IO Exception") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val log = DeltaLog.forTable(spark, new Path(path))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,29 @@ class OptimisticTransactionSuite
}
}

test("enabling Managed Commits on an existing table should create commit dir") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
val metadata = Metadata()
log.startTransaction().commit(Seq(metadata), ManualUpdate)
val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
val commitDir = FileNames.commitDirPath(log.logPath)
// Delete commit directory.
fs.delete(commitDir)
assert(!fs.exists(commitDir))
// With no Managed Commits conf, commit directory should not be created.
log.startTransaction().commit(Seq(metadata), ManualUpdate)
assert(!fs.exists(commitDir))
// Enabling Managed Commits on an existing table should create the commit dir.
CommitOwnerProvider.registerBuilder(InMemoryCommitOwnerBuilder(3))
val newMetadata = metadata.copy(configuration =
(metadata.configuration ++
Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> "in-memory")).toMap)
log.startTransaction().commit(Seq(newMetadata), ManualUpdate)
assert(fs.exists(commitDir))
}
}

test("AddFile with different partition schema compared to metadata should fail") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
Expand Down

0 comments on commit 29e90c7

Please sign in to comment.