Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 29, 2024
1 parent 74ee876 commit 0364f9d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,21 @@ class DeltaHistoryManager(
import org.apache.spark.sql.delta.implicits._
val conf = getSerializableHadoopConf
val logPath = deltaLog.logPath.toString
val snapshot = endOpt.map(end => deltaLog.getSnapshotAt(end)).getOrElse(deltaLog.update())
val commitFileProvider = DeltaCommitFileProvider(snapshot)
val usableSnapshot = endOpt match {
case Some(end) if deltaLog.unsafeVolatileSnapshot.version >= end =>
// Used the cache snapshot if it's fresh enough for the [start, end] query.
deltaLog.unsafeVolatileSnapshot
case _ =>
deltaLog.update()
}

val end = endOpt match {
case Some(value) => usableSnapshot.version.min(value)
case None => usableSnapshot.version
}
val commitFileProvider = DeltaCommitFileProvider(usableSnapshot)
// We assume that commits are contiguous, therefore we try to load all of them in order
val info = spark.range(start, snapshot.version + 1)
val info = spark.range(start, end + 1)
.mapPartitions { versions =>
val logStore = LogStore(SparkEnv.get.conf, conf.value)
val basePath = new Path(logPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,13 +617,23 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests
assert(history_02.size == 3)
assert(history_02.map(_.getVersion) == Seq(2, 1, 0))

val history_13 = deltaLog.history.getHistory(start = 1, endOpt = Some(1))
assert(history_13.size == 1)
assert(history_13.map(_.getVersion) == Seq(1))
val history_11 = deltaLog.history.getHistory(start = 1, endOpt = Some(1))
assert(history_11.size == 1)
assert(history_11.map(_.getVersion) == Seq(1))

val history_2 = deltaLog.history.getHistory(start = 2, endOpt = None)
assert(history_2.size == 2)
assert(history_2.map(_.getVersion) == Seq(3, 2))

val history_15 = deltaLog.history.getHistory(start = 1, endOpt = Some(5))
assert(history_15.size == 3)
assert(history_15.map(_.getVersion) == Seq(3, 2, 1))

val history_4 = deltaLog.history.getHistory(start = 4, endOpt = None)
assert(history_4.isEmpty)

val history_21 = deltaLog.history.getHistory(start = 2, endOpt = Some(1))
assert(history_21.isEmpty)
}
}
}
Expand Down

0 comments on commit 0364f9d

Please sign in to comment.