Skip to content

Commit

Permalink
Directory
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 27, 2024
1 parent b0ed777 commit 1f30a42
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
23 changes: 18 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,28 @@ class DeltaLog private(
def ensureLogDirectoryExist(): Unit = {
val fs = logPath.getFileSystem(newDeltaHadoopConf())
def createDirIfNotExists(path: Path): Unit = {
var success = false
try {
success = fs.exists(path) || fs.mkdirs(path)
// Optimistically attempt to create the directory first without checking its existence.
// This is efficient because we're assuming it's more likely that the directory doesn't
// exist and it saves an filesystem existence check in that case.
val (success, mkdirsIOExceptionOpt) = try {
// Return value of false should mean the directory already existed (not an error) but
// we will verify below because we're paranoid about buggy FileSystem implementations.
(fs.mkdirs(path), None)
} catch {
// Only needed because buggy Hadoop FileSystem.mkdir wrongly throws on existing dir.
case io: IOException =>
throw DeltaErrors.cannotCreateLogPathException(logPath.toString, io)
val dirExists =
try {
fs.getFileStatus(path).isDirectory
} catch {
case NonFatal(_) => false
}
(dirExists, Some(io))
}
if (!success) {
throw DeltaErrors.cannotCreateLogPathException(logPath.toString)
throw DeltaErrors.cannotCreateLogPathException(
logPath = logPath.toString,
cause = mkdirsIOExceptionOpt.orNull)
}
}
createDirIfNotExists(FileNames.commitDirPath(logPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ object CheckpointHook extends PostCommitHook {
committedActions: Seq[Action]): Unit = {
if (!txn.needsCheckpoint) return

txn.deltaLog.ensureLogDirectoryExist()

// Since the postCommitSnapshot isn't guaranteed to match committedVersion, we have to
// explicitly checkpoint the snapshot at the committedVersion.
val cp = postCommitSnapshot.checkpointProvider
Expand Down

0 comments on commit 1f30a42

Please sign in to comment.