From 74ee8763ca2449505bc4addefdcb4ce34fb5f1c6 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 29 Mar 2024 11:50:28 -0700 Subject: [PATCH] [Spark] Minor refactor to ManagedCommitBaseSuite utility - add comments (#2821) ## Description Minor refactor to ManagedCommitBaseSuite utility - add comments. ## How was this patch tested? Existing UTs ## Does this PR introduce _any_ user-facing changes? No --- .../sql/delta/DeltaHistoryManagerSuite.scala | 2 +- .../sql/delta/DeltaTimeTravelSuite.scala | 2 +- .../ManagedCommitTestUtils.scala | 32 ++++++++++++------- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala index 2730502dcd9..a6a52af6d2b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala @@ -636,5 +636,5 @@ class DeltaHistoryManagerSuite extends DeltaHistoryManagerBase { } class ManagedCommitFill1DeltaHistoryManagerSuite extends DeltaHistoryManagerSuite { - override val managedCommitBackfillBatchSize: Option[Int] = Some(1) + override def managedCommitBackfillBatchSize: Option[Int] = Some(1) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala index e6076f1a22c..00bb4aacdb5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala @@ -765,5 +765,5 @@ class DeltaTimeTravelSuite extends QueryTest } class ManagedCommitFill1DeltaTimeTravelSuite extends DeltaTimeTravelSuite { - override val managedCommitBackfillBatchSize: Option[Int] = Some(1) + override def managedCommitBackfillBatchSize: Option[Int] = Some(1) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala index 58f690e905a..b99d2cdc3ca 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.test.SharedSparkSession trait ManagedCommitTestUtils extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession => + /** Run the test with different backfill batch sizes: 1, 2, 10 */ def testWithDifferentBackfillInterval(testName: String)(f: Int => Unit): Unit = { Seq(1, 2, 10).foreach { backfillBatchSize => test(s"$testName [Backfill batch size: $backfillBatchSize]") { @@ -39,8 +40,11 @@ trait ManagedCommitTestUtils } } - def testWithDifferentBackfillIntervalOptional( - testName: String)(f: Option[Int] => Unit): Unit = { + /** Run the test with: + * 1. Without managed-commits + * 2. With managed-commits with different backfill batch sizes + */ + def testWithDifferentBackfillIntervalOptional(testName: String)(f: Option[Int] => Unit): Unit = { test(s"$testName [Backfill batch size: None]") { f(None) } @@ -129,28 +133,34 @@ class TrackingCommitStore(delegatingCommitStore: InMemoryCommitStore) extends Co } } +/** + * A helper class which enables managed-commit for the test suite based on the given + * `managedCommitBackfillBatchSize` conf. + */ trait ManagedCommitBaseSuite extends SparkFunSuite with SharedSparkSession { - val managedCommitBackfillBatchSize: Option[Int] = None + + // If this config is not overridden, managed commits are disabled. + def managedCommitBackfillBatchSize: Option[Int] = None + + final def managedCommitsEnabledInTests: Boolean = managedCommitBackfillBatchSize.nonEmpty override protected def sparkConf: SparkConf = { - var sparkConf = super.sparkConf if (managedCommitBackfillBatchSize.nonEmpty) { val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue") val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf) - sparkConf = sparkConf - .set(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey, "in-memory") - .set( - DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey, - managedCommitOwnerJson) + super.sparkConf + .set(DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey, "tracking-in-memory") + .set(DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey, managedCommitOwnerJson) + } else { + super.sparkConf } - sparkConf } override def beforeEach(): Unit = { super.beforeEach() CommitStoreProvider.clearNonDefaultBuilders() managedCommitBackfillBatchSize.foreach { batchSize => - CommitStoreProvider.registerBuilder(InMemoryCommitStoreBuilder(batchSize)) + CommitStoreProvider.registerBuilder(TrackingInMemoryCommitStoreBuilder(batchSize)) } } }