[Spark] Extend coordinated commit changes to other suites #3305

Merged
@@ -233,11 +233,7 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
)
txn.commit(newMetadata :: Nil, DeltaOperations.ManualUpdate)
val coordinatedCommitsProperties = batchSizeOpt.map(_ =>
-    Map(DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> "tracking-in-memory",
-      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key ->
-        "{\"randomConf\":\"randomConfValue\"}",
-      DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}",
-      DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true"))
+    getCoordinatedCommitsDefaultProperties(withICT = true))
.getOrElse(Map.empty)
checkResult(sql(s"DESCRIBE DETAIL $tableName"),
Seq(Map("foo" -> "bar") ++ coordinatedCommitsProperties),
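For reference, the new helper call is equivalent to the map it replaces; given the definition added in CoordinatedCommitsTestUtils further down this diff, the following equality holds (illustrative assertion, not code from this PR):

// Matches the map previously inlined above; withICT = true adds the ICT key.
assert(getCoordinatedCommitsDefaultProperties(withICT = true) == Map(
  DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> "tracking-in-memory",
  DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key -> """{"randomConf":"randomConfValue"}""",
  DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}",
  DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true"))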
@@ -21,10 +21,11 @@ import java.io.File

import org.apache.spark.sql.delta.DescribeDeltaHistorySuiteShims._
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, Metadata, Protocol, RemoveFile}
+import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.scalactic.source.Position
import org.scalatest.Tag

@@ -42,7 +43,8 @@ trait DescribeDeltaHistorySuiteBase
with SharedSparkSession
with DeltaSQLCommandTest
with DeltaTestUtilsForTempViews
-  with MergeIntoMetricsBase {
+  with MergeIntoMetricsBase
+  with CoordinatedCommitsBaseSuite {

import testImplicits._

@@ -139,6 +141,21 @@ trait DescribeDeltaHistorySuiteBase
.asInstanceOf[Map[String, String]]
}

+// Returns the Delta property JSON expected by the test. If coordinated commits are
+// enabled, a few properties are populated automatically, and this method accounts
+// for them.
+protected def getPropertiesJson(extraProperty: Option[Map[String, String]] = None): String = {
+  val coordinatedCommitsProperty = if (coordinatedCommitsEnabledInTests) {
+    getCoordinatedCommitsDefaultProperties()
[Review thread on the line above]
Collaborator: Why does this call site need withICT to be false while the other one in DescribeDeltaDetailSuite.scala passes it as true?
Contributor (Author): DESCRIBE DETAIL returns ICT information as well, but HISTORY doesn't. The helper just evolves to cover both cases.
+  } else {
+    Map.empty[String, String]
+  }
+  // The HISTORY command omits empty config values from its output, so we manually
+  // omit them here as well.
+  val properties = coordinatedCommitsProperty.filterNot { case (_, value) => value == "{}" }
+  val finalProperties = extraProperty.map(properties ++ _).getOrElse(properties)
+  JsonUtils.toJson(finalProperties)
+}

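A quick sketch of the helper's behavior (illustrative, assuming the suite scope above; not code from this PR): with coordinated commits disabled it degenerates to the "{}" literal the assertions below used before, and with them enabled the empty COORDINATED_COMMITS_TABLE_CONF value is filtered out, leaving the coordinator name/conf keys plus any extra properties:

// Disabled:  returns "{}" (JSON of an empty map), preserving the old expected value.
// Enabled:   returns the coordinator name/conf keys plus the extra property.
val json = getPropertiesJson(Some(Map("delta.appendOnly" -> "true")))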
testWithFlag("basic case - Scala history with path-based table") {
val tempDir = Utils.createTempDir().toString
Seq(1, 2, 3).toDF().write.format("delta").save(tempDir)
@@ -256,14 +273,15 @@ trait DescribeDeltaHistorySuiteBase
|comment 'this is my table'
|tblproperties (delta.appendOnly=true)
""".stripMargin)
+    val appendOnlyTableProperty = Map("delta.appendOnly" -> "true")
checkLastOperation(
spark.sessionState.catalog.getTableMetadata(TableIdentifier("delta_test")).location.getPath,
Seq(
"CREATE TABLE",
"true",
"""["b"]""",
"""[]""",
"""{"delta.appendOnly":"true"}""",
getPropertiesJson(Some(appendOnlyTableProperty)),
"this is my table"),
Seq(
$"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
@@ -279,7 +297,7 @@
.option("path", tempDir).saveAsTable("delta_test")
checkLastOperation(
tempDir,
Seq("CREATE TABLE AS SELECT", "false", """[]""", """[]""", "{}", null),
Seq("CREATE TABLE AS SELECT", "false", """[]""", """[]""", getPropertiesJson(), null),
Seq($"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
$"operationParameters.clusterBy", $"operationParameters.properties",
$"operationParameters.description"))
@@ -297,13 +315,15 @@
|partitioned by (b)
|as select 1 as a, 'x' as b
""".stripMargin)
+    val appendOnlyProperty = Map[String, String]("delta.appendOnly" -> "true")
checkLastOperation(
tempDir,
Seq("CREATE TABLE AS SELECT",
"false",
"""["b"]""",
"""[]""",
"""{"delta.appendOnly":"true"}""", null),
getPropertiesJson(Some(appendOnlyProperty)),
null),
Seq($"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
$"operationParameters.clusterBy", $"operationParameters.properties",
$"operationParameters.description"))
@@ -321,7 +341,7 @@
checkLastOperation(
tempDir2,
Seq("CREATE TABLE AS SELECT",
"false", """[]""", """[]""", """{}""", "this is my table"),
"false", """[]""", """[]""", getPropertiesJson(), "this is my table"),
Seq($"operation", $"operationParameters.isManaged", $"operationParameters.partitionBy",
$"operationParameters.clusterBy", $"operationParameters.properties",
$"operationParameters.description"))
@@ -33,6 +33,22 @@ import org.apache.spark.sql.test.SharedSparkSession
trait CoordinatedCommitsTestUtils
extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession =>

+protected val defaultCommitsCoordinatorName = "tracking-in-memory"
+protected val defaultCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
+
+def getCoordinatedCommitsDefaultProperties(withICT: Boolean = false): Map[String, String] = {
+  val coordinatedCommitsConfJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
+  val properties = Map(
+    DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> defaultCommitsCoordinatorName,
+    DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key -> coordinatedCommitsConfJson,
+    DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}")
+  if (withICT) {
+    properties + (DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true")
+  } else {
+    properties
+  }
+}
+
/**
* Runs a specific test with coordinated commits default properties unset.
* Any table created in this test won't have coordinated commits enabled by default.
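As a usage sketch (hypothetical test body, not part of this diff; `deltaLog` stands in for the table's DeltaLog), the new helper lets suites assert the defaults in one place:

// Verify a freshly created table picked up the coordinated-commits defaults.
getCoordinatedCommitsDefaultProperties(withICT = true).foreach { case (key, value) =>
  assert(deltaLog.update().metadata.configuration.get(key).contains(value))
}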
@@ -100,11 +116,10 @@ trait CoordinatedCommitsTestUtils
CommitCoordinatorProvider.clearNonDefaultBuilders()
CommitCoordinatorProvider.registerBuilder(
TrackingInMemoryCommitCoordinatorBuilder(backfillBatchSize))
-    val coordinatedCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
-    val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(coordinatedCommitsCoordinatorConf)
+    val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
withSQLConf(
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
"tracking-in-memory",
defaultCommitsCoordinatorName,
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
coordinatedCommitsCoordinatorJson) {
f
@@ -121,11 +136,10 @@
f(None)
}
testWithDifferentBackfillInterval(testName) { backfillBatchSize =>
-    val coordinatedCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
-    val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(coordinatedCommitsCoordinatorConf)
+    val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
withSQLConf(
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
"tracking-in-memory",
defaultCommitsCoordinatorName,
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
coordinatedCommitsCoordinatorJson) {
f(Some(backfillBatchSize))
@@ -138,7 +152,7 @@
oldMetadata: Metadata = Metadata()): UpdatedActions = {
val newMetadataConfiguration =
oldMetadata.configuration +
-        (DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> "tracking-in-memory")
+        (DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> defaultCommitsCoordinatorName)
val newMetadata = oldMetadata.copy(configuration = newMetadataConfiguration)
UpdatedActions(commitInfo, newMetadata, Protocol(), oldMetadata, Protocol())
}
@@ -292,7 +306,10 @@ class TrackingCommitCoordinatorClient(
* A helper class which enables coordinated-commits for the test suite based on the given
* `coordinatedCommitsBackfillBatchSize` conf.
*/
-trait CoordinatedCommitsBaseSuite extends SparkFunSuite with SharedSparkSession {
+trait CoordinatedCommitsBaseSuite
+  extends SparkFunSuite
+  with SharedSparkSession
+  with CoordinatedCommitsTestUtils {

// If this config is not overridden, coordinated commits are disabled.
def coordinatedCommitsBackfillBatchSize: Option[Int] = None
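To illustrate how a suite opts in (a hypothetical subclass for illustration; the concrete suites touched by this PR may differ), overriding the batch size is all it takes:

// Re-runs every test in DescribeDeltaHistorySuiteBase with coordinated commits
// enabled and commits backfilled in batches of 2.
class DescribeDeltaHistoryWithCoordinatedCommitsSuite extends DescribeDeltaHistorySuiteBase {
  override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(2)
}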
@@ -301,12 +318,11 @@ trait CoordinatedCommitsBaseSuite extends SparkFunSuite with SharedSparkSession

override protected def sparkConf: SparkConf = {
if (coordinatedCommitsBackfillBatchSize.nonEmpty) {
-      val coordinatedCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
-      val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(coordinatedCommitsCoordinatorConf)
+      val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
super.sparkConf
.set(
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey,
"tracking-in-memory")
defaultCommitsCoordinatorName)
.set(
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey,
coordinatedCommitsCoordinatorJson)