Add Spark SQL connector (#277)
* Add SparkSQL connector

* Fix SparkSQL constructor

* Add query as mandatory field

* Add SparkSQLConnector test cases

* Move spark session init into test

Co-authored-by: huong.vuong <huong.vuong@grabtaxi.com>
hoaihuongbk and huong.vuong authored Nov 2, 2022
1 parent a170c57 commit 68f4e02
Showing 5 changed files with 135 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/github/setl/enums/Storage.java
@@ -14,6 +14,7 @@ public enum Storage {
JDBC("io.github.setl.storage.connector.JDBCConnector"),
STRUCTURED_STREAMING("io.github.setl.storage.connector.StructuredStreamingConnector"),
HUDI("io.github.setl.storage.connector.HudiConnector"),
SPARK_SQL("io.github.setl.storage.connector.SparkSQLConnector"),
OTHER(null);

private String connectorName;
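For context (not part of the diff): each Storage value carries the fully qualified name of its connector class, which makes a reflection-based lookup possible. A minimal sketch under that assumption — the factory shown below is illustrative, not SETL's actual API:

// A sketch under assumptions: SETL can map a Storage value to its connector class
// name and instantiate it reflectively. This factory is illustrative only.
import io.github.setl.config.Conf
import io.github.setl.storage.connector.Connector

val conf = Conf.fromMap(Map("query" -> "SELECT 1 AS id"))  // hypothetical query
val connector: Connector = Class
  .forName("io.github.setl.storage.connector.SparkSQLConnector")
  .getDeclaredConstructor(classOf[Conf])
  .newInstance(conf)
  .asInstanceOf[Connector]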
47 changes: 47 additions & 0 deletions src/main/scala/io/github/setl/storage/connector/SparkSQLConnector.scala
@@ -0,0 +1,47 @@
package io.github.setl.storage.connector

import com.typesafe.config.Config
import io.github.setl.config.Conf
import io.github.setl.enums.Storage
import io.github.setl.util.TypesafeConfigUtils
import org.apache.spark.sql.DataFrame

class SparkSQLConnector(val query: String) extends Connector {
  override val storage: Storage = Storage.SPARK_SQL

  def this(conf: Conf) = this(conf.get("query", ""))
  def this(config: Config) = this(
    query = TypesafeConfigUtils.getAs[String](config, "query").getOrElse("")
  )

  require(query.nonEmpty, "query is not defined")

  /**
   * Read data from the data source
   *
   * @return a [[DataFrame]]
   */
  @throws[org.apache.spark.sql.AnalysisException](s"$query is invalid")
  override def read(): DataFrame = spark.sql(query)

  /**
   * Write a [[DataFrame]] into the data storage
   *
   * @param t      a [[DataFrame]] to be saved
   * @param suffix for data connectors that support suffixes (e.g. [[FileConnector]]), the given suffix is added to the save path
   */
  override def write(t: DataFrame, suffix: Option[String]): Unit = {
    if (suffix.isDefined) logWarning("suffix is not supported in SparkSQLConnector")
    write(t)
  }

  /**
   * Write a [[DataFrame]] into the data storage
   *
   * @param t a [[DataFrame]] to be saved
   */
  override def write(t: DataFrame): Unit = {
    logWarning("write is not supported in SparkSQLConnector")
  }
}
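A usage sketch (not part of the diff): since read() delegates straight to spark.sql, the connector can query any table or temp view visible to the active SparkSession — the same assumption the test suite below relies on. The demo view name is illustrative:

// Sketch: query a temp view through the connector. Assumes the Connector
// trait resolves `spark` from the active SparkSession, as the tests do.
import org.apache.spark.sql.SparkSession
import io.github.setl.storage.connector.SparkSQLConnector

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Register an illustrative temp view, then read it via a SQL query.
Seq((1, "a"), (2, "b")).toDF("id", "value").createOrReplaceTempView("demo")

val connector = new SparkSQLConnector("SELECT id FROM demo WHERE id > 1")
connector.read().show()  // one row: id = 2

// Both write overloads only log a warning; nothing is persisted.
connector.write(connector.read())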
6 changes: 6 additions & 0 deletions src/test/resources/application.conf
@@ -263,3 +263,9 @@ hudi {
    hoodie.datasource.write.table.type = "MERGE_ON_READ"
  }
}

sparkSQL {
  test {
    query = "SELECT * FROM schema.table"
  }
}
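As a brief sketch (not part of the diff) of how this block can be consumed outside the bundled test helper, assuming application.conf is on the classpath:

// Load the sparkSQL.test block added above via standard Typesafe Config;
// the Config constructor requires a non-empty "query" key.
import com.typesafe.config.ConfigFactory
import io.github.setl.storage.connector.SparkSQLConnector

val config = ConfigFactory.load().getConfig("sparkSQL.test")
val connector = new SparkSQLConnector(config)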
1 change: 1 addition & 0 deletions src/test/scala/io/github/setl/config/Properties.scala
@@ -26,6 +26,7 @@ object Properties {
  val jdbcConfig: Config = cl.getConfig("psql.test")

  val hudiConfig: Config = cl.getConfig("hudi.test")
  val sparkSQLConfig: Config = cl.getConfig("sparkSQL.test")

  val excelConfigConnector: Config = cl.getConfig("connector.excel")
  val cassandraConfigConnector: Config = cl.getConfig("connector.cassandra")
80 changes: 80 additions & 0 deletions src/test/scala/io/github/setl/storage/connector/SparkSQLConnectorSuite.scala
@@ -0,0 +1,80 @@
package io.github.setl.storage.connector

import io.github.setl.config.{Conf, Properties}
import io.github.setl.{SparkSessionBuilder, TestObject}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class SparkSQLConnectorSuite extends AnyFunSuite {

  val query: String =
    """
      | SELECT (ones.n1 + tens.n2 * 10) as user_id
      | FROM (
      |   SELECT 0 AS n1
      |   UNION SELECT 1 AS n1
      |   UNION SELECT 2 AS n1
      |   UNION SELECT 3 AS n1
      |   UNION SELECT 4 AS n1
      |   UNION SELECT 5 AS n1
      |   UNION SELECT 6 AS n1
      |   UNION SELECT 7 AS n1
      |   UNION SELECT 8 AS n1
      |   UNION SELECT 9 AS n1
      | ) ones
      | CROSS JOIN
      | (
      |   SELECT 0 AS n2
      |   UNION SELECT 1 AS n2
      |   UNION SELECT 2 AS n2
      |   UNION SELECT 3 AS n2
      |   UNION SELECT 4 AS n2
      |   UNION SELECT 5 AS n2
      |   UNION SELECT 6 AS n2
      |   UNION SELECT 7 AS n2
      |   UNION SELECT 8 AS n2
      |   UNION SELECT 9 AS n2
      | ) tens
      |""".stripMargin

  val testTable: Seq[TestObject] = Seq(
    TestObject(1, "p1", "c1", 1L),
    TestObject(2, "p2", "c2", 2L),
    TestObject(3, "p3", "c3", 3L)
  )

  val options: Map[String, String] = Map(
    "query" -> query
  )

  test("Instantiation of constructors") {
    val connector = new SparkSQLConnector(query)
    assert(connector.query === query)

    val testConfig = Properties.sparkSQLConfig
    val connector2 = new SparkSQLConnector(testConfig)
    assert(connector2.query === "SELECT * FROM schema.table")

    val connector3 = new SparkSQLConnector(Conf.fromMap(options))
    assert(connector3.query === query)

    assertThrows[IllegalArgumentException](new SparkSQLConnector(""))
    assertThrows[IllegalArgumentException](new SparkSQLConnector(Conf.fromMap(Map.empty)))
    assertThrows[IllegalArgumentException](new SparkSQLConnector(testConfig.withoutPath("query")))
  }

  test("Read/Write of SparkSQLConnector") {
    val spark: SparkSession = SparkSession.builder().config(new SparkConf()).master("local[*]").getOrCreate()
    import spark.implicits._

    val connector = new SparkSQLConnector(query)
    // 10 "ones" rows cross-joined with 10 "tens" rows yield 100 distinct user_id values
    assert(connector.read().collect().length == 100)

    // Should log a warning and do nothing
    val testDF = testTable.toDF()
    connector.write(testDF)
    connector.write(testDF, Some("any_"))
  }
}
