diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 0b975a41f7d..9310f7fac3e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -439,15 +439,28 @@ class DeltaLog private( def ensureLogDirectoryExist(): Unit = { val fs = logPath.getFileSystem(newDeltaHadoopConf()) def createDirIfNotExists(path: Path): Unit = { - var success = false - try { - success = fs.exists(path) || fs.mkdirs(path) + // Optimistically attempt to create the directory first without checking its existence. + // This is efficient because we're assuming it's more likely that the directory doesn't + // exist and it saves a filesystem existence check in that case. + val (success, mkdirsIOExceptionOpt) = try { + // Return value of false should mean the directory already existed (not an error) but + // we will verify below because we're paranoid about buggy FileSystem implementations. + (fs.mkdirs(path), None) } catch { + // Only needed because buggy Hadoop FileSystem.mkdirs wrongly throws on existing dir. 
case io: IOException => - throw DeltaErrors.cannotCreateLogPathException(logPath.toString, io) + val dirExists = + try { + fs.getFileStatus(path).isDirectory + } catch { + case NonFatal(_) => false + } + (dirExists, Some(io)) } if (!success) { - throw DeltaErrors.cannotCreateLogPathException(logPath.toString) + throw DeltaErrors.cannotCreateLogPathException( + logPath = logPath.toString, + cause = mkdirsIOExceptionOpt.orNull) } } createDirIfNotExists(FileNames.commitDirPath(logPath)) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala index fe8f3676196..651ccc4e867 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala @@ -33,8 +33,6 @@ object CheckpointHook extends PostCommitHook { committedActions: Seq[Action]): Unit = { if (!txn.needsCheckpoint) return - txn.deltaLog.ensureLogDirectoryExist() - // Since the postCommitSnapshot isn't guaranteed to match committedVersion, we have to // explicitly checkpoint the snapshot at the committedVersion. val cp = postCommitSnapshot.checkpointProvider