Commit e3169d7

Fix bug

sumeet-db committed Mar 29, 2024
1 parent 5ae57cc commit e3169d7
Showing 2 changed files with 32 additions and 12 deletions.
@@ -83,10 +83,20 @@ class DeltaHistoryManager(
     import org.apache.spark.sql.delta.implicits._
     val conf = getSerializableHadoopConf
     val logPath = deltaLog.logPath.toString
-    val snapshot = endOpt.map(end => deltaLog.getSnapshotAt(end)).getOrElse(deltaLog.update())
-    val commitFileProvider = DeltaCommitFileProvider(snapshot)
+    val currentSnapshot = deltaLog.unsafeVolatileSnapshot
+    val (snapshotForCommitFileProvider, end) = endOpt match {
+      case Some(end) if currentSnapshot.version >= end =>
+        // Use the cached snapshot if it's fresh enough for the [start, end] query.
+        (currentSnapshot, end)
+      case _ =>
+        // Either end doesn't exist or the currently cached snapshot isn't new enough to satisfy it.
+        val newSnapshot = deltaLog.update()
+        val endVersion = endOpt.getOrElse(newSnapshot.version).min(newSnapshot.version)
+        (newSnapshot, endVersion)
+    }
+    val commitFileProvider = DeltaCommitFileProvider(snapshotForCommitFileProvider)
     // We assume that commits are contiguous, therefore we try to load all of them in order
-    val info = spark.range(start, snapshot.version + 1)
+    val info = spark.range(start, end + 1)
       .mapPartitions { versions =>
         val logStore = LogStore(SparkEnv.get.conf, conf.value)
         val basePath = new Path(logPath)
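Before the test changes, a note on the hunk above: getHistory can now serve a [start, end] query from the already-cached snapshot whenever that snapshot's version covers the requested end, and only falls back to deltaLog.update() otherwise, clamping the effective end version to the latest commit. The following is a minimal, self-contained Scala sketch of that decision logic; SnapshotFreshness and resolveEnd are illustrative names invented for this note, not part of Delta Lake, and snapshots are reduced to bare version numbers.

object SnapshotFreshness {
  // Decide whether a [start, end] history query needs a log refresh, and what
  // the effective end version is. `latestVersion` is by-name so it is only
  // evaluated when the cache cannot serve the query (mirroring the fact that
  // deltaLog.update() is only called on that path).
  def resolveEnd(
      cachedVersion: Long,
      latestVersion: => Long,
      endOpt: Option[Long]): (Boolean, Long) = endOpt match {
    case Some(end) if cachedVersion >= end =>
      // The cached snapshot already covers the requested end: no update.
      (false, end)
    case _ =>
      // No end bound, or the cache is stale: refresh and clamp to latest.
      val latest = latestVersion
      (true, endOpt.getOrElse(latest).min(latest))
  }

  def main(args: Array[String]): Unit = {
    // Cache at version 3, table has commits 0..3 (as in the test below).
    assert(resolveEnd(3L, 3L, Some(2L)) == ((false, 2L)))  // served from cache
    assert(resolveEnd(3L, 3L, None) == ((true, 3L)))       // must refresh
    assert(resolveEnd(3L, 3L, Some(5L)) == ((true, 3L)))   // clamped to latest
  }
}

The three asserts mirror the cache-hit, unbounded, and out-of-range cases exercised by the updated test suite below.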
@@ -25,7 +25,9 @@ import java.util.{Date, Locale}
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 
+import com.databricks.spark.util.Log4jUsageLogger
 import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
+import org.apache.spark.sql.delta.DeltaTestUtils.filterUsageRecords
 import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.StatsUtils
@@ -613,17 +615,25 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests
     val start = 1540415658000L
     generateCommits(tblName, start, start + 20.minutes, start + 40.minutes, start + 60.minutes)
     val deltaLog = DeltaLog.forTable(spark, getTableLocation(tblName))
-    val history_02 = deltaLog.history.getHistory(start = 0, endOpt = Some(2))
-    assert(history_02.size == 3)
-    assert(history_02.map(_.getVersion) == Seq(2, 1, 0))
 
-    val history_13 = deltaLog.history.getHistory(start = 1, endOpt = Some(1))
-    assert(history_13.size == 1)
-    assert(history_13.map(_.getVersion) == Seq(1))
+    def testGetHistory(
+        start: Long,
+        endOpt: Option[Long],
+        versions: Seq[Long],
+        expectedLogUpdates: Int): Unit = {
+      val usageRecords = Log4jUsageLogger.track {
+        val history = deltaLog.history.getHistory(start, endOpt)
+        assert(history.map(_.getVersion) == versions)
+      }
+      assert(filterUsageRecords(usageRecords, "deltaLog.update").size === expectedLogUpdates)
+    }
 
-    val history_2 = deltaLog.history.getHistory(start = 2, endOpt = None)
-    assert(history_2.size == 2)
-    assert(history_2.map(_.getVersion) == Seq(3, 2))
+    testGetHistory(start = 0, endOpt = Some(2), versions = Seq(2, 1, 0), expectedLogUpdates = 0)
+    testGetHistory(start = 1, endOpt = Some(1), versions = Seq(1), expectedLogUpdates = 0)
+    testGetHistory(start = 2, endOpt = None, versions = Seq(3, 2), expectedLogUpdates = 1)
+    testGetHistory(start = 1, endOpt = Some(5), versions = Seq(3, 2, 1), expectedLogUpdates = 1)
+    testGetHistory(start = 4, endOpt = None, versions = Seq.empty, expectedLogUpdates = 1)
+    testGetHistory(start = 2, endOpt = Some(1), versions = Seq.empty, expectedLogUpdates = 0)
     }
   }
 }
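Taken together with the new freshness check, the expectedLogUpdates values are self-consistent: with four commits (versions 0 through 3) and the cached snapshot at version 3, the queries bounded by Some(2) and Some(1) are answered without any deltaLog.update() call (including the inverted range start = 2, end = 1, which simply returns empty), while the unbounded queries and the out-of-range Some(5) each force exactly one refresh, with Some(5) clamped down to version 3.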
