From eca5a7f439f8039207705ee03b66916c6a987b79 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Fri, 17 May 2024 10:51:49 -0700 Subject: [PATCH] [Spark] InCommitTimestamp: Use clock.currentTimeMillis() instead of nanoTime() in commitLarge (#3111) ## Description We currently use NANOSECONDS.toMillis(System.nanoTime()) for generating the ICT when `commitLarge` is called. However, this usage of System.nanoTime() is not correct as it should only be used for measuring time difference, not to get an approximate wall clock time. This leads to scenarios where the ICT becomes very small (e.g. 1 Jan 1970) sometimes because some systems return a very small number when System.nanoTime() is called. This PR changes this so that clock.getTimeMillis() is used instead. ## How was this patch tested? Added a test case to ensure that `clock.getTimeMillis()` is being used. --- .../sql/delta/OptimisticTransaction.scala | 5 ++- .../sql/delta/InCommitTimestampSuite.scala | 38 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index cd0a58a5362..e1d91af67bb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1337,10 +1337,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite try { val tags = Map.empty[String, String] + val commitTimestampMs = clock.getTimeMillis() val commitInfo = CommitInfo( - NANOSECONDS.toMillis(commitStartNano), + commitTimestampMs, operation = op.name, - generateInCommitTimestampForFirstCommitAttempt(NANOSECONDS.toMillis(commitStartNano)), + generateInCommitTimestampForFirstCommitAttempt(commitTimestampMs), operationParameters = op.jsonEncodedValues, context, readVersion = Some(readVersion), diff --git 
a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index 233439d6add..7c293eeda85 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -143,6 +143,44 @@ class InCommitTimestampSuite } } + for (useCommitLarge <- BOOLEAN_DOMAIN) + test("txn.commit should use clock.currentTimeMillis() for ICT" + + s" [useCommitLarge: $useCommitLarge]") { + withTempDir { tempDir => + spark.range(2).write.format("delta").save(tempDir.getAbsolutePath) + // Clear the DeltaLog cache so that a new DeltaLog is created with the manual clock. + DeltaLog.clearCache() + val expectedCommit1Time = System.currentTimeMillis() + val clock = new ManualClock(expectedCommit1Time) + val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock) + val ver0Snapshot = deltaLog.snapshot + assert(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(ver0Snapshot.metadata)) + val usageRecords = Log4jUsageLogger.track { + if (useCommitLarge) { + deltaLog.startTransaction().commitLarge( + spark, + Seq(createTestAddFile("1")).toIterator, + newProtocolOpt = None, + DeltaOperations.ManualUpdate, + context = Map.empty, + metrics = Map.empty) + } else { + deltaLog.startTransaction().commit( + Seq(createTestAddFile("1")), + DeltaOperations.ManualUpdate, + tags = Map.empty + ) + } + } + val ver1Snapshot = deltaLog.snapshot + val retrievedTimestamp = getInCommitTimestamp(deltaLog, version = 1) + assert(ver1Snapshot.timestamp == retrievedTimestamp) + assert(ver1Snapshot.timestamp == expectedCommit1Time) + val expectedOpType = if (useCommitLarge) "delta.commit.large" else "delta.commit" + assert(filterUsageRecords(usageRecords, expectedOpType).length == 1) + } + } + test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") { withTempDir { tempDir => 
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)