Commit 98634f2
Fix bug
sumeet-db committed Apr 8, 2024
1 parent 64c3207 commit 98634f2
Showing 35 changed files with 136 additions and 108 deletions.
@@ -651,7 +651,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
}

for (version <- minVersion to maxVersion) {
- val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), version).toString
+ val jsonFilePath = FileNames.unsafeDeltaFile(new Path(deltaLogPath), version).toString
DeltaSharingUtils.overrideIteratorBlock[String](
getDeltaSharingLogBlockId(jsonFilePath),
versionToJsonLogBuilderMap.getOrElse(version, Seq.empty).toIterator
@@ -778,7 +778,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {

// Always use 0.json for snapshot queries.
val deltaLogPath = s"${encodedTablePath.toString}/_delta_log"
- val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), 0).toString
+ val jsonFilePath = FileNames.unsafeDeltaFile(new Path(deltaLogPath), 0).toString
DeltaSharingUtils.overrideIteratorBlock[String](
getDeltaSharingLogBlockId(jsonFilePath),
jsonLogSeq.result().toIterator
@@ -819,7 +819,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
val jsonLogStr = deltaSharingTableMetadata.protocol.deltaProtocol.json + "\n" +
deltaSharingTableMetadata.metadata.deltaMetadata.json + "\n"

- val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), 0).toString
+ val jsonFilePath = FileNames.unsafeDeltaFile(new Path(deltaLogPath), 0).toString
DeltaSharingUtils.overrideIteratorBlock[String](
getDeltaSharingLogBlockId(jsonFilePath),
Seq(jsonLogStr).toIterator
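All three hunks above swap only the helper's name; the resolved path is identical. For concreteness, a sketch of what `unsafeDeltaFile` returns, using a hypothetical table location (the format string `f"$version%020d.json"` is taken from the FileNames hunk further down):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.util.FileNames

// Hypothetical log directory, chosen only for illustration.
val deltaLogPath = new Path("s3://bucket/table/_delta_log")

// Versions are zero-padded to 20 digits and suffixed with .json:
FileNames.unsafeDeltaFile(deltaLogPath, 0L)   // .../_delta_log/00000000000000000000.json
FileNames.unsafeDeltaFile(deltaLogPath, 778L) // .../_delta_log/00000000000000000778.json
```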
@@ -1917,7 +1917,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
- val commitFile = util.FileNames.deltaFile(logPath, commitVersion)
+ val commitFile = util.FileNames.unsafeDeltaFile(logPath, commitVersion)
val commitFileStatus =
doCommit(logStore, hadoopConf, logPath, commitFile, commitVersion, actions)
// TODO(managed-commits): Integrate with ICT and pass the correct commitTimestamp
@@ -1985,7 +1985,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val commitTimestamp = commitResponse.commit.fileStatus.getModificationTime
val commitFile = commitResponse.commit.copy(commitTimestamp = commitTimestamp)
if (attemptVersion == 0L) {
- val expectedPathForCommitZero = deltaFile(deltaLog.logPath, version = 0L).toUri
+ val expectedPathForCommitZero = unsafeDeltaFile(deltaLog.logPath, version = 0L).toUri
val actualCommitPath = commitResponse.commit.fileStatus.getPath.toUri
if (actualCommitPath != expectedPathForCommitZero) {
throw new IllegalStateException("Expected 0th commit to be written to " +
@@ -491,7 +491,7 @@ class Snapshot(
startVersion = minUnbackfilledVersion,
endVersion = Some(version))
val fs = deltaLog.logPath.getFileSystem(hadoopConf)
- val expectedBackfilledDeltaFile = FileNames.deltaFile(deltaLog.logPath, version)
+ val expectedBackfilledDeltaFile = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
if (!fs.exists(expectedBackfilledDeltaFile)) {
throw new IllegalStateException("Backfilling of commit files failed. " +
s"Expected delta file $expectedBackfilledDeltaFile not found.")
@@ -351,7 +351,7 @@ trait SnapshotManagement { self: DeltaLog =>

if (headDeltaVersion != checkpointVersion + 1) {
throw DeltaErrors.logFileNotFoundException(
- deltaFile(logPath, checkpointVersion + 1),
+ unsafeDeltaFile(logPath, checkpointVersion + 1),
lastDeltaVersion,
unsafeVolatileMetadata) // metadata is best-effort only
}
@@ -80,7 +80,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
if (commitVersion == 0 || batchSize <= 1) {
// Always backfill zeroth commit or when batch size is configured as 1
backfill(logStore, hadoopConf, logPath, commitVersion, fileStatus)
- val targetFile = FileNames.deltaFile(logPath, commitVersion)
+ val targetFile = FileNames.unsafeDeltaFile(logPath, commitVersion)
val targetFileStatus = fs.getFileStatus(targetFile)
val newCommit = commitResponse.commit.copy(fileStatus = targetFileStatus)
commitResponse = commitResponse.copy(commit = newCommit)
@@ -127,7 +127,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
logPath: Path,
version: Long,
fileStatus: FileStatus): Unit = {
- val targetFile = FileNames.deltaFile(logPath, version)
+ val targetFile = FileNames.unsafeDeltaFile(logPath, version)
logInfo(s"Backfilling commit ${fileStatus.getPath} to ${targetFile.toString}")
val commitContentIterator = logStore.readAsIterator(fileStatus, hadoopConf)
try {
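These two hunks sit on the backfill path, where a commit first written under a uuid-based name is copied to its final zero-padded location. A minimal sketch of that flow, assuming the `LogStore` signatures visible elsewhere in this diff (`readAsIterator` on a FileStatus, the four-argument `write`) and an assumed import path for `LogStore`; error handling and logging are omitted:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.delta.storage.LogStore // import path assumed
import org.apache.spark.sql.delta.util.FileNames

// Sketch only: copy an unbackfilled commit file to <version>.json.
def backfillSketch(
    logStore: LogStore,
    hadoopConf: Configuration,
    logPath: Path,
    version: Long,
    fileStatus: FileStatus): Unit = {
  val targetFile = FileNames.unsafeDeltaFile(logPath, version)
  val commitContent = logStore.readAsIterator(fileStatus, hadoopConf)
  try {
    // overwrite = false gives put-if-absent semantics, matching how
    // commit files are written in the tests later in this diff.
    logStore.write(targetFile, commitContent, overwrite = false, hadoopConf)
  } finally {
    commitContent.close()
  }
}
```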
@@ -56,7 +56,7 @@ case class DeltaCommitFileProvider(
}
uuids.get(version) match {
case Some(uuid) => FileNames.unbackfilledDeltaFile(resolvedPath, version, Some(uuid))
- case _ => FileNames.deltaFile(resolvedPath, version)
+ case _ => FileNames.unsafeDeltaFile(resolvedPath, version)
}
}
}
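This is the resolution logic that the FileNames doc comments below recommend: a version with a tracked uuid has not been backfilled yet and resolves to its uuid-named file, while anything else falls back to the plain zero-padded name. A hedged usage sketch (the import path and the availability of a `snapshot` value at the call site are assumptions):

```scala
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider // path assumed

// `snapshot` is assumed to be the table's current Snapshot.
val provider = DeltaCommitFileProvider(snapshot)

// Unlike FileNames.unsafeDeltaFile, this resolves not-yet-backfilled
// versions to their uuid-named files, so it stays accurate for tables
// with Managed Commits.
val commitFile = provider.deltaFile(7L)
```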
@@ -34,8 +34,26 @@ object FileNames {
private val checksumFilePattern = checksumFileRegex.pattern
private val checkpointFilePattern = checkpointFileRegex.pattern

- /** Returns the delta (json format) path for a given delta file. */
- def deltaFile(path: Path, version: Long): Path = new Path(path, f"$version%020d.json")
+ /**
+  * Returns the delta (json format) path for a given delta file.
+  * WARNING: This API is unsafe and can resolve to incorrect paths if the table has
+  * Managed Commits.
+  * Use DeltaCommitFileProvider(snapshot).deltaFile instead to guarantee accurate paths.
+  */
+ def unsafeDeltaFile(path: Path, version: Long): Path = new Path(path, f"$version%020d.json")
+
+ /**
+  * Returns the delta (json format) path for a given delta file.
+  * WARNING: This API is unsafe and deprecated. It can resolve to incorrect paths if the table has
+  * Managed Commits. It will be removed in future versions.
+  * Use DeltaCommitFileProvider(snapshot).deltaFile instead to guarantee accurate paths or
+  * unsafeDeltaFile for potentially incorrect file name resolution.
+  */
+ @deprecated(
+   "This method is deprecated and will be removed in future versions. " +
+   "Use DeltaCommitFileProvider(snapshot).deltaFile or unsafeDeltaFile instead",
+   "15.1")
+ def deltaFile(path: Path, version: Long): Path = unsafeDeltaFile(path, version)

/**
* Returns the un-backfilled uuid formatted delta (json format) path for a given version.
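The net effect for callers: `deltaFile` still compiles but warns, `unsafeDeltaFile` is a one-for-one rename of it, and `DeltaCommitFileProvider` is the accurate option when a Snapshot is at hand (see the sketch above). A condensed before/after, with illustrative values:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.util.FileNames

val logPath = new Path("/tmp/table/_delta_log") // illustrative
val version = 3L

// Before: deprecated as of 15.1; forwards to unsafeDeltaFile with a warning.
val before: Path = FileNames.deltaFile(logPath, version)

// After: identical resolution, renamed to flag the Managed Commits caveat.
val after: Path = FileNames.unsafeDeltaFile(logPath, version)
```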
6 changes: 3 additions & 3 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -515,7 +515,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
val time = format.parse(desiredTime).getTime

val logPath = new Path(dir.getCanonicalPath, "_delta_log")
- val file = new File(FileNames.deltaFile(logPath, 0).toString)
+ val file = new File(FileNames.unsafeDeltaFile(logPath, 0).toString)
assert(file.setLastModified(time))

val deltaTable2 = io.delta.tables.DeltaTable.forPath(spark, path, fsOptions)
@@ -539,7 +539,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
log.ensureLogDirectoryExist()
log.store.write(
- FileNames.deltaFile(log.logPath, 0),
+ FileNames.unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(0, 0).json),
overwrite = false,
log.newDeltaHadoopConf())
@@ -565,7 +565,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
log.ensureLogDirectoryExist()
log.store.write(
- FileNames.deltaFile(log.logPath, 0),
+ FileNames.unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json),
overwrite = false,
log.newDeltaHadoopConf())
@@ -409,7 +409,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
val version = deltaTable.startTransactionWithInitialSnapshot()
.commit(domainMetadatas, ManualUpdate)
val committedActions = deltaLog.store.read(
- FileNames.deltaFile(deltaLog.logPath, version),
+ FileNames.unsafeDeltaFile(deltaLog.logPath, version),
deltaLog.newDeltaHadoopConf())
assert(committedActions.size == 2)
val serializedJson = committedActions.last
@@ -635,7 +635,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
val version = deltaLog.startTransaction().commit(Seq(action), ManualUpdate)
// Read the commit file and get the serialized committed actions
val committedActions = deltaLog.store.read(
- FileNames.deltaFile(deltaLog.logPath, version),
+ FileNames.unsafeDeltaFile(deltaLog.logPath, version),
deltaLog.newDeltaHadoopConf())

assert(committedActions.size == 2)
@@ -581,7 +581,7 @@ class CheckpointsSuite
// Delete the commit files 0-9, so that we are forced to read the checkpoint file
val logPath = new Path(new File(target, "_delta_log").getAbsolutePath)
for (i <- 0 to 10) {
- val file = new File(FileNames.deltaFile(logPath, version = i).toString)
+ val file = new File(FileNames.unsafeDeltaFile(logPath, version = i).toString)
file.delete()
}

@@ -29,7 +29,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
- import org.apache.spark.sql.delta.util.FileNames.{checksumFile, deltaFile}
+ import org.apache.spark.sql.delta.util.FileNames.{checksumFile, unsafeDeltaFile}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
@@ -231,7 +231,7 @@ trait CloneTableSuiteBase extends QueryTest
targetLocation.isEmpty && targetIsTable,
isReplaceOperation)

- val commit = deltaFile(targetLog.logPath, targetLog.unsafeVolatileSnapshot.version)
+ val commit = unsafeDeltaFile(targetLog.logPath, targetLog.unsafeVolatileSnapshot.version)
val hadoopConf = targetLog.newDeltaHadoopConf()
val filePaths: Seq[FileAction] = targetLog.store.read(commit, hadoopConf).flatMap { line =>
JsonUtils.fromJson[SingleAction](line) match {
@@ -688,7 +688,7 @@ trait CloneTableSuiteBase extends QueryTest
val newSchema = new StructType().add("id", IntegerType, nullable = true)
log.ensureLogDirectoryExist()
log.store.write(
- deltaFile(log.logPath, 0),
+ unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = newSchema.json).json, sourceProtocol.json),
overwrite = false,
log.newDeltaHadoopConf())
@@ -50,7 +50,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest

/** Modify timestamp for a delta commit, used to test timestamp querying */
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
- val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
+ val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
file.setLastModified(time)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
@@ -107,7 +107,7 @@ abstract class DeltaCDCSuiteBase

/** Modify timestamp for a delta commit, used to test timestamp querying */
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
- val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
+ val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
file.setLastModified(time)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
@@ -951,7 +951,7 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase {
val deltaLog = DeltaLog.forTable(spark, path)
(0 to 3).foreach { i =>
spark.range(i * 10, (i + 1) * 10).write.format("delta").mode("append").save(path)
- val file = new File(FileNames.deltaFile(deltaLog.logPath, i).toUri)
+ val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, i).toUri)
file.setLastModified(300 - i)
}

@@ -63,7 +63,7 @@ trait DeltaTimeTravelTests extends QueryTest
protected val timeFormatter = new SimpleDateFormat("yyyyMMddHHmmssSSS")

protected def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
- val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
+ val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
file.setLastModified(ts)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
@@ -128,7 +128,7 @@ trait DeltaTimeTravelTests extends QueryTest
.saveAsTable(table)
}
val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(table))
- val file = new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri)
+ val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri)
file.setLastModified(commitList.head)
commitList = commits.slice(1, commits.length) // we already wrote the first commit here
var startVersion = deltaLog.snapshot.version + 1
@@ -352,7 +352,7 @@ trait DeltaTimeTravelTests extends QueryTest
assert(e1.getMessage.contains("[0, 2]"))

val deltaLog = DeltaLog.forTable(spark, getTableLocation(tblName))
- new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri).delete()
+ new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri).delete()
// Delta Lake will create a DeltaTableV2 explicitly with time travel options in the catalog.
// These options will be verified by DeltaHistoryManager, which will throw an
// AnalysisException.
@@ -56,7 +56,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
val hadoopConf = deltaLog.newDeltaHadoopConf()

(startVersion to endVersion).foreach { versionToRead =>
- val file = FileNames.deltaFile(deltaLog.logPath, versionToRead)
+ val file = FileNames.unsafeDeltaFile(deltaLog.logPath, versionToRead)
val actionsIterator = deltaLog.store.readAsIterator(file, hadoopConf).map(Action.fromJson)
logReplay.append(versionToRead, actionsIterator)
}
@@ -429,7 +429,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
postSetupFunc = Some(
(deltaLog: DeltaLog) => {
val logPath = deltaLog.logPath
- val deltaFileToDelete = FileNames.deltaFile(logPath, version = 4)
+ val deltaFileToDelete = FileNames.unsafeDeltaFile(logPath, version = 4)
logPath.getFileSystem(deltaLog.newDeltaHadoopConf()).delete(deltaFileToDelete, true)
}
),
@@ -243,13 +243,13 @@ class DeltaLogSuite extends QueryTest
s"$scheme$path", Some(200L), dataChange = false)

log.store.write(
- FileNames.deltaFile(log.logPath, 0L),
+ FileNames.unsafeDeltaFile(log.logPath, 0L),
Iterator(Action.supportedProtocolVersion(), Metadata(), add)
.map(a => JsonUtils.toJson(a.wrap)),
overwrite = false,
log.newDeltaHadoopConf())
log.store.write(
- FileNames.deltaFile(log.logPath, 1L),
+ FileNames.unsafeDeltaFile(log.logPath, 1L),
Iterator(JsonUtils.toJson(rm.wrap)),
overwrite = false,
log.newDeltaHadoopConf())
@@ -272,13 +272,13 @@
s"$scheme$path", Some(200L), dataChange = false)

log.store.write(
- FileNames.deltaFile(log.logPath, 0L),
+ FileNames.unsafeDeltaFile(log.logPath, 0L),
Iterator(Action.supportedProtocolVersion(), Metadata(), add)
.map(a => JsonUtils.toJson(a.wrap)),
overwrite = false,
log.newDeltaHadoopConf())
log.store.write(
- FileNames.deltaFile(log.logPath, 1L),
+ FileNames.unsafeDeltaFile(log.logPath, 1L),
Iterator(JsonUtils.toJson(rm.wrap)),
overwrite = false,
log.newDeltaHadoopConf())
@@ -373,7 +373,7 @@ class DeltaLogSuite extends QueryTest
}
val file = AddFile("abc", Map.empty, 1, 1, true)
log.store.write(
- FileNames.deltaFile(log.logPath, 0L),
+ FileNames.unsafeDeltaFile(log.logPath, 0L),
Iterator(selectedAction, file).map(a => JsonUtils.toJson(a.wrap)),
overwrite = false,
log.newDeltaHadoopConf())
@@ -526,14 +526,14 @@ class DeltaLogSuite extends QueryTest
assert(deltaLog.snapshot.version === 0)

deltaLog.store.write(
- FileNames.deltaFile(deltaLog.logPath, 0),
+ FileNames.unsafeDeltaFile(deltaLog.logPath, 0),
actions.map(_.unwrap.json).iterator,
overwrite = false,
deltaLog.newDeltaHadoopConf())

// To avoid flakiness, we manually set the modification timestamp of the file to a later
// second
- new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri)
+ new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri)
.setLastModified(commitTimestamp + 5000)

checkAnswer(
@@ -558,7 +558,7 @@
// that we don't trigger another update, and thus don't find the commit.
val add = AddFile(path, Map.empty, 100L, 10L, dataChange = true)
deltaLog.store.write(
- FileNames.deltaFile(deltaLog.logPath, 1L),
+ FileNames.unsafeDeltaFile(deltaLog.logPath, 1L),
Iterator(JsonUtils.toJson(add.wrap)),
overwrite = false,
deltaLog.newDeltaHadoopConf())
@@ -582,7 +582,7 @@
spark.range(1).write.format("delta").mode("append").save(path)

val log = DeltaLog.forTable(spark, path)
- val commitFilePath = FileNames.deltaFile(log.logPath, 1L)
+ val commitFilePath = FileNames.unsafeDeltaFile(log.logPath, 1L)
val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
val stream = fs.open(commitFilePath)
val reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))
@@ -62,7 +62,7 @@ class DeltaOptionSuite extends QueryTest
val deltaLog = DeltaLog.forTable(spark, tempDir)
val version = deltaLog.snapshot.version
val commitActions = deltaLog.store
- .read(FileNames.deltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
+ .read(FileNames.unsafeDeltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
.map(Action.fromJson)
val fileActions = commitActions.collect { case a: FileAction => a }

@@ -88,7 +88,7 @@
val deltaLog = DeltaLog.forTable(spark, tempDir)
val version = deltaLog.snapshot.version
val commitActions = deltaLog.store
- .read(FileNames.deltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
+ .read(FileNames.unsafeDeltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
.map(Action.fromJson)
val fileActions = commitActions.collect { case a: FileAction => a }

Expand Down
(Diff truncated: the remaining changed files of the 35 are not shown here.)

0 comments on commit 98634f2
