diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index cd0a58a5362..e1d91af67bb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1337,10 +1337,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite try { val tags = Map.empty[String, String] + val commitTimestampMs = clock.getTimeMillis() val commitInfo = CommitInfo( - NANOSECONDS.toMillis(commitStartNano), + commitTimestampMs, operation = op.name, - generateInCommitTimestampForFirstCommitAttempt(NANOSECONDS.toMillis(commitStartNano)), + generateInCommitTimestampForFirstCommitAttempt(commitTimestampMs), operationParameters = op.jsonEncodedValues, context, readVersion = Some(readVersion), diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index 233439d6add..7c293eeda85 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -143,6 +143,44 @@ class InCommitTimestampSuite } } + for (useCommitLarge <- BOOLEAN_DOMAIN) + test("txn.commit should use clock.currentTimeMillis() for ICT" + + s" [useCommitLarge: $useCommitLarge]") { + withTempDir { tempDir => + spark.range(2).write.format("delta").save(tempDir.getAbsolutePath) + // Clear the DeltaLog cache so that a new DeltaLog is created with the manual clock. + DeltaLog.clearCache() + val expectedCommit1Time = System.currentTimeMillis() + val clock = new ManualClock(expectedCommit1Time) + val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock) + val ver0Snapshot = deltaLog.snapshot + assert(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(ver0Snapshot.metadata)) + val usageRecords = Log4jUsageLogger.track { + if (useCommitLarge) { + deltaLog.startTransaction().commitLarge( + spark, + Seq(createTestAddFile("1")).toIterator, + newProtocolOpt = None, + DeltaOperations.ManualUpdate, + context = Map.empty, + metrics = Map.empty) + } else { + deltaLog.startTransaction().commit( + Seq(createTestAddFile("1")), + DeltaOperations.ManualUpdate, + tags = Map.empty + ) + } + } + val ver1Snapshot = deltaLog.snapshot + val retrievedTimestamp = getInCommitTimestamp(deltaLog, version = 1) + assert(ver1Snapshot.timestamp == retrievedTimestamp) + assert(ver1Snapshot.timestamp == expectedCommit1Time) + val expectedOpType = if (useCommitLarge) "delta.commit.large" else "delta.commit" + assert(filterUsageRecords(usageRecords, expectedOpType).length == 1) + } + } + test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)