diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala index 2730502dcd9..a6a52af6d2b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala @@ -636,5 +636,5 @@ class DeltaHistoryManagerSuite extends DeltaHistoryManagerBase { } class ManagedCommitFill1DeltaHistoryManagerSuite extends DeltaHistoryManagerSuite { - override val managedCommitBackfillBatchSize: Option[Int] = Some(1) + override def managedCommitBackfillBatchSize: Option[Int] = Some(1) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala index e6076f1a22c..00bb4aacdb5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala @@ -765,5 +765,5 @@ class DeltaTimeTravelSuite extends QueryTest } class ManagedCommitFill1DeltaTimeTravelSuite extends DeltaTimeTravelSuite { - override val managedCommitBackfillBatchSize: Option[Int] = Some(1) + override def managedCommitBackfillBatchSize: Option[Int] = Some(1) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala index 58f690e905a..b99d2cdc3ca 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.test.SharedSparkSession trait ManagedCommitTestUtils extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession => + /** Run the test with different backfill batch sizes: 1, 2, 10 */ def testWithDifferentBackfillInterval(testName: String)(f: Int => Unit): Unit = { Seq(1, 2, 10).foreach { backfillBatchSize => test(s"$testName [Backfill batch size: $backfillBatchSize]") { @@ -39,8 +40,11 @@ trait ManagedCommitTestUtils } } - def testWithDifferentBackfillIntervalOptional( - testName: String)(f: Option[Int] => Unit): Unit = { + /** Run the test with: + * 1. Without managed-commits + * 2. With managed-commits with different backfill batch sizes + */ + def testWithDifferentBackfillIntervalOptional(testName: String)(f: Option[Int] => Unit): Unit = { test(s"$testName [Backfill batch size: None]") { f(None) } @@ -129,28 +133,34 @@ class TrackingCommitStore(delegatingCommitStore: InMemoryCommitStore) extends Co } } +/** + * A helper class which enables managed-commit for the test suite based on the given + * `managedCommitBackfillBatchSize` conf. + */ trait ManagedCommitBaseSuite extends SparkFunSuite with SharedSparkSession { - val managedCommitBackfillBatchSize: Option[Int] = None + + // If this config is not overridden, managed commits are disabled. + def managedCommitBackfillBatchSize: Option[Int] = None + + final def managedCommitsEnabledInTests: Boolean = managedCommitBackfillBatchSize.nonEmpty override protected def sparkConf: SparkConf = { - var sparkConf = super.sparkConf if (managedCommitBackfillBatchSize.nonEmpty) { val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue") val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf) - sparkConf = sparkConf - .set(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey, "in-memory") - .set( - DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey, - managedCommitOwnerJson) + super.sparkConf + .set(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey, "tracking-in-memory") + .set(DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey, managedCommitOwnerJson) + } else { + super.sparkConf } - sparkConf } override def beforeEach(): Unit = { super.beforeEach() CommitStoreProvider.clearNonDefaultBuilders() managedCommitBackfillBatchSize.foreach { batchSize => - CommitStoreProvider.registerBuilder(InMemoryCommitStoreBuilder(batchSize)) + CommitStoreProvider.registerBuilder(TrackingInMemoryCommitStoreBuilder(batchSize)) } } }