diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala
index b82d100cdb7..c696097c75f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala
@@ -233,11 +233,7 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
     )
     txn.commit(newMetadata :: Nil, DeltaOperations.ManualUpdate)
     val coordinatedCommitsProperties = batchSizeOpt.map(_ =>
-      Map(DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> "tracking-in-memory",
-        DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key ->
-          "{\"randomConf\":\"randomConfValue\"}",
-        DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}",
-        DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true"))
+      getCoordinatedCommitsDefaultProperties(withICT = true))
       .getOrElse(Map.empty)
     checkResult(sql(s"DESCRIBE DETAIL $tableName"),
       Seq(Map("foo" -> "bar") ++ coordinatedCommitsProperties),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
index a5e4ef662b3..917a04ef5f7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
@@ -21,10 +21,11 @@ import java.io.File
 
 import org.apache.spark.sql.delta.DescribeDeltaHistorySuiteShims._
 import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, Metadata, Protocol, RemoveFile}
+import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 
 import org.scalactic.source.Position
 import org.scalatest.Tag
@@ -42,7 +43,8 @@ trait DescribeDeltaHistorySuiteBase
   with SharedSparkSession
   with DeltaSQLCommandTest
   with DeltaTestUtilsForTempViews
-  with MergeIntoMetricsBase {
+  with MergeIntoMetricsBase
+  with CoordinatedCommitsBaseSuite {
 
   import testImplicits._
 
@@ -139,6 +141,21 @@ trait DescribeDeltaHistorySuiteBase
       .asInstanceOf[Map[String, String]]
   }
 
+  // Returns the Delta property JSON expected by the test. If coordinated commits are
+  // enabled, a few properties are populated automatically; this method accounts for them.
+  protected def getPropertiesJson(extraProperty: Option[Map[String, String]] = None): String = {
+    val coordinatedCommitsProperty = if (coordinatedCommitsEnabledInTests) {
+      getCoordinatedCommitsDefaultProperties()
+    } else {
+      Map.empty[String, String]
+    }
+    // The output of the history command omits empty config values, so we also need to
+    // filter them out here.
+    val properties = coordinatedCommitsProperty.filterNot { case (_, value) => value == "{}" }
+    val finalProperties = extraProperty.map(properties ++ _).getOrElse(properties)
+    JsonUtils.toJson(finalProperties)
+  }
+
   testWithFlag("basic case - Scala history with path-based table") {
     val tempDir = Utils.createTempDir().toString
     Seq(1, 2, 3).toDF().write.format("delta").save(tempDir)
@@ -256,6 +273,7 @@ trait DescribeDeltaHistorySuiteBase
         |comment 'this is my table'
         |tblproperties (delta.appendOnly=true)
       """.stripMargin)
+    val appendOnlyTableProperty = Map("delta.appendOnly" -> "true")
     checkLastOperation(
       spark.sessionState.catalog.getTableMetadata(TableIdentifier("delta_test")).location.getPath,
       Seq(
@@ -263,7 +281,7 @@ trait DescribeDeltaHistorySuiteBase
         "true",
         """["b"]""",
         """[]""",
-        """{"delta.appendOnly":"true"}""",
+        getPropertiesJson(Some(appendOnlyTableProperty)),
         "this is my table"),
       Seq(
         $"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
@@ -279,7 +297,7 @@ trait DescribeDeltaHistorySuiteBase
       .option("path", tempDir).saveAsTable("delta_test")
     checkLastOperation(
       tempDir,
-      Seq("CREATE TABLE AS SELECT", "false", """[]""", """[]""", "{}", null),
+      Seq("CREATE TABLE AS SELECT", "false", """[]""", """[]""", getPropertiesJson(), null),
       Seq($"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
         $"operationParameters.clusterBy", $"operationParameters.properties",
         $"operationParameters.description"))
@@ -297,13 +315,15 @@ trait DescribeDeltaHistorySuiteBase
         |partitioned by (b)
         |as select 1 as a, 'x' as b
       """.stripMargin)
+    val appendOnlyProperty = Map[String, String]("delta.appendOnly" -> "true")
     checkLastOperation(
       tempDir,
       Seq("CREATE TABLE AS SELECT",
         "false",
         """["b"]""",
         """[]""",
-        """{"delta.appendOnly":"true"}""", null),
+        getPropertiesJson(Some(appendOnlyProperty)),
+        null),
       Seq($"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
         $"operationParameters.clusterBy", $"operationParameters.properties",
         $"operationParameters.description"))
@@ -321,7 +341,7 @@ trait DescribeDeltaHistorySuiteBase
     checkLastOperation(
       tempDir2,
       Seq("CREATE TABLE AS SELECT",
-        "false", """[]""", """[]""", """{}""", "this is my table"),
+        "false", """[]""", """[]""", getPropertiesJson(), "this is my table"),
       Seq($"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
         $"operationParameters.clusterBy", $"operationParameters.properties",
         $"operationParameters.description"))
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
index 9ba0a2a6905..b78068383a7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
@@ -33,6 +33,22 @@ import org.apache.spark.sql.test.SharedSparkSession
 trait CoordinatedCommitsTestUtils
   extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession =>
 
+  protected val defaultCommitsCoordinatorName = "tracking-in-memory"
+  protected val defaultCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
+
+  def getCoordinatedCommitsDefaultProperties(withICT: Boolean = false): Map[String, String] = {
+    val coordinatedCommitsConfJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
+    val properties = Map(
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> defaultCommitsCoordinatorName,
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key -> coordinatedCommitsConfJson,
+      DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}")
+    if (withICT) {
+      properties + (DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true")
+    } else {
+      properties
+    }
+  }
+
   /**
    * Runs a specific test with coordinated commits default properties unset.
    * Any table created in this test won't have coordinated commits enabled by default.
@@ -100,11 +116,10 @@ trait CoordinatedCommitsTestUtils
     CommitCoordinatorProvider.clearNonDefaultBuilders()
     CommitCoordinatorProvider.registerBuilder(
       TrackingInMemoryCommitCoordinatorBuilder(backfillBatchSize))
-    val coordinatedCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
-    val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(coordinatedCommitsCoordinatorConf)
+    val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
     withSQLConf(
       DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
-        "tracking-in-memory",
+        defaultCommitsCoordinatorName,
       DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
         coordinatedCommitsCoordinatorJson) {
       f
@@ -121,11 +136,10 @@ trait CoordinatedCommitsTestUtils
       f(None)
     }
     testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
-      val coordinatedCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
-      val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(coordinatedCommitsCoordinatorConf)
+      val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
       withSQLConf(
         DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
-          "tracking-in-memory",
+          defaultCommitsCoordinatorName,
         DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
           coordinatedCommitsCoordinatorJson) {
         f(Some(backfillBatchSize))
@@ -138,7 +152,7 @@ trait CoordinatedCommitsTestUtils
       oldMetadata: Metadata = Metadata()): UpdatedActions = {
     val newMetadataConfiguration =
       oldMetadata.configuration +
-        (DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> "tracking-in-memory")
+        (DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> defaultCommitsCoordinatorName)
     val newMetadata = oldMetadata.copy(configuration = newMetadataConfiguration)
     UpdatedActions(commitInfo, newMetadata, Protocol(), oldMetadata, Protocol())
   }
@@ -292,7 +306,10 @@ class TrackingCommitCoordinatorClient(
  * A helper class which enables coordinated-commits for the test suite based on the given
  * `coordinatedCommitsBackfillBatchSize` conf.
  */
-trait CoordinatedCommitsBaseSuite extends SparkFunSuite with SharedSparkSession {
+trait CoordinatedCommitsBaseSuite
+  extends SparkFunSuite
+  with SharedSparkSession
+  with CoordinatedCommitsTestUtils {
 
   // If this config is not overridden, coordinated commits are disabled.
   def coordinatedCommitsBackfillBatchSize: Option[Int] = None
@@ -301,12 +318,11 @@ trait CoordinatedCommitsBaseSuite extends SparkFunSuite with SharedSparkSession
 
   override protected def sparkConf: SparkConf = {
     if (coordinatedCommitsBackfillBatchSize.nonEmpty) {
-      val coordinatedCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
-      val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(coordinatedCommitsCoordinatorConf)
+      val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
       super.sparkConf
         .set(
           DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey,
-          "tracking-in-memory")
+          defaultCommitsCoordinatorName)
         .set(
           DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey,
           coordinatedCommitsCoordinatorJson)
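
Note: the snippet below is a minimal, self-contained sketch of the helper logic this patch introduces, for reviewing the expected behavior in isolation. The property keys and the inline toJson are illustrative stand-ins; the actual code uses the DeltaConfigs.*.key constants and JsonUtils.toJson.

object CoordinatedCommitsHelpersSketch {
  // Shared defaults, mirroring the new vals in CoordinatedCommitsTestUtils.
  val defaultCommitsCoordinatorName = "tracking-in-memory"
  val defaultCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")

  // Minimal JSON rendering with quote escaping; the real helpers delegate to JsonUtils.toJson.
  private def toJson(m: Map[String, String]): String =
    m.map { case (k, v) => s""""$k":"${v.replace("\"", "\\\"")}"""" }.mkString("{", ",", "}")

  // Stand-in keys below replace DeltaConfigs.COORDINATED_COMMITS_* and
  // DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED in this sketch.
  def getCoordinatedCommitsDefaultProperties(withICT: Boolean = false): Map[String, String] = {
    val properties = Map(
      "coordinatedCommits.commitCoordinator" -> defaultCommitsCoordinatorName,
      "coordinatedCommits.commitCoordinatorConf" -> toJson(defaultCommitsCoordinatorConf),
      "coordinatedCommits.tableConf" -> "{}")
    if (withICT) properties + ("inCommitTimestamps.enabled" -> "true") else properties
  }

  // Mirrors getPropertiesJson: DESCRIBE HISTORY omits empty ("{}") config values,
  // so the expected JSON must drop them before comparison.
  def getPropertiesJson(extraProperty: Option[Map[String, String]] = None): String = {
    val base = getCoordinatedCommitsDefaultProperties()
      .filterNot { case (_, value) => value == "{}" }
    toJson(extraProperty.map(base ++ _).getOrElse(base))
  }

  def main(args: Array[String]): Unit = {
    // Prints the coordinator name/conf entries plus the extra table property;
    // the empty tableConf entry is filtered out, matching the history output.
    println(getPropertiesJson(Some(Map("delta.appendOnly" -> "true"))))
  }
}

Centralizing the coordinator name and conf in defaultCommitsCoordinatorName / defaultCommitsCoordinatorConf is what lets the suites above replace their hard-coded "tracking-in-memory" and {"randomConf":"randomConfValue"} literals with a single source of truth.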