diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala
index 9010e9487d2..be7d8d1c76d 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala
@@ -651,7 +651,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
     }
 
     for (version <- minVersion to maxVersion) {
-      val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), version).toString
+      val jsonFilePath = FileNames.unsafeDeltaFile(new Path(deltaLogPath), version).toString
       DeltaSharingUtils.overrideIteratorBlock[String](
         getDeltaSharingLogBlockId(jsonFilePath),
         versionToJsonLogBuilderMap.getOrElse(version, Seq.empty).toIterator
@@ -778,7 +778,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
 
     // Always use 0.json for snapshot queries.
     val deltaLogPath = s"${encodedTablePath.toString}/_delta_log"
-    val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), 0).toString
+    val jsonFilePath = FileNames.unsafeDeltaFile(new Path(deltaLogPath), 0).toString
     DeltaSharingUtils.overrideIteratorBlock[String](
       getDeltaSharingLogBlockId(jsonFilePath),
       jsonLogSeq.result().toIterator
@@ -819,7 +819,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
     val jsonLogStr = deltaSharingTableMetadata.protocol.deltaProtocol.json + "\n" +
       deltaSharingTableMetadata.metadata.deltaMetadata.json + "\n"
 
-    val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), 0).toString
+    val jsonFilePath = FileNames.unsafeDeltaFile(new Path(deltaLogPath), 0).toString
     DeltaSharingUtils.overrideIteratorBlock[String](
       getDeltaSharingLogBlockId(jsonFilePath),
       Seq(jsonLogStr).toIterator
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 70bb21adc5d..b8b9dc26d27 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1917,7 +1917,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       commitVersion: Long,
       actions: Iterator[String],
       updatedActions: UpdatedActions): CommitResponse = {
-    val commitFile = util.FileNames.deltaFile(logPath, commitVersion)
+    val commitFile = util.FileNames.unsafeDeltaFile(logPath, commitVersion)
     val commitFileStatus =
       doCommit(logStore, hadoopConf, logPath, commitFile, commitVersion, actions)
     // TODO(managed-commits): Integrate with ICT and pass the correct commitTimestamp
@@ -1985,7 +1985,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       val commitTimestamp = commitResponse.commit.fileStatus.getModificationTime
       val commitFile = commitResponse.commit.copy(commitTimestamp = commitTimestamp)
       if (attemptVersion == 0L) {
-        val expectedPathForCommitZero = deltaFile(deltaLog.logPath, version = 0L).toUri
+        val expectedPathForCommitZero = unsafeDeltaFile(deltaLog.logPath, version = 0L).toUri
         val actualCommitPath = commitResponse.commit.fileStatus.getPath.toUri
         if (actualCommitPath != expectedPathForCommitZero) {
           throw new IllegalStateException("Expected 0th commit to be written to " +
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index f92666223a2..20b4eb96fa5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -491,7 +491,7 @@ class Snapshot(
       startVersion = minUnbackfilledVersion,
       endVersion = Some(version))
     val fs = deltaLog.logPath.getFileSystem(hadoopConf)
-    val expectedBackfilledDeltaFile = FileNames.deltaFile(deltaLog.logPath, version)
+    val expectedBackfilledDeltaFile = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
     if (!fs.exists(expectedBackfilledDeltaFile)) {
       throw new IllegalStateException("Backfilling of commit files failed. " +
         s"Expected delta file $expectedBackfilledDeltaFile not found.")
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index 7efafbd31a6..966466d5397 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -371,7 +371,7 @@ trait SnapshotManagement { self: DeltaLog =>
 
       if (headDeltaVersion != checkpointVersion + 1) {
         throw DeltaErrors.logFileNotFoundException(
-          deltaFile(logPath, checkpointVersion + 1),
+          unsafeDeltaFile(logPath, checkpointVersion + 1),
           lastDeltaVersion,
           unsafeVolatileMetadata) // metadata is best-effort only
       }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala
index a995123f411..ac4ccaf6a02 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala
@@ -80,7 +80,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
     if (commitVersion == 0 || batchSize <= 1) {
       // Always backfill zeroth commit or when batch size is configured as 1
       backfill(logStore, hadoopConf, logPath, commitVersion, fileStatus)
-      val targetFile = FileNames.deltaFile(logPath, commitVersion)
+      val targetFile = FileNames.unsafeDeltaFile(logPath, commitVersion)
       val targetFileStatus = fs.getFileStatus(targetFile)
       val newCommit = commitResponse.commit.copy(fileStatus = targetFileStatus)
       commitResponse = commitResponse.copy(commit = newCommit)
@@ -127,7 +127,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
       logPath: Path,
       version: Long,
       fileStatus: FileStatus): Unit = {
-    val targetFile = FileNames.deltaFile(logPath, version)
+    val targetFile = FileNames.unsafeDeltaFile(logPath, version)
     logInfo(s"Backfilling commit ${fileStatus.getPath} to ${targetFile.toString}")
     val commitContentIterator = logStore.readAsIterator(fileStatus, hadoopConf)
     try {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala
index 4fc9e6ea9c9..815a24b67d7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaCommitFileProvider.scala
@@ -56,7 +56,7 @@ case class DeltaCommitFileProvider(
     }
     uuids.get(version) match {
       case Some(uuid) => FileNames.unbackfilledDeltaFile(resolvedPath, version, Some(uuid))
-      case _ => FileNames.deltaFile(resolvedPath, version)
+      case _ => FileNames.unsafeDeltaFile(resolvedPath, version)
     }
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala
index d10ed15a1b8..84b687be32c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala
@@ -34,8 +34,26 @@ object FileNames {
   private val checksumFilePattern = checksumFileRegex.pattern
   private val checkpointFilePattern = checkpointFileRegex.pattern
 
-  /** Returns the delta (json format) path for a given delta file. */
-  def deltaFile(path: Path, version: Long): Path = new Path(path, f"$version%020d.json")
+  /**
+   * Returns the delta (json format) path for a given delta file.
+   * WARNING: This API is unsafe and can resolve to incorrect paths if the table has
+   * Managed Commits.
+   * Use DeltaCommitFileProvider(snapshot).deltaFile instead to guarantee accurate paths.
+   */
+  def unsafeDeltaFile(path: Path, version: Long): Path = new Path(path, f"$version%020d.json")
+
+  /**
+   * Returns the delta (json format) path for a given delta file.
+   * WARNING: This API is unsafe and deprecated. It can resolve to incorrect paths if the table has
+   * Managed Commits. It will be removed in future versions.
+   * Use DeltaCommitFileProvider(snapshot).deltaFile instead to guarantee accurate paths or
+   * unsafeDeltaFile for potentially incorrect file name resolution.
+   */
+  @deprecated(
+    "This method is deprecated and will be removed in future versions. " +
+      "Use DeltaCommitFileProvider(snapshot).deltaFile or unsafeDeltaFile instead",
+    "15.1")
+  def deltaFile(path: Path, version: Long): Path = unsafeDeltaFile(path, version)
 
   /**
    * Returns the un-backfilled uuid formatted delta (json format) path for a given version.
diff --git a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
index 07e1cc28339..d43f83475c2 100644
--- a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
+++ b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -515,7 +515,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
       val time = format.parse(desiredTime).getTime
 
       val logPath = new Path(dir.getCanonicalPath, "_delta_log")
-      val file = new File(FileNames.deltaFile(logPath, 0).toString)
+      val file = new File(FileNames.unsafeDeltaFile(logPath, 0).toString)
       assert(file.setLastModified(time))
 
       val deltaTable2 = io.delta.tables.DeltaTable.forPath(spark, path, fsOptions)
@@ -539,7 +539,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
       val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
       log.ensureLogDirectoryExist()
       log.store.write(
-        FileNames.deltaFile(log.logPath, 0),
+        FileNames.unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata(schemaString = testSchema.json).json, Protocol(0, 0).json),
         overwrite = false,
         log.newDeltaHadoopConf())
@@ -565,7 +565,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
       val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
       log.ensureLogDirectoryExist()
       log.store.write(
-        FileNames.deltaFile(log.logPath, 0),
+        FileNames.unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json),
         overwrite = false,
         log.newDeltaHadoopConf())
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
index c252e8439c2..9604e37c550 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
@@ -409,7 +409,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
       val version = deltaTable.startTransactionWithInitialSnapshot()
         .commit(domainMetadatas, ManualUpdate)
       val committedActions = deltaLog.store.read(
-        FileNames.deltaFile(deltaLog.logPath, version),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, version),
         deltaLog.newDeltaHadoopConf())
       assert(committedActions.size == 2)
       val serializedJson = committedActions.last
@@ -635,7 +635,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
       val version = deltaLog.startTransaction().commit(Seq(action), ManualUpdate)
       // Read the commit file and get the serialized committed actions
       val committedActions = deltaLog.store.read(
-        FileNames.deltaFile(deltaLog.logPath, version),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, version),
         deltaLog.newDeltaHadoopConf())
 
       assert(committedActions.size == 2)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index 52ca5f7c9c9..0c8e0dd9932 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -581,7 +581,7 @@ class CheckpointsSuite
         // Delete the commit files 0-9, so that we are forced to read the checkpoint file
         val logPath = new Path(new File(target, "_delta_log").getAbsolutePath)
         for (i <- 0 to 10) {
-          val file = new File(FileNames.deltaFile(logPath, version = i).toString)
+          val file = new File(FileNames.unsafeDeltaFile(logPath, version = i).toString)
           file.delete()
         }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
index 742e78cb3ee..a5f7b416a92 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
-import org.apache.spark.sql.delta.util.FileNames.{checksumFile, deltaFile}
+import org.apache.spark.sql.delta.util.FileNames.{checksumFile, unsafeDeltaFile}
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
@@ -231,7 +231,7 @@ trait CloneTableSuiteBase extends QueryTest
       targetLocation.isEmpty && targetIsTable,
       isReplaceOperation)
 
-    val commit = deltaFile(targetLog.logPath, targetLog.unsafeVolatileSnapshot.version)
+    val commit = unsafeDeltaFile(targetLog.logPath, targetLog.unsafeVolatileSnapshot.version)
     val hadoopConf = targetLog.newDeltaHadoopConf()
     val filePaths: Seq[FileAction] = targetLog.store.read(commit, hadoopConf).flatMap { line =>
       JsonUtils.fromJson[SingleAction](line) match {
@@ -688,7 +688,7 @@ trait CloneTableSuiteBase extends QueryTest
       val newSchema = new StructType().add("id", IntegerType, nullable = true)
       log.ensureLogDirectoryExist()
       log.store.write(
-        deltaFile(log.logPath, 0),
+        unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata(schemaString = newSchema.json).json, sourceProtocol.json),
         overwrite = false,
         log.newDeltaHadoopConf())
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
index fa262cd22bf..d432eee2f45 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
@@ -50,7 +50,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
 
   /** Modify timestamp for a delta commit, used to test timestamp querying */
   def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
-    val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
+    val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
     file.setLastModified(time)
     val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
     if (crc.exists()) {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
index 758fa73ab7d..157d1595317 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
@@ -107,7 +107,7 @@ abstract class DeltaCDCSuiteBase
 
   /** Modify timestamp for a delta commit, used to test timestamp querying */
   def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
-    val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
+    val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
     file.setLastModified(time)
     val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
     if (crc.exists()) {
@@ -951,7 +951,7 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase {
       val deltaLog = DeltaLog.forTable(spark, path)
       (0 to 3).foreach { i =>
         spark.range(i * 10, (i + 1) * 10).write.format("delta").mode("append").save(path)
-        val file = new File(FileNames.deltaFile(deltaLog.logPath, i).toUri)
+        val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, i).toUri)
         file.setLastModified(300 - i)
       }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
index 2da3ff29533..afbc5585777 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
@@ -63,7 +63,7 @@ trait DeltaTimeTravelTests extends QueryTest
   protected val timeFormatter = new SimpleDateFormat("yyyyMMddHHmmssSSS")
 
   protected def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
-    val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
+    val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
     file.setLastModified(ts)
     val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
     if (crc.exists()) {
@@ -127,7 +127,7 @@ trait DeltaTimeTravelTests extends QueryTest
         .saveAsTable(table)
     }
     val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(table))
-    val file = new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri)
+    val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri)
     file.setLastModified(commitList.head)
     commitList = commits.slice(1, commits.length) // we already wrote the first commit here
     var startVersion = deltaLog.snapshot.version + 1
@@ -351,7 +351,7 @@ trait DeltaTimeTravelTests extends QueryTest
      assert(e1.getMessage.contains("[0, 2]"))
 
      val deltaLog = DeltaLog.forTable(spark, getTableLocation(tblName))
-      new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri).delete()
+      new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri).delete()
      // Delta Lake will create a DeltaTableV2 explicitly with time travel options in the catalog.
      // These options will be verified by DeltaHistoryManager, which will throw an
      // AnalysisException.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
index 5a341445ceb..a705cd4da30 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala
@@ -56,7 +56,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
     val hadoopConf = deltaLog.newDeltaHadoopConf()
 
     (startVersion to endVersion).foreach { versionToRead =>
-      val file = FileNames.deltaFile(deltaLog.logPath, versionToRead)
+      val file = FileNames.unsafeDeltaFile(deltaLog.logPath, versionToRead)
       val actionsIterator = deltaLog.store.readAsIterator(file, hadoopConf).map(Action.fromJson)
       logReplay.append(versionToRead, actionsIterator)
     }
@@ -429,7 +429,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
     postSetupFunc = Some(
       (deltaLog: DeltaLog) => {
         val logPath = deltaLog.logPath
-        val deltaFileToDelete = FileNames.deltaFile(logPath, version = 4)
+        val deltaFileToDelete = FileNames.unsafeDeltaFile(logPath, version = 4)
         logPath.getFileSystem(deltaLog.newDeltaHadoopConf()).delete(deltaFileToDelete, true)
       }
     ),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 3848abd6fe4..7690b517f11 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -243,13 +243,13 @@ class DeltaLogSuite extends QueryTest
         s"$scheme$path", Some(200L), dataChange = false)
 
       log.store.write(
-        FileNames.deltaFile(log.logPath, 0L),
+        FileNames.unsafeDeltaFile(log.logPath, 0L),
         Iterator(Action.supportedProtocolVersion(), Metadata(), add)
           .map(a => JsonUtils.toJson(a.wrap)),
         overwrite = false,
         log.newDeltaHadoopConf())
       log.store.write(
-        FileNames.deltaFile(log.logPath, 1L),
+        FileNames.unsafeDeltaFile(log.logPath, 1L),
         Iterator(JsonUtils.toJson(rm.wrap)),
         overwrite = false,
         log.newDeltaHadoopConf())
@@ -272,13 +272,13 @@ class DeltaLogSuite extends QueryTest
         s"$scheme$path", Some(200L), dataChange = false)
 
       log.store.write(
-        FileNames.deltaFile(log.logPath, 0L),
+        FileNames.unsafeDeltaFile(log.logPath, 0L),
         Iterator(Action.supportedProtocolVersion(), Metadata(), add)
           .map(a => JsonUtils.toJson(a.wrap)),
         overwrite = false,
         log.newDeltaHadoopConf())
       log.store.write(
-        FileNames.deltaFile(log.logPath, 1L),
+        FileNames.unsafeDeltaFile(log.logPath, 1L),
         Iterator(JsonUtils.toJson(rm.wrap)),
         overwrite = false,
         log.newDeltaHadoopConf())
@@ -372,7 +372,7 @@ class DeltaLogSuite extends QueryTest
       }
       val file = AddFile("abc", Map.empty, 1, 1, true)
       log.store.write(
-        FileNames.deltaFile(log.logPath, 0L),
+        FileNames.unsafeDeltaFile(log.logPath, 0L),
         Iterator(selectedAction, file).map(a => JsonUtils.toJson(a.wrap)),
         overwrite = false,
         log.newDeltaHadoopConf())
@@ -525,14 +525,14 @@ class DeltaLogSuite extends QueryTest
       assert(deltaLog.snapshot.version === 0)
 
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, 0),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 0),
         actions.map(_.unwrap.json).iterator,
         overwrite = false,
         deltaLog.newDeltaHadoopConf())
 
       // To avoid flakiness, we manually set the modification timestamp of the file to a later
       // second
-      new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri)
+      new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri)
         .setLastModified(commitTimestamp + 5000)
 
       checkAnswer(
@@ -557,7 +557,7 @@ class DeltaLogSuite extends QueryTest
       // that we don't trigger another update, and thus don't find the commit.
       val add = AddFile(path, Map.empty, 100L, 10L, dataChange = true)
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, 1L),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 1L),
         Iterator(JsonUtils.toJson(add.wrap)),
         overwrite = false,
         deltaLog.newDeltaHadoopConf())
@@ -581,7 +581,7 @@ class DeltaLogSuite extends QueryTest
       spark.range(1).write.format("delta").mode("append").save(path)
       val log = DeltaLog.forTable(spark, path)
 
-      val commitFilePath = FileNames.deltaFile(log.logPath, 1L)
+      val commitFilePath = FileNames.unsafeDeltaFile(log.logPath, 1L)
       val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
       val stream = fs.open(commitFilePath)
       val reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala
index b6301aab13a..1c14e3dcacb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala
@@ -62,7 +62,7 @@ class DeltaOptionSuite extends QueryTest
       val deltaLog = DeltaLog.forTable(spark, tempDir)
       val version = deltaLog.snapshot.version
       val commitActions = deltaLog.store
-        .read(FileNames.deltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
+        .read(FileNames.unsafeDeltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
         .map(Action.fromJson)
       val fileActions = commitActions.collect { case a: FileAction => a }
 
@@ -88,7 +88,7 @@ class DeltaOptionSuite extends QueryTest
       val deltaLog = DeltaLog.forTable(spark, tempDir)
       val version = deltaLog.snapshot.version
       val commitActions = deltaLog.store
-        .read(FileNames.deltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
+        .read(FileNames.unsafeDeltaFile(deltaLog.logPath, version), deltaLog.newDeltaHadoopConf())
         .map(Action.fromJson)
       val fileActions = commitActions.collect { case a: FileAction => a }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
index d55276989a3..5d929eeb976 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.FileNames
-import org.apache.spark.sql.delta.util.FileNames.{deltaFile, DeltaFile}
+import org.apache.spark.sql.delta.util.FileNames.{unsafeDeltaFile, DeltaFile}
 import org.apache.spark.sql.delta.util.JsonUtils
 
 import org.apache.spark.SparkConf
@@ -63,7 +63,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
     val log = DeltaLog.forTable(spark, path)
     log.ensureLogDirectoryExist()
     log.store.write(
-      deltaFile(log.logPath, 0),
+      unsafeDeltaFile(log.logPath, 0),
       Iterator(Metadata(schemaString = schema.json).json, protocol.json),
       overwrite = false,
       log.newDeltaHadoopConf())
@@ -412,7 +412,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       val log = DeltaLog.forTable(spark, path)
       log.ensureLogDirectoryExist()
       log.store.write(
-        deltaFile(log.logPath, 0),
+        unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata().json, Protocol(Integer.MAX_VALUE, Integer.MAX_VALUE).json),
         overwrite = false,
         log.newDeltaHadoopConf())
@@ -440,7 +440,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         TABLE_FEATURES_MIN_READER_VERSION,
         TABLE_FEATURES_MIN_WRITER_VERSION).withWriterFeatures(Seq("newUnsupportedWriterFeature"))
       log.store.write(
-        deltaFile(log.logPath, snapshot.version + 1),
+        unsafeDeltaFile(log.logPath, snapshot.version + 1),
         Iterator(Metadata().json, newProtocol.json),
         overwrite = false,
         log.newDeltaHadoopConf())
@@ -608,7 +608,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       version: Long,
       protocol: Protocol): Unit = {
     log.store.write(
-      deltaFile(log.logPath, version),
+      unsafeDeltaFile(log.logPath, version),
       Iterator(
         Metadata().json,
         protocol.json),
@@ -973,7 +973,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         val txn = deltaLog.startTransaction()
         val currentVersion = txn.snapshot.version
         deltaLog.store.write(
-          deltaFile(deltaLog.logPath, currentVersion + 1),
+          unsafeDeltaFile(deltaLog.logPath, currentVersion + 1),
           Iterator(incompatibleProtocol.json),
           overwrite = false,
           hadoopConf)
@@ -983,7 +983,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
           txn.commit(AddFile("test", Map.empty, 1, 1, dataChange = true) :: Nil, ManualUpdate)
         }
         // Make sure we didn't commit anything
-        val p = deltaFile(deltaLog.logPath, currentVersion + 2)
+        val p = unsafeDeltaFile(deltaLog.logPath, currentVersion + 2)
         assert(
           !p.getFileSystem(hadoopConf).exists(p),
           s"$p should not be committed")
@@ -1230,7 +1230,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       val log = DeltaLog.forTable(spark, path)
       log.ensureLogDirectoryExist()
       log.store.write(
-        deltaFile(log.logPath, 0),
+        unsafeDeltaFile(log.logPath, 0),
         Iterator(Metadata().json),
         overwrite = false,
         log.newDeltaHadoopConf())
@@ -2207,7 +2207,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
           DeltaConfigs.IS_APPEND_ONLY.key -> "false",
           DeltaConfigs.CHANGE_DATA_FEED.key -> "true"))
       log.store.write(
-        deltaFile(log.logPath, snapshot.version + 1),
+        unsafeDeltaFile(log.logPath, snapshot.version + 1),
         Iterator(m.json, p.json),
         overwrite = false,
         log.newDeltaHadoopConf())
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
index 4684ee20685..b7821d25a92 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
@@ -107,7 +107,7 @@ class DeltaRetentionSuite extends QueryTest
         Nil
       }
       val version = txn.commit(delete ++ file, testOp)
-      val deltaFile = new File(FileNames.deltaFile(log.logPath, version).toUri)
+      val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
       deltaFile.setLastModified(clock.getTimeMillis() + i * 10000)
       val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
       crcFile.setLastModified(clock.getTimeMillis() + i * 10000)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
index 44548178298..545fe4b1ab4 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
@@ -1360,7 +1360,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       val deltaLog = DeltaLog.forTable(spark, tempDir)
 
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, deltaLog.snapshot.version + 1),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, deltaLog.snapshot.version + 1),
         // Write a large reader version to fail the streaming query
         Iterator(Protocol(minReaderVersion = Int.MaxValue).json),
         overwrite = false,
@@ -1385,7 +1385,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       val rangeStart = startVersion * 10
       val rangeEnd = rangeStart + 10
       spark.range(rangeStart, rangeEnd).write.format("delta").mode("append").save(location)
-      val file = new File(FileNames.deltaFile(deltaLog.logPath, startVersion).toUri)
+      val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, startVersion).toUri)
       file.setLastModified(ts)
       startVersion += 1
     }
@@ -1447,7 +1447,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
      val deltaLog = DeltaLog.forTable(spark, tablePath)
      assert(deltaLog.update().version == 2)
      deltaLog.checkpoint()
-      new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri).delete()
+      new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri).delete()
 
      // Cannot start from version 0
      assert(intercept[StreamingQueryException] {
@@ -1522,7 +1522,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
      // Add one commit and delete version 0 and version 1
      generateCommits(inputDir.getCanonicalPath, start + 60.minutes)
      (0 to 1).foreach { v =>
-        new File(FileNames.deltaFile(deltaLog.logPath, v).toUri).delete()
+        new File(FileNames.unsafeDeltaFile(deltaLog.logPath, v).toUri).delete()
      }
 
      // Although version 1 has been deleted, restarting the query should still work as we have
@@ -1618,7 +1618,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
      val deltaLog = DeltaLog.forTable(spark, tablePath)
      assert(deltaLog.update().version == 2)
      deltaLog.checkpoint()
-      new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri).delete()
+      new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri).delete()
 
      // Can start from version 1 even if it's not recreatable
      // TODO: currently we would error out if we couldn't construct the snapshot to check column
@@ -1979,7 +1979,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
      // Create a checkpoint so that we can create a snapshot without json files before version 3
      srcLog.checkpoint()
      // Delete the first file
-      assert(new File(FileNames.deltaFile(srcLog.logPath, 1).toUri).delete())
+      assert(new File(FileNames.unsafeDeltaFile(srcLog.logPath, 1).toUri).delete())
 
      val e = intercept[StreamingQueryException] {
        val q = df.writeStream.format("delta")
@@ -2014,7 +2014,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
      // Create a checkpoint so that we can create a snapshot without json files before version 3
      srcLog.checkpoint()
      // Delete the second file
-      assert(new File(FileNames.deltaFile(srcLog.logPath, 2).toUri).delete())
+      assert(new File(FileNames.unsafeDeltaFile(srcLog.logPath, 2).toUri).delete())
 
      val e = intercept[StreamingQueryException] {
        val q = df.writeStream.format("delta")
@@ -2050,7 +2050,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
      // Create a checkpoint so that we can create a snapshot without json files before version 3
      srcLog.checkpoint()
      // Delete the first file
-      assert(new File(FileNames.deltaFile(srcLog.logPath, 1).toUri).delete())
+      assert(new File(FileNames.unsafeDeltaFile(srcLog.logPath, 1).toUri).delete())
 
      val q2 = df.writeStream.format("delta")
        .option("checkpointLocation", chkLocation.getCanonicalPath)
@@ -2086,7 +2086,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
      // Create a checkpoint so that we can create a snapshot without json files before version 3
      srcLog.checkpoint()
      // Delete the second file
-      assert(new File(FileNames.deltaFile(srcLog.logPath, 2).toUri).delete())
+      assert(new File(FileNames.unsafeDeltaFile(srcLog.logPath, 2).toUri).delete())
 
      val q2 = df.writeStream.format("delta")
        .option("checkpointLocation", chkLocation.getCanonicalPath)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index 6ab969be508..695cac8ff33 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames}
-import org.apache.spark.sql.delta.util.FileNames.deltaFile
+import org.apache.spark.sql.delta.util.FileNames.unsafeDeltaFile
 import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, PathHandle}
 
 import org.apache.spark.SparkException
@@ -2037,13 +2037,13 @@ class DeltaSuite extends QueryTest
       // changes the schema
       val actions = Seq(Action.supportedProtocolVersion(), newMetadata) ++ files.map(_.remove)
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, snapshot.version + 1),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, snapshot.version + 1),
         actions.map(_.json).iterator,
         overwrite = false,
         hadoopConf)
 
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, snapshot.version + 2),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, snapshot.version + 2),
         files.take(1).map(_.json).iterator,
         overwrite = false,
         hadoopConf)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
index 17067000616..04842fc09fa 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.FileNames.deltaFile
+import org.apache.spark.sql.delta.util.FileNames.unsafeDeltaFile
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
@@ -49,7 +49,7 @@ class DeltaTableFeatureSuite
     val log = DeltaLog.forTable(spark, path)
     log.ensureLogDirectoryExist()
     log.store.write(
-      deltaFile(log.logPath, 0),
+      unsafeDeltaFile(log.logPath, 0),
      Iterator(Metadata(schemaString = schema.json).json, protocol.json),
      overwrite = false,
      log.newDeltaHadoopConf())
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
index 00bb4aacdb5..f050be6d77e 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
@@ -85,7 +85,7 @@ class DeltaTimeTravelSuite extends QueryTest
       val rangeStart = startVersion * 10
       val rangeEnd = rangeStart + 10
       spark.range(rangeStart, rangeEnd).write.format("delta").mode("append").save(location)
-      val file = new File(FileNames.deltaFile(deltaLog.logPath, startVersion).toUri)
+      val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, startVersion).toUri)
       file.setLastModified(ts)
       startVersion += 1
     }
@@ -162,7 +162,7 @@ class DeltaTimeTravelSuite extends QueryTest
       val commits2 = history.getHistory(Some(10))
       assert(commits2.last.version === Some(0))
 
-      assert(new File(FileNames.deltaFile(deltaLog.logPath, 0L).toUri).delete())
+      assert(new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0L).toUri).delete())
       val e = intercept[AnalysisException] {
         history.getActiveCommitAtTime(start + 15.seconds, false).version
       }
@@ -542,7 +542,7 @@ class DeltaTimeTravelSuite extends QueryTest
       assert(e1.getMessage.contains("[0, 2]"))
 
       val deltaLog = DeltaLog.forTable(spark, tblLoc)
-      new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri).delete()
+      new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri).delete()
       val e2 = intercept[AnalysisException] {
         spark.read.format("delta").option("versionAsOf", 0).load(tblLoc).collect()
       }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
index c2fefb99460..a0f297d269c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
@@ -386,7 +386,7 @@ trait DescribeDeltaHistorySuiteBase
       val log = DeltaLog.forTable(spark, path)
       log.ensureLogDirectoryExist()
       log.store.write(
-        FileNames.deltaFile(log.logPath, 0),
+        FileNames.unsafeDeltaFile(log.logPath, 0),
         Iterator(
           Metadata(schemaString = spark.range(1).schema.asNullable.json).json,
           Protocol(1, 1).json),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuite.scala
index e8cf5dd95a3..1445a9ef2f4 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuite.scala
@@ -66,7 +66,7 @@ class EvolvabilitySuite extends EvolvabilitySuiteBase with DeltaSQLCommandTest {
 
     // Check serialized JSON as well
     val contents = deltaLog.store.read(
-      FileNames.deltaFile(deltaLog.logPath, 0L),
+      FileNames.unsafeDeltaFile(deltaLog.logPath, 0L),
      deltaLog.newDeltaHadoopConf())
     assert(contents.exists(_.contains(""""part":null""")), "null value should be written in json")
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuiteBase.scala
index de32c568490..35e3f739408 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/EvolvabilitySuiteBase.scala
@@ -204,7 +204,7 @@ object EvolvabilitySuiteBase {
     val deltaLog = DeltaLog.forTable(spark, new Path(path))
 
     val deltas = 0L to deltaLog.snapshot.version
-    val deltaFiles = deltas.map(deltaFile(deltaLog.logPath, _)).map(_.toString)
+    val deltaFiles = deltas.map(unsafeDeltaFile(deltaLog.logPath, _)).map(_.toString)
     val actionsTypesInLog =
       spark.read.schema(Action.logSchema).json(deltaFiles: _*)
         .as[SingleAction]
@@ -293,14 +293,14 @@ object EvolvabilitySuiteBase {
 
     // manually remove AddFile in the previous commit and append a new column.
     val records = deltaLog.store.read(
-      FileNames.deltaFile(deltaLog.logPath, version),
+      FileNames.unsafeDeltaFile(deltaLog.logPath, version),
      deltaLog.newDeltaHadoopConf())
     val actions = records.map(Action.fromJson).filter(action => action.isInstanceOf[AddFile])
       .map { action => action.asInstanceOf[AddFile].remove}
       .toIterator
     val recordsWithNewAction = actions.map(_.json) ++ Iterator("""{"some_new_action":{"a":1}}""")
     deltaLog.store.write(
-      FileNames.deltaFile(deltaLog.logPath, version + 1),
+      FileNames.unsafeDeltaFile(deltaLog.logPath, version + 1),
      recordsWithNewAction,
      overwrite = false,
      deltaLog.newDeltaHadoopConf())
@@ -318,7 +318,7 @@ object EvolvabilitySuiteBase {
       JsonUtils.toJson(newRecordMap + ("some_new_action_alongside_add_action" -> ("a" -> "1")))
     }.toIterator
     deltaLog.store.write(
-      FileNames.deltaFile(deltaLog.logPath, version + 2),
+      FileNames.unsafeDeltaFile(deltaLog.logPath, version + 2),
      newRecords,
      overwrite = false,
      deltaLog.newDeltaHadoopConf())
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala
index cb4d1bd15e8..fa2740bd9cf 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala
@@ -152,10 +152,10 @@ class InCommitTimestampSuite
       deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate)
       // Remove CommitInfo from the commit.
       val actions = deltaLog.store.readAsIterator(
-        FileNames.deltaFile(deltaLog.logPath, 1), deltaLog.newDeltaHadoopConf())
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 1), deltaLog.newDeltaHadoopConf())
       val actionsWithoutCommitInfo = actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo])
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, 1),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 1),
         actionsWithoutCommitInfo,
         overwrite = true,
         deltaLog.newDeltaHadoopConf())
@@ -183,7 +183,7 @@ class InCommitTimestampSuite
       deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate)
       // Remove CommitInfo.commitTimestamp from the commit.
       val actions = deltaLog.store.readAsIterator(
-        FileNames.deltaFile(deltaLog.logPath, 1),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 1),
         deltaLog.newDeltaHadoopConf()).toList
       val actionsWithoutCommitInfoCommitTimestamp =
         actions.map(Action.fromJson).map {
@@ -193,7 +193,7 @@ class InCommitTimestampSuite
           other.json
       }.toIterator
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, 1),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 1),
         actionsWithoutCommitInfoCommitTimestamp,
         overwrite = true,
         deltaLog.newDeltaHadoopConf())
@@ -420,10 +420,10 @@ class InCommitTimestampSuite
       deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate)
       // Remove CommitInfo from the commit.
       val actions = deltaLog.store.readAsIterator(
-        FileNames.deltaFile(deltaLog.logPath, 1), deltaLog.newDeltaHadoopConf())
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 1), deltaLog.newDeltaHadoopConf())
       val actionsWithoutCommitInfo = actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo])
       deltaLog.store.write(
-        FileNames.deltaFile(deltaLog.logPath, 1),
+        FileNames.unsafeDeltaFile(deltaLog.logPath, 1),
         actionsWithoutCommitInfo,
         overwrite = true,
         deltaLog.newDeltaHadoopConf())
@@ -603,7 +603,7 @@ class InCommitTimestampSuite
       val searchTimestamp = getInCommitTimestamp(deltaLog, enablementCommit.version + 1)
       // Delete the first two ICT commits before performing the search.
       (enablementCommit.version to enablementCommit.version + 1).foreach { version =>
-        fs.delete(FileNames.deltaFile(deltaLog.logPath, version), false)
+        fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, version), false)
       }
       val e = intercept[DeltaErrors.TimestampEarlierThanCommitRetentionException] {
         deltaLog.history.getActiveCommitAtTime(
@@ -615,7 +615,7 @@ class InCommitTimestampSuite
       // Search for a non-ICT commit when all the non-ICT commits are missing.
       // Delete all the non-ICT commits.
       (0L until numNonICTCommits).foreach { version =>
-        fs.delete(FileNames.deltaFile(deltaLog.logPath, version), false)
+        fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, version), false)
       }
       intercept[DeltaErrors.TimestampEarlierThanCommitRetentionException] {
         deltaLog.history.getActiveCommitAtTime(
@@ -667,7 +667,7 @@ class InCommitTimestampSuite
       history.reverse.zipWithIndex.foreach { case (hist, version) =>
         assert(hist.getVersion == version)
         val expectedTimestamp = if (version < ictEnablementVersion) {
-          fs.getFileStatus(FileNames.deltaFile(deltaLog.logPath, version))
+          fs.getFileStatus(FileNames.unsafeDeltaFile(deltaLog.logPath, version))
             .getModificationTime
         } else {
           getInCommitTimestamp(deltaLog, version)
@@ -680,8 +680,9 @@ class InCommitTimestampSuite
       assert(nonICTHistory.length == ictEnablementVersion)
       nonICTHistory.reverse.zipWithIndex.foreach { case (hist, version) =>
         assert(hist.getVersion == version)
-        val expectedTimestamp = fs.getFileStatus(FileNames.deltaFile(deltaLog.logPath, version))
-          .getModificationTime
+        val expectedTimestamp =
+          fs.getFileStatus(FileNames.unsafeDeltaFile(deltaLog.logPath, version))
+            .getModificationTime
         assert(hist.timestamp.getTime == expectedTimestamp)
       }
       // Try fetching only the ICT commits.
@@ -703,7 +704,8 @@ class InCommitTimestampSuite
         .foreach { case (hist, version) =>
           assert(hist.getVersion == version)
           val expectedTimestamp = if (version < ictEnablementVersion) {
-            fs.getFileStatus(FileNames.deltaFile(deltaLog.logPath, version)).getModificationTime
+            fs.getFileStatus(FileNames.unsafeDeltaFile(deltaLog.logPath, version))
+              .getModificationTime
           } else {
             getInCommitTimestamp(deltaLog, version)
           }
@@ -802,8 +804,8 @@ class InCommitTimestampSuite
 
       // Delete the last few commits in the window.
       val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf())
-      fs.delete(FileNames.deltaFile(deltaLog.logPath, 5), false)
-      fs.delete(FileNames.deltaFile(deltaLog.logPath, 6), false)
+      fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, 5), false)
+      fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, 6), false)
 
      // Search for the commit just before the deleted commits.
      commit = DeltaHistoryManager.getActiveCommitAtTimeFromICTRange(
        getICTCommit(4).timestamp,
@@ -836,7 +838,7 @@ class InCommitTimestampSuite
       // Window -> [0, 11)
       // numChunks = 5, chunkSize = (11-0)/5 = 2
       // chunks -> [0, 2), [2, 4), [4, 6), [6, 8), [8, 10), [10, 11)
-      fs.delete(FileNames.deltaFile(deltaLog.logPath, 4), false)
+      fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, 4), false)
       // 4, 5, 6 have been deleted, so window [4, 6) is completely empty.
 
       // Search for the commit 6.
@@ -867,8 +869,8 @@ class InCommitTimestampSuite
       assert(commit == getICTCommit(7))
 
       // Scenario with many empty chunks.
-      fs.delete(FileNames.deltaFile(deltaLog.logPath, 8), false)
-      fs.delete(FileNames.deltaFile(deltaLog.logPath, 9), false)
+      fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, 8), false)
+      fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, 9), false)
 
       // Window -> [0, 11)
       // numChunks = 11, chunkSize = (11-0)/11 = 1
@@ -902,7 +904,7 @@ class InCommitTimestampSuite
         deltaCommitFileProvider).get
       assert(commit == getICTCommit(10))
 
-      fs.delete(FileNames.deltaFile(deltaLog.logPath, 10), false)
+      fs.delete(FileNames.unsafeDeltaFile(deltaLog.logPath, 10), false)
       // Everything after and including `end` does not exist.
       commit = DeltaHistoryManager.getActiveCommitAtTimeFromICTRange(
         getICTCommit(7).timestamp,
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
index da8afa1178e..1e0db1c3d00 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
@@ -349,7 +349,7 @@ class OptimisticTransactionSuite
 
       // Validate that actions in both transactions are not exactly same.
       def readActions(version: Long): Seq[Action] = {
-        log.store.read(FileNames.deltaFile(log.logPath, version), log.newDeltaHadoopConf())
+        log.store.read(FileNames.unsafeDeltaFile(log.logPath, version), log.newDeltaHadoopConf())
          .map(Action.fromJson)
       }
       def removeTxnIdAndMetricsFromActions(actions: Seq[Action]): Seq[Action] = actions.map {
@@ -379,7 +379,7 @@ class OptimisticTransactionSuite
       val version = txn.commit(Seq(), ManualUpdate)
 
       def readActions(version: Long): Seq[Action] = {
-        log.store.read(FileNames.deltaFile(log.logPath, version), log.newDeltaHadoopConf())
+        log.store.read(FileNames.unsafeDeltaFile(log.logPath, version), log.newDeltaHadoopConf())
          .map(Action.fromJson)
       }
       val actions = readActions(version)
@@ -415,7 +415,7 @@ class OptimisticTransactionSuite
       txn.updateSetTransaction("TestAppId", 1L, None)
       val version = txn.commit(Seq(SetTransaction("TestAppId", 1L, None)), ManualUpdate)
       def readActions(version: Long): Seq[Action] = {
-        log.store.read(FileNames.deltaFile(log.logPath, version), log.newDeltaHadoopConf())
+        log.store.read(FileNames.unsafeDeltaFile(log.logPath, version), log.newDeltaHadoopConf())
          .map(Action.fromJson)
       }
       assert(readActions(version).collectFirst {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala
index 54486b8e76d..f76df40109d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala
@@ -121,7 +121,7 @@ trait RestoreTableSuiteBase extends QueryTest with SharedSparkSession
       deltaLog: DeltaLog,
       version: Int,
       timestamp: Long): Unit = {
-    val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
+    val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
     file.setLastModified(timestamp)
   }
 
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/S3SingleDriverLogStoreSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/S3SingleDriverLogStoreSuite.scala
index cebb508ac79..7c869bab726 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/S3SingleDriverLogStoreSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/S3SingleDriverLogStoreSuite.scala
@@ -46,7 +46,7 @@ trait S3SingleDriverLogStoreSuiteBase extends LogStoreSuiteBase {
   test("file system has priority over cache") {
     withTempDir { dir =>
       val store = createLogStore(spark)
-      val deltas = Seq(0, 1, 2).map(i => FileNames.deltaFile(new Path(dir.toURI), i))
+      val deltas = Seq(0, 1, 2).map(i => FileNames.unsafeDeltaFile(new Path(dir.toURI), i))
       store.write(deltas(0), Iterator("zero"), overwrite = false, sessionHadoopConf)
       store.write(deltas(1), Iterator("one"), overwrite = false, sessionHadoopConf)
       store.write(deltas(2), Iterator("two"), overwrite = false, sessionHadoopConf)
@@ -70,7 +70,7 @@ trait S3SingleDriverLogStoreSuiteBase extends LogStoreSuiteBase {
     withTempDir { dir =>
       val store = createLogStore(spark)
       val deltas =
-        Seq(0, 1, 2, 3, 4).map(i => FileNames.deltaFile(new Path(dir.toURI), i))
+        Seq(0, 1, 2, 3, 4).map(i => FileNames.unsafeDeltaFile(new Path(dir.toURI), i))
       store.write(deltas(0), Iterator("zero"), overwrite = false, sessionHadoopConf)
       store.write(deltas(1), Iterator("one"), overwrite = false, sessionHadoopConf)
       store.write(deltas(2), Iterator("two"), overwrite = false, sessionHadoopConf)
@@ -119,7 +119,7 @@ trait S3SingleDriverLogStoreSuiteBase extends LogStoreSuiteBase {
       dir.mkdir()
       val store = createLogStore(spark)
       val deltas =
-        Seq(0, 1, 2).map(i => FileNames.deltaFile(new Path(dir.toURI), i))
+        Seq(0, 1, 2).map(i => FileNames.unsafeDeltaFile(new Path(dir.toURI), i))
       store.write(deltas(0), Iterator("log version 0"), overwrite = false, sessionHadoopConf)
       store.write(deltas(1), Iterator("log version 1"), overwrite = false, sessionHadoopConf)
       store.write(deltas(2), Iterator("log version 2"), overwrite = false, sessionHadoopConf)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
index 93db97497cc..8097c836374 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
@@ -78,7 +78,8 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
   }
 
   private def deleteLogVersion(path: String, version: Long): Unit = {
-    val deltaFile = new File(FileNames.deltaFile(new Path(path, "_delta_log"), version).toString)
+    val deltaFile = new File(
+      FileNames.unsafeDeltaFile(new Path(path, "_delta_log"), version).toString)
     assert(deltaFile.exists(), s"Could not find $deltaFile")
     assert(deltaFile.delete(), s"Failed to delete $deltaFile")
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala
index 18f13bf1c1d..9466aed73f1 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala
@@ -72,7 +72,7 @@ class InMemoryCommitStoreSuite extends QueryTest
       version: Long,
       logPath: Path,
       timestampOpt: Option[Long] = None): Unit = {
-    val delta = FileNames.deltaFile(logPath, version)
+    val delta = FileNames.unsafeDeltaFile(logPath, version)
     if (timestampOpt.isDefined) {
       assert(store.read(delta, sessionHadoopConf) == Seq(s"$version", s"${timestampOpt.get}"))
     } else {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala
index 65134a41b0c..9c76b2ed075 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala
@@ -271,16 +271,22 @@ class ManagedCommitSuite
       val newMetadata2 = oldMetadata.copy(
         configuration = oldMetadataConf + (MANAGED_COMMIT_OWNER_NAME.key -> builder2.name))
       log.store.write(
-        FileNames.deltaFile(log.logPath, 2), Seq(newMetadata1.json).toIterator, false, conf)
+        FileNames.unsafeDeltaFile(log.logPath, 2),
+        Seq(newMetadata1.json).toIterator,
+        overwrite = false,
+        conf)
       log.store.write(
-        FileNames.deltaFile(log.logPath, 3), Seq(newMetadata2.json).toIterator, false, conf)
+        FileNames.unsafeDeltaFile(log.logPath, 3),
+        Seq(newMetadata2.json).toIterator,
+        overwrite = false,
+        conf)
       cs2.registerTable(log.logPath, maxCommitVersion = 3)
 
       // Also backfill commit 0, 1 -- which the spec mandates when the commit owner changes.
      // commit 0 should already be backfilled
      assert(segment.deltas(0).getPath.getName === "00000000000000000000.json")
      log.store.write(
-        path = FileNames.deltaFile(log.logPath, 1),
+        path = FileNames.unsafeDeltaFile(log.logPath, 1),
        actions = log.store.read(segment.deltas(1).getPath, conf).toIterator,
        overwrite = true,
        conf)
@@ -415,7 +421,7 @@ class ManagedCommitSuite
         // backfill commit 1 and 2 also as 3/4 are written directly to FS.
         val segment = log.unsafeVolatileSnapshot.logSegment
         log.store.write(
-          path = FileNames.deltaFile(log.logPath, v),
+          path = FileNames.unsafeDeltaFile(log.logPath, v),
           actions = log.store.read(segment.deltas(v).getPath).toIterator,
           overwrite = true)
       }
@@ -525,7 +531,8 @@ class ManagedCommitSuite
       snapshot.ensureCommitFilesBackfilled()
 
       val commitFiles = log.listFrom(0).filter(FileNames.isDeltaFile).map(_.getPath)
-      val backfilledCommitFiles = (0 to 9).map(version => FileNames.deltaFile(log.logPath, version))
+      val backfilledCommitFiles = (0 to 9).map(
+        version => FileNames.unsafeDeltaFile(log.logPath, version))
       assert(commitFiles.toSeq == backfilledCommitFiles)
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdSuite.scala
index eca8876e0dc..92ca6c1d2a5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdSuite.scala
@@ -118,7 +118,7 @@ class RowIdSuite extends QueryTest
       // Delete the first commit and all checksum files to force the next read to read the high
       // watermark from the checkpoint.
       val fs = log1.logPath.getFileSystem(log1.newDeltaHadoopConf())
-      fs.delete(FileNames.deltaFile(log1.logPath, version = 0), true)
+      fs.delete(FileNames.unsafeDeltaFile(log1.logPath, version = 0), true)
       fs.delete(FileNames.checksumFile(log1.logPath, version = 0), true)
       fs.delete(FileNames.checksumFile(log1.logPath, version = 1), true)
 
diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala
index 945618f355b..a745cfc59e2 100644
--- a/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala
+++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala
@@ -41,11 +41,11 @@ class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSui
   )
 
   def getDeltaVersionPath(logDir: File, version: Int): Path = {
-    FileNames.deltaFile(new Path(logDir.toURI), version)
+    FileNames.unsafeDeltaFile(new Path(logDir.toURI), version)
   }
 
   def getFailingDeltaVersionPath(logDir: File, version: Int): Path = {
-    FileNames.deltaFile(new Path(s"failing:${logDir.getCanonicalPath}"), version)
+    FileNames.unsafeDeltaFile(new Path(s"failing:${logDir.getCanonicalPath}"), version)
  }

  test("single write") {
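Reviewer note: a minimal sketch of how a call site migrates under this rename. The object name, file location, and version value below are illustrative only; the FileNames methods and their behavior are taken from the FileNames.scala hunk above, and the managed-commit-aware alternative is the one the new scaladoc recommends.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.util.FileNames

object FileNamesMigrationSketch {
  def main(args: Array[String]): Unit = {
    val logPath = new Path("/tmp/my-table/_delta_log") // illustrative location
    val version = 42L

    // Old call site: still compiles, but now resolves through the deprecated
    // forwarder and emits a deprecation warning at compile time.
    val before: Path = FileNames.deltaFile(logPath, version)

    // Mechanical migration applied throughout this diff: same zero-padded
    // "<version, padded to 20 digits>.json" path, renamed to flag that it
    // ignores unbackfilled (managed-commit) delta files.
    val after: Path = FileNames.unsafeDeltaFile(logPath, version)

    assert(before == after)
    // Code that has a Snapshot in hand should instead use the replacement the
    // new scaladoc points to, DeltaCommitFileProvider(snapshot).deltaFile(version),
    // which also resolves unbackfilled uuid-named commits.
  }
}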