From 0364f9dd718d17e091e4b4f2b6a303c00a08b1c0 Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Fri, 29 Mar 2024 12:39:10 -0700 Subject: [PATCH] Fix bug --- .../spark/sql/delta/DeltaHistoryManager.scala | 17 ++++++++++++++--- .../sql/delta/DeltaHistoryManagerSuite.scala | 16 +++++++++++++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala index ca730c9a279..55832b85ee2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala @@ -77,10 +77,21 @@ class DeltaHistoryManager( import org.apache.spark.sql.delta.implicits._ val conf = getSerializableHadoopConf val logPath = deltaLog.logPath.toString - val snapshot = endOpt.map(end => deltaLog.getSnapshotAt(end)).getOrElse(deltaLog.update()) - val commitFileProvider = DeltaCommitFileProvider(snapshot) + val usableSnapshot = endOpt match { + case Some(end) if deltaLog.unsafeVolatileSnapshot.version >= end => + // Use the cached snapshot if it's fresh enough for the [start, end] query. 
+ deltaLog.unsafeVolatileSnapshot + case _ => + deltaLog.update() + } + + val end = endOpt match { + case Some(value) => usableSnapshot.version.min(value) + case None => usableSnapshot.version + } + val commitFileProvider = DeltaCommitFileProvider(usableSnapshot) // We assume that commits are contiguous, therefore we try to load all of them in order - val info = spark.range(start, snapshot.version + 1) + val info = spark.range(start, end + 1) .mapPartitions { versions => val logStore = LogStore(SparkEnv.get.conf, conf.value) val basePath = new Path(logPath) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala index a6a52af6d2b..be26fcb4df4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala @@ -617,13 +617,23 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests assert(history_02.size == 3) assert(history_02.map(_.getVersion) == Seq(2, 1, 0)) - val history_13 = deltaLog.history.getHistory(start = 1, endOpt = Some(1)) - assert(history_13.size == 1) - assert(history_13.map(_.getVersion) == Seq(1)) + val history_11 = deltaLog.history.getHistory(start = 1, endOpt = Some(1)) + assert(history_11.size == 1) + assert(history_11.map(_.getVersion) == Seq(1)) val history_2 = deltaLog.history.getHistory(start = 2, endOpt = None) assert(history_2.size == 2) assert(history_2.map(_.getVersion) == Seq(3, 2)) + + val history_15 = deltaLog.history.getHistory(start = 1, endOpt = Some(5)) + assert(history_15.size == 3) + assert(history_15.map(_.getVersion) == Seq(3, 2, 1)) + + val history_4 = deltaLog.history.getHistory(start = 4, endOpt = None) + assert(history_4.isEmpty) + + val history_21 = deltaLog.history.getHistory(start = 2, endOpt = Some(1)) + assert(history_21.isEmpty) } } }