Skip to content

Commit

Permalink
[Spark] Minor refactor to ManagedCommitBaseSuite utility - add commen…
Browse files Browse the repository at this point in the history
…ts (delta-io#2821)

## Description

Minor refactor to ManagedCommitBaseSuite utility - add comments.

## How was this patch tested?

Existing UTs

## Does this PR introduce _any_ user-facing changes?

No
  • Loading branch information
prakharjain09 authored Mar 29, 2024
1 parent 0983543 commit 74ee876
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,5 +636,5 @@ class DeltaHistoryManagerSuite extends DeltaHistoryManagerBase {
}

class ManagedCommitFill1DeltaHistoryManagerSuite extends DeltaHistoryManagerSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -765,5 +765,5 @@ class DeltaTimeTravelSuite extends QueryTest
}

class ManagedCommitFill1DeltaTimeTravelSuite extends DeltaTimeTravelSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
override def managedCommitBackfillBatchSize: Option[Int] = Some(1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.test.SharedSparkSession
trait ManagedCommitTestUtils
extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession =>

/** Run the test with different backfill batch sizes: 1, 2, 10 */
def testWithDifferentBackfillInterval(testName: String)(f: Int => Unit): Unit = {
Seq(1, 2, 10).foreach { backfillBatchSize =>
test(s"$testName [Backfill batch size: $backfillBatchSize]") {
Expand All @@ -39,8 +40,11 @@ trait ManagedCommitTestUtils
}
}

def testWithDifferentBackfillIntervalOptional(
testName: String)(f: Option[Int] => Unit): Unit = {
/** Run the test with:
* 1. Without managed-commits
* 2. With managed-commits with different backfill batch sizes
*/
def testWithDifferentBackfillIntervalOptional(testName: String)(f: Option[Int] => Unit): Unit = {
test(s"$testName [Backfill batch size: None]") {
f(None)
}
Expand Down Expand Up @@ -129,28 +133,34 @@ class TrackingCommitStore(delegatingCommitStore: InMemoryCommitStore) extends Co
}
}

/**
* A helper class which enables managed-commit for the test suite based on the given
* `managedCommitBackfillBatchSize` conf.
*/
trait ManagedCommitBaseSuite extends SparkFunSuite with SharedSparkSession {
val managedCommitBackfillBatchSize: Option[Int] = None

// If this config is not overridden, managed commits are disabled.
def managedCommitBackfillBatchSize: Option[Int] = None

final def managedCommitsEnabledInTests: Boolean = managedCommitBackfillBatchSize.nonEmpty

override protected def sparkConf: SparkConf = {
var sparkConf = super.sparkConf
if (managedCommitBackfillBatchSize.nonEmpty) {
val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
sparkConf = sparkConf
.set(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey, "in-memory")
.set(
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey,
managedCommitOwnerJson)
super.sparkConf
.set(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey, "tracking-in-memory")
.set(DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey, managedCommitOwnerJson)
} else {
super.sparkConf
}
sparkConf
}

override def beforeEach(): Unit = {
super.beforeEach()
CommitStoreProvider.clearNonDefaultBuilders()
managedCommitBackfillBatchSize.foreach { batchSize =>
CommitStoreProvider.registerBuilder(InMemoryCommitStoreBuilder(batchSize))
CommitStoreProvider.registerBuilder(TrackingInMemoryCommitStoreBuilder(batchSize))
}
}
}

0 comments on commit 74ee876

Please sign in to comment.