From 68f4e0213f5b6793acb96b2d9e08c102439565c4 Mon Sep 17 00:00:00 2001
From: Huong Vuong
Date: Wed, 2 Nov 2022 23:43:14 +0700
Subject: [PATCH] Add Spark SQL connector (#277)

* Add SparkSQL connector

* Fix SparkSQL constructor

* Add query as mandatory field

* Add SparkSQLConnector test cases

* Move spark session init into test

Co-authored-by: huong.vuong
---
 .../java/io/github/setl/enums/Storage.java         |  1 +
 .../storage/connector/SparkSQLConnector.scala      | 47 +++++++++++
 src/test/resources/application.conf                |  6 ++
 .../io/github/setl/config/Properties.scala         |  1 +
 .../connector/SparkSQLConnectorSuite.scala         | 80 +++++++++++++++++++
 5 files changed, 135 insertions(+)
 create mode 100644 src/main/scala/io/github/setl/storage/connector/SparkSQLConnector.scala
 create mode 100644 src/test/scala/io/github/setl/storage/connector/SparkSQLConnectorSuite.scala

diff --git a/src/main/java/io/github/setl/enums/Storage.java b/src/main/java/io/github/setl/enums/Storage.java
index 41b1c56..c86c152 100644
--- a/src/main/java/io/github/setl/enums/Storage.java
+++ b/src/main/java/io/github/setl/enums/Storage.java
@@ -14,6 +14,7 @@ public enum Storage {
   JDBC("io.github.setl.storage.connector.JDBCConnector"),
   STRUCTURED_STREAMING("io.github.setl.storage.connector.StructuredStreamingConnector"),
   HUDI("io.github.setl.storage.connector.HudiConnector"),
+  SPARK_SQL("io.github.setl.storage.connector.SparkSQLConnector"),
   OTHER(null);
 
   private String connectorName;
diff --git a/src/main/scala/io/github/setl/storage/connector/SparkSQLConnector.scala b/src/main/scala/io/github/setl/storage/connector/SparkSQLConnector.scala
new file mode 100644
index 0000000..5cf4856
--- /dev/null
+++ b/src/main/scala/io/github/setl/storage/connector/SparkSQLConnector.scala
@@ -0,0 +1,47 @@
+package io.github.setl.storage.connector
+
+import com.typesafe.config.Config
+import io.github.setl.config.Conf
+import io.github.setl.enums.Storage
+import io.github.setl.util.TypesafeConfigUtils
+import org.apache.spark.sql.DataFrame
+
+class SparkSQLConnector(val query: String) extends Connector {
+  override val storage: Storage = Storage.SPARK_SQL
+
+  def this(conf: Conf) = this(conf.get("query", ""))
+  def this(config: Config) = this(
+    query = TypesafeConfigUtils.getAs[String](config, "query").getOrElse("")
+  )
+
+  require(query.nonEmpty, "query is not defined")
+
+  /**
+   * Read data from the data source
+   *
+   * @return a [[DataFrame]]
+   */
+  @throws[org.apache.spark.sql.AnalysisException](s"$query is invalid")
+  override def read(): DataFrame = spark.sql(query)
+
+  /**
+   * Write a [[DataFrame]] into the data storage
+   *
+   * @param t      a [[DataFrame]] to be saved
+   * @param suffix for data connectors that support suffix (e.g. [[FileConnector]]),
+   *               add the given suffix to the save path
+   */
+  override def write(t: DataFrame, suffix: Option[String]): Unit = {
+    if (suffix.isDefined) logWarning("suffix is not supported in SparkSQLConnector")
+    write(t)
+  }
+
+  /**
+   * Write a [[DataFrame]] into the data storage
+   *
+   * @param t a [[DataFrame]] to be saved
+   */
+  override def write(t: DataFrame): Unit = {
+    logWarning("write is not supported in SparkSQLConnector")
+  }
+}
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index bdaba72..82f82ca 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -263,3 +263,9 @@ hudi {
     hoodie.datasource.write.table.type = "MERGE_ON_READ"
   }
 }
+
+sparkSQL {
+  test {
+    query = "SELECT * FROM schema.table"
+  }
+}
diff --git a/src/test/scala/io/github/setl/config/Properties.scala b/src/test/scala/io/github/setl/config/Properties.scala
index e53383f..617083a 100644
--- a/src/test/scala/io/github/setl/config/Properties.scala
+++ b/src/test/scala/io/github/setl/config/Properties.scala
@@ -26,6 +26,7 @@ object Properties {
 
   val jdbcConfig: Config = cl.getConfig("psql.test")
   val hudiConfig : Config = cl.getConfig("hudi.test")
+  val sparkSQLConfig: Config = cl.getConfig("sparkSQL.test")
 
   val excelConfigConnector: Config = cl.getConfig("connector.excel")
   val cassandraConfigConnector: Config = cl.getConfig("connector.cassandra")
diff --git a/src/test/scala/io/github/setl/storage/connector/SparkSQLConnectorSuite.scala b/src/test/scala/io/github/setl/storage/connector/SparkSQLConnectorSuite.scala
new file mode 100644
index 0000000..8bf951d
--- /dev/null
+++ b/src/test/scala/io/github/setl/storage/connector/SparkSQLConnectorSuite.scala
@@ -0,0 +1,80 @@
+package io.github.setl.storage.connector
+
+import io.github.setl.config.{Conf, Properties}
+import io.github.setl.TestObject
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.funsuite.AnyFunSuite
+
+class SparkSQLConnectorSuite extends AnyFunSuite {
+
+  val query: String =
+    """
+      | SELECT (ones.n1 + tens.n2 * 10) AS user_id
+      | FROM (
+      |   SELECT 0 AS n1
+      |   UNION SELECT 1 AS n1
+      |   UNION SELECT 2 AS n1
+      |   UNION SELECT 3 AS n1
+      |   UNION SELECT 4 AS n1
+      |   UNION SELECT 5 AS n1
+      |   UNION SELECT 6 AS n1
+      |   UNION SELECT 7 AS n1
+      |   UNION SELECT 8 AS n1
+      |   UNION SELECT 9 AS n1
+      | ) ones
+      | CROSS JOIN
+      | (
+      |   SELECT 0 AS n2
+      |   UNION SELECT 1 AS n2
+      |   UNION SELECT 2 AS n2
+      |   UNION SELECT 3 AS n2
+      |   UNION SELECT 4 AS n2
+      |   UNION SELECT 5 AS n2
+      |   UNION SELECT 6 AS n2
+      |   UNION SELECT 7 AS n2
+      |   UNION SELECT 8 AS n2
+      |   UNION SELECT 9 AS n2
+      | ) tens
+      |""".stripMargin
+
+  val testTable: Seq[TestObject] = Seq(
+    TestObject(1, "p1", "c1", 1L),
+    TestObject(2, "p2", "c2", 2L),
+    TestObject(3, "p3", "c3", 3L)
+  )
+
+  val options: Map[String, String] = Map(
+    "query" -> query
+  )
+
+
+  test("Instantiation of constructors") {
+    val connector = new SparkSQLConnector(query)
+    assert(connector.query === query)
+
+    val testConfig = Properties.sparkSQLConfig
+    val connector2 = new SparkSQLConnector(testConfig)
+    assert(connector2.query === "SELECT * FROM schema.table")
+
+    val connector3 = new SparkSQLConnector(Conf.fromMap(options))
+    assert(connector3.query === query)
+
+    assertThrows[IllegalArgumentException](new SparkSQLConnector(""))
+    assertThrows[IllegalArgumentException](new SparkSQLConnector(Conf.fromMap(Map.empty)))
+    assertThrows[IllegalArgumentException](new SparkSQLConnector(testConfig.withoutPath("query")))
+  }
+
+  test("Read/Write of SparkSQLConnector") {
+    val spark: SparkSession = SparkSession.builder().config(new SparkConf()).master("local[*]").getOrCreate()
+    import spark.implicits._
+
+    val connector = new SparkSQLConnector(query)
+    assert(connector.read().collect().length === 100)
+
+    // Should log a warning and do nothing
+    val testDF = testTable.toDF()
+    connector.write(testDF)
+    connector.write(testDF, Some("any_"))
+  }
+}
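
Usage sketch for the new connector, following the pattern of the test suite above. This is a minimal, illustrative example rather than part of the patch: the object name SparkSQLConnectorUsage, the temp view "demo_view", and the sample rows are assumptions for demonstration only.

import io.github.setl.storage.connector.SparkSQLConnector
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLConnectorUsage extends App {
  // Create the session first, mirroring the test suite ("Move spark
  // session init into test"): the connector uses the active session.
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .getOrCreate()

  import spark.implicits._

  // Register a temp view so the query has something to select from.
  Seq((1, "a"), (2, "b"), (3, "c"))
    .toDF("id", "value")
    .createOrReplaceTempView("demo_view")

  // "query" is mandatory: an empty query fails the require() check with
  // an IllegalArgumentException at construction time.
  val connector = new SparkSQLConnector("SELECT id, value FROM demo_view")

  val df: DataFrame = connector.read() // delegates to spark.sql(query)
  df.show()

  // write() only logs a warning: SparkSQLConnector is read-only by design.
  connector.write(df)

  spark.stop()
}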