Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed May 7, 2024
1 parent 58844ab commit fecfce0
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 26 deletions.
12 changes: 2 additions & 10 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ class DeltaLog private(

/**
 * Returns true iff `otherLog` refers to the same Delta log as this one, determined by
 * comparing the two logs' `compositeId`s.
 */
def isSameLogAs(otherLog: DeltaLog): Boolean = this.compositeId == otherLog.compositeId

/** Creates the log directory if it does not exist. */
def ensureLogDirectoryExist(): Unit = {
/** Creates the log directory and the commit directory if they do not exist. */
def createLogDirectoriesIfNotExists(): Unit = {
val fs = logPath.getFileSystem(newDeltaHadoopConf())
def createDirIfNotExists(path: Path): Unit = {
// Optimistically attempt to create the directory first without checking its existence.
Expand Down Expand Up @@ -500,14 +500,6 @@ class DeltaLog private(
createDirIfNotExists(FileNames.commitDirPath(logPath))
}

/**
 * Create the log directory. Unlike `ensureLogDirectoryExist`, this method doesn't check whether
 * the log directory exists and it will ignore the return value of `mkdirs`.
 * NOTE(review): superseded by `createLogDirectoriesIfNotExists`, which also creates the
 * commit directory and does not ignore `mkdirs` failures — confirm no callers remain.
 */
def createLogDirectory(): Unit = {
logPath.getFileSystem(newDeltaHadoopConf()).mkdirs(logPath)
}

/* ------------ *
| Integration |
* ------------ */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1415,7 +1415,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
.assignIfMissing(protocol, allActions, getFirstAttemptVersion)

if (readVersion < 0) {
deltaLog.createLogDirectory()
deltaLog.createLogDirectoriesIfNotExists()
}
val fsWriteStartNano = System.nanoTime()
val jsonActions = allActions.map(_.json)
Expand Down Expand Up @@ -1681,8 +1681,15 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// `assertMetadata` call above.
performCdcColumnMappingCheck(finalActions, op)

// Ensure the commit directory exists when Managed Commits is enabled on an existing table.
lazy val isFsToMcConversion = snapshot.metadata.managedCommitOwnerName.isEmpty &&
newMetadata.flatMap(_.managedCommitOwnerName).nonEmpty
val shouldCreateLogDirs = snapshot.version == -1 || isFsToMcConversion
if (shouldCreateLogDirs) {
deltaLog.createLogDirectoriesIfNotExists()
}

if (snapshot.version == -1) {
deltaLog.ensureLogDirectoryExist()
// If this is the first commit and no protocol is specified, initialize the protocol version.
if (!finalActions.exists(_.isInstanceOf[Protocol])) {
finalActions = protocol +: finalActions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ abstract class CloneTableBase(
}

if (txn.readVersion < 0) {
destinationTable.createLogDirectory()
destinationTable.createLogDirectoriesIfNotExists()
}

val metadataToUpdate = determineTargetMetadata(txn.snapshot, deltaOperation.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ abstract class ConvertToDeltaCommandBase(
convertProperties: ConvertTarget,
targetTable: ConvertTargetTable): Seq[Row] =
recordDeltaOperation(txn.deltaLog, "delta.convert") {
txn.deltaLog.ensureLogDirectoryExist()
txn.deltaLog.createLogDirectoriesIfNotExists()
val targetPath = new Path(convertProperties.targetDir)
// scalastyle:off deltahadoopconfiguration
val sessionHadoopConf = spark.sessionState.newHadoopConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ case class WriteIntoDelta(

if (txn.readVersion < 0) {
// Initialize the log path
deltaLog.createLogDirectory()
deltaLog.createLogDirectoriesIfNotExists()
}

val (newFiles, addFiles, deletedFiles) = (mode, replaceWhere) match {
Expand Down
4 changes: 2 additions & 2 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
// create a table with a default Protocol.
val testSchema = spark.range(1).schema
val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
FileNames.unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(0, 0).json),
Expand All @@ -563,7 +563,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
// create a table with a default Protocol.
val testSchema = spark.range(1).schema
val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
FileNames.unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ trait CloneTableSuiteBase extends QueryTest
val log = DeltaLog.forTable(spark, source)
// make sure to have a dummy schema because we can't have empty schema table by default
val newSchema = new StructType().add("id", IntegerType, nullable = true)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = newSchema.json).json, sourceProtocol.json),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,17 +681,20 @@ class DeltaLogSuite extends QueryTest
withTempDir { dir =>
val path = dir.getCanonicalPath
val log = DeltaLog.forTable(spark, new Path(path))
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()

val logPath = log.logPath
val fs = logPath.getFileSystem(log.newDeltaHadoopConf())
assert(fs.exists(logPath), "Log path should exist.")
assert(fs.getFileStatus(logPath).isDirectory, "Log path should be a directory")
val commitPath = FileNames.commitDirPath(logPath)
assert(fs.exists(commitPath), "Commit path should exist.")
assert(fs.getFileStatus(commitPath).isDirectory, "Commit path should be a directory")
}
}

test("DeltaLog should throw exception when unable to create log directory " +
"with filesystem IO Exception") {
"with filesystem IO Exception") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val log = DeltaLog.forTable(spark, new Path(path))
Expand All @@ -702,7 +705,7 @@ class DeltaLogSuite extends QueryTest
fs.create(log.logPath)

val e = intercept[DeltaIOException] {
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
}
checkError(e, "DELTA_CANNOT_CREATE_LOG_PATH")
e.getCause match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
path: File,
schema: StructType = testTableSchema): DeltaLog = {
val log = DeltaLog.forTable(spark, path)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = schema.json).json, protocol.json),
Expand Down Expand Up @@ -410,7 +410,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
test("access with protocol too high") {
withTempDir { path =>
val log = DeltaLog.forTable(spark, path)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata().json, Protocol(Integer.MAX_VALUE, Integer.MAX_VALUE).json),
Expand Down Expand Up @@ -1228,7 +1228,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
test("create a table with no protocol") {
withTempDir { path =>
val log = DeltaLog.forTable(spark, path)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata().json),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DeltaTableFeatureSuite
path: File,
schema: StructType = testTableSchema): DeltaLog = {
val log = DeltaLog.forTable(spark, path)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = schema.json).json, protocol.json),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ trait DescribeDeltaHistorySuiteBase
val writerVersion = Action.supportedProtocolVersion().minWriterVersion
withTempDir { path =>
val log = DeltaLog.forTable(spark, path)
log.ensureLogDirectoryExist()
log.createLogDirectoriesIfNotExists()
log.store.write(
FileNames.unsafeDeltaFile(log.logPath, 0),
Iterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,37 @@ class OptimisticTransactionSuite
}
}

// Verifies that converting an existing filesystem-based table to Managed Commits creates
// the commit directory, and that commits never recreate it otherwise.
test("enabling Managed Commits on an existing table should create commit dir") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
val metadata = Metadata()
log.startTransaction().commit(Seq(metadata), ManualUpdate)
val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
val commitDir = FileNames.commitDirPath(log.logPath)
// Simulate a pre-existing table with no commit directory by deleting it.
fs.delete(commitDir)
assert(!fs.exists(commitDir))
// Without a Managed Commits conf, a commit should NOT recreate the commit directory.
log.startTransaction().commit(Seq(metadata), ManualUpdate)
assert(!fs.exists(commitDir))
// Enabling Managed Commits on the existing table should create the commit dir.
CommitOwnerProvider.registerBuilder(InMemoryCommitOwnerBuilder(3))
val newMetadata = metadata.copy(configuration =
(metadata.configuration ++
Map(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key -> "in-memory")).toMap)
log.startTransaction().commit(Seq(newMetadata), ManualUpdate)
assert(fs.exists(commitDir))
log.update().ensureCommitFilesBackfilled()
// Once Managed Commits is already enabled (this commit introduces no new conf change),
// the commit directory is not recreated, so committing after deleting it should fail:
// the table state is effectively a corrupted directory layout.
fs.delete(commitDir)
assert(!fs.exists(commitDir))
intercept[java.io.FileNotFoundException] {
log.startTransaction().commit(Seq(newMetadata), ManualUpdate)
}
}
}

test("AddFile with different partition schema compared to metadata should fail") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
Expand Down

0 comments on commit fecfce0

Please sign in to comment.