Spark SQL-based Shark CLI #341

Closed
wants to merge 27 commits

Commits (27)
2c06e38
[WIP] Initial SharkCliDriver-compatible CLI implementation
chenghao-intel Jun 3, 2014
5a3d9f8
update the jar dependencies
chenghao-intel Jun 3, 2014
0c2d7f6
Fix ClassCastException
chenghao-intel Jun 3, 2014
0477652
Fix bug of CLI prompt when switching to Hive
chenghao-intel Jun 4, 2014
0afbc0f
update readme
chenghao-intel Jun 4, 2014
ef29e99
Fix bug of getting schema info
chenghao-intel Jun 4, 2014
6c1d9f5
Add bug info in the README
chenghao-intel Jun 4, 2014
3d344d0
remove the mistaken commit
chenghao-intel Jun 4, 2014
93b027f
enable the cli testing
chenghao-intel Jun 4, 2014
d752ed5
Remove the mistaken commit
chenghao-intel Jun 4, 2014
6e7b4d2
Add some documentation
chenghao-intel Jun 4, 2014
3050f80
Add CacheRdd reload support
chenghao-intel Jun 4, 2014
3e652fe
Update README for cached reload support
chenghao-intel Jun 4, 2014
ca6255f
Output Error Message for HQL
chenghao-intel Jun 4, 2014
b5c031b
solve the netty / servlet-api jar conflict
chenghao-intel Jun 5, 2014
da57ff6
Jar conflict & workaround for CliSessionState modified by HiveContext
chenghao-intel Jun 5, 2014
b6792db
remove the cached table reload for next PR
chenghao-intel Jun 5, 2014
bf326ff
Minimize the changes for SharkBuild.scala
chenghao-intel Jun 5, 2014
a3732b9
Put the local maven as the last resolver
chenghao-intel Jun 5, 2014
02652cf
remove the unused class
chenghao-intel Jun 12, 2014
3470679
Make the unit test work
chenghao-intel Jun 12, 2014
93ca08a
Merge remote-tracking branch 'hao/sparkSqlBack' into sparkSqlCli
liancheng Jun 20, 2014
abb1a14
Refactored the CLI service
liancheng Jun 20, 2014
46544c7
Asked Git to ignore downloaded SBT launch jar file
liancheng Jun 20, 2014
60c4135
Minimized CatalystContext as we don't care response code for now
liancheng Jun 20, 2014
94c0825
Addressed PR comments
liancheng Jun 20, 2014
5a7c0a9
Deleted unused REPL code
liancheng Jun 20, 2014
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,4 @@
sbt/sbt-launch-*.jar
target/
build/
metastore_db/
@@ -8,6 +9,7 @@ work/
run-tests-from-scratch-workspace/

conf/shark-env.sh
conf/hive-site.xml

# Compiled Source
*.class
14 changes: 12 additions & 2 deletions README.md
@@ -59,9 +59,19 @@ resultSet.next()
println(resultSet.getInt(1))
```

## Running Shark CLI
* Configure `shark_home/conf/shark-env.sh`
* Configure `shark_home/conf/hive-site.xml`
Contributor: This is a missing feature.

Contributor (author): Thanks, the Spark SQL branch is changing somewhat rapidly, and I'd like to update the README file when it's somewhat stabilized :)

* Start the Shark CLI
```
$ bin/shark
catalyst> show tables;
catalyst> set shark.exec.mode=hive;
hive> show tables;
```
But there is a known bug that requires running `show tables` before doing anything else.
Contributor: This should not be a bug anymore. :)


## Known Missing Features
* Shark CLI
* Restoring cached tables upon restart
* Invalidation of cached tables when data is INSERTed
* Off-heap storage using Tachyon
* TGFs
11 changes: 8 additions & 3 deletions project/SharkBuild.scala
@@ -75,9 +75,8 @@ object SharkBuild extends Build {
val excludeXerces = ExclusionRule(organization = "xerces")
val excludeHive = ExclusionRule(organization = "org.apache.hive")


/** Extra artifacts not included in Spark SQL's Hive support. */
val hiveArtifacts = Seq("hive-cli", "hive-jdbc", "hive-beeline")
val hiveArtifacts = Seq("hive-cli", "hive-jdbc", "hive-exec", "hive-service", "hive-beeline")
val hiveDependencies = hiveArtifacts.map ( artifactId =>
"org.spark-project.hive" % artifactId % "0.12.0" excludeAll(
excludeGuava, excludeLog4j, excludeAsm, excludeNetty, excludeXerces, excludeServlet)
@@ -101,15 +100,21 @@

libraryDependencies ++= hiveDependencies ++ yarnDependency,
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-hive" % SPARK_VERSION,
"org.apache.spark" %% "spark-hive" % SPARK_VERSION excludeAll(excludeHive, excludeServlet) force(),
"org.apache.spark" %% "spark-repl" % SPARK_VERSION,
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) force(),
"org.mortbay.jetty" % "jetty" % "6.1.26" exclude ("org.mortbay.jetty", "servlet-api") force(),
"org.eclipse.jetty.orbit" % "javax.servlet" % "3.0.0.v201112011016" artifacts ( Artifact("javax.servlet", "jar", "jar") ),
"com.typesafe" %% "scalalogging-slf4j" % "1.0.1",
"org.scalatest" %% "scalatest" % "1.9.1" % "test"
),

// Download managed jars into lib_managed.
retrieveManaged := true,
resolvers ++= Seq(
"Maven Repository" at "http://repo.maven.apache.org/maven2",
"Apache Repository" at "https://repository.apache.org/content/repositories/releases",
"JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/",
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository"
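For reviewers less familiar with the sbt conflict-resolution idioms used above, here is a minimal standalone sketch of the same `excludeAll`/`force()` pattern. The project name, Spark version, and exclusion organizations are illustrative assumptions (the real rules are defined above the visible hunk), not taken from this build:

```scala
import sbt._
import Keys._

// Hypothetical build fragment demonstrating the pattern used in SharkBuild.scala.
object ExampleBuild extends Build {
  // Exclusion rules strip conflicting transitive dependencies from the classpath.
  val excludeServlet = ExclusionRule(organization = "javax.servlet")
  val excludeHive = ExclusionRule(organization = "org.apache.hive")

  lazy val example = Project("example", file(".")).settings(
    libraryDependencies ++= Seq(
      // excludeAll drops the listed transitive jars; force() pins this exact
      // revision even if another dependency pulls in a different one.
      "org.apache.spark" %% "spark-hive" % "1.0.0" excludeAll(excludeHive, excludeServlet) force()
    ),
    // Copy resolved jars into lib_managed/ so conflicts are easy to inspect.
    retrieveManaged := true
  )
}
```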
32 changes: 32 additions & 0 deletions src/main/scala/org/apache/spark/sql/hive/CatalystContext.scala
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.spark.SparkContext

import shark.LogHelper

class CatalystContext(sc: SparkContext) extends HiveContext(sc) with LogHelper {
Member: SparkSQLContext?

Member: or JDBCContext?

Contributor (author): Hmm... I'd rather just call it SharkContext. Catalyst is a query optimization framework, Spark SQL is more than that, but neither of them is a concept parallel to Hive. And this class is actually not very much related to JDBC.

Member: SGTM

type QueryExecution = HiveContext#QueryExecution

@transient protected[hive] override lazy val sessionState = SessionState.get()
@transient protected[hive] override lazy val hiveconf = sessionState.getConf

def executeHiveQL(statement: String): this.QueryExecution = executePlan(hql(statement).logicalPlan)
}
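As a sanity check on the overrides above, a hedged smoke test showing how CatalystContext can be exercised outside the CLI. The object name, app name, and session bootstrap are illustrative assumptions; only `executeHiveQL` and `stringResult` come from this patch:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.CatalystContext

// Hypothetical smoke test; not part of this patch.
object CatalystContextSmokeTest {
  def main(args: Array[String]): Unit = {
    // CatalystContext reuses SessionState.get() for sessionState/hiveconf,
    // so a Hive session must be started first (the CLI normally does this).
    SessionState.start(new SessionState(new HiveConf(classOf[SessionState])))

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("smoke-test"))
    val context = new CatalystContext(sc)

    // executeHiveQL parses the statement via hql() and re-wraps the logical
    // plan in a QueryExecution; stringResult() renders rows Hive-style.
    val execution = context.executeHiveQL("SHOW TABLES")
    execution.stringResult().foreach(println)

    sc.stop()
  }
}
```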
89 changes: 89 additions & 0 deletions src/main/scala/shark/CatalystDriver.scala
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2012 The Regents of The University California.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package shark

import scala.collection.JavaConversions._

import java.util.{ArrayList => JArrayList}

import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.sql.hive.{CatalystContext, HiveMetastoreTypes}

class CatalystDriver(val context: CatalystContext = CatalystEnv.catalystContext) extends Driver with LogHelper {
private var tableSchema: Schema = _
private var hiveResponse: Seq[String] = _

override def init(): Unit = {
}

private def getResultSetSchema(query: context.QueryExecution): Schema = {
val analyzed = query.analyzed
logger.debug(s"Result Schema: ${analyzed.output}")
if (analyzed.output.size == 0) {
new Schema(new FieldSchema("Response code", "string", "") :: Nil, null)
} else {
val fieldSchemas = analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}

new Schema(fieldSchemas, null)
}
}

override def run(command: String): CommandProcessorResponse = {
val execution = context.executeHiveQL(command)

// TODO unify the error code
try {
hiveResponse = execution.stringResult()
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
} catch {
case cause: Throwable =>
logError(s"Failed in [$command]", cause)
new CommandProcessorResponse(-3, ExceptionUtils.getFullStackTrace(cause), null)
}
}

override def close(): Int = {
hiveResponse = null
tableSchema = null
0
}

override def getSchema: Schema = tableSchema

override def getResults(res: JArrayList[String]): Boolean = {
if (hiveResponse == null) {
false
} else {
res.addAll(hiveResponse)
hiveResponse = null
true
}
}

override def destroy() {
super.destroy()
hiveResponse = null
tableSchema = null
}
}
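And a corresponding round trip through the Hive Driver interface implemented above. Everything except `run`/`getResults`/`getSchema`/`close` is an illustrative assumption, including the query text; as with the previous sketch, a Hive session is assumed to have been started:

```scala
import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._

import shark.{CatalystDriver, CatalystEnv}

// Hypothetical usage sketch; not part of this patch.
object CatalystDriverExample {
  def main(args: Array[String]): Unit = {
    // Populate CatalystEnv.catalystContext, the driver's default context.
    CatalystEnv.init()

    val driver = new CatalystDriver()
    val response = driver.run("SHOW TABLES")

    if (response.getResponseCode == 0) {
      val rows = new JArrayList[String]()
      // getResults hands over the buffered rows once, then returns false.
      while (driver.getResults(rows)) {
        rows.foreach(println)
        rows.clear()
      }
      println(s"Schema: ${driver.getSchema.getFieldSchemas}")
    } else {
      // On failure, run() packs the full stack trace into the response.
      Console.err.println(response.getErrorMessage)
    }

    driver.close()
  }
}
```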
145 changes: 145 additions & 0 deletions src/main/scala/shark/CatalystEnv.scala
@@ -0,0 +1,145 @@
/*
* Copyright (C) 2012 The Regents of The University California.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package shark

import scala.collection.mutable
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.StatsReportListener
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.CatalystContext
import org.apache.spark.scheduler.SplitInfo

/** A singleton object for the master program. The slaves should not access this. */
// TODO add tachyon / memory store based (Copied from SharkEnv.scala)
object CatalystEnv extends LogHelper {

def init(): CatalystContext = {
if (catalystContext == null) {
initWithCatalystContext()
}

catalystContext
}

def fixIncompatibleConf(conf: Configuration) {
if (sparkContext == null) {
init()
}

val hiveIsLocal = ShimLoader.getHadoopShims.isLocalMode(conf)
if (!sparkContext.isLocal && hiveIsLocal) {
val warnMessage = "Hive Hadoop shims detected local mode, but Shark is not running locally."
logWarning(warnMessage)

// Try to fix this without bothering user
val newValue = "Spark_%s".format(System.currentTimeMillis())
for (k <- Seq("mapred.job.tracker", "mapreduce.framework.name")) {
val v = conf.get(k)
if (v == null || v == "" || v == "local") {
conf.set(k, newValue)
logWarning("Setting %s to '%s' (was '%s')".format(k, newValue, v))
}
}

// If still not fixed, bail out
if (ShimLoader.getHadoopShims.isLocalMode(conf)) {
throw new Exception(warnMessage)
}
}
}

def initWithCatalystContext(
jobName: String = "Shark::" + java.net.InetAddress.getLocalHost.getHostName,
master: String = System.getenv("MASTER")): CatalystContext = {

sparkContext = initSparkContext(jobName, master)
sparkContext.addSparkListener(new StatsReportListener())

catalystContext = new CatalystContext(sparkContext)
catalystContext
}

private def initSparkContext(
jobName: String = "Shark::" + java.net.InetAddress.getLocalHost.getHostName,
master: String = System.getenv("MASTER")): SparkContext = {

if (sparkContext != null) {
sparkContext.stop()
}

sparkContext = new SparkContext(
createSparkConf(if (master == null) "local" else master,
jobName,
System.getenv("SPARK_HOME"),
Nil,
executorEnvVars), Map[String, Set[SplitInfo]]())

sparkContext
}

private def createSparkConf(
master: String,
jobName: String,
sparkHome: String,
jars: Seq[String],
environment: HashMap[String, String]): SparkConf = {

val newConf = new SparkConf()
.setMaster(master)
.setAppName(jobName)
.setJars(jars)
.setExecutorEnv(environment.toSeq)

Option(sparkHome).foreach(newConf.setSparkHome)
newConf
}

logDebug("Initializing SharkEnv")

val executorEnvVars = {
val envVars = Set(
"SPARK_MEM",
"SPARK_CLASSPATH",
"HADOOP_HOME",
"JAVA_HOME",
"MESOS_NATIVE_LIBRARY",
"TACHYON_MASTER",
"TACHYON_WAREHOUSE_PATH")
HashMap.empty ++= envVars.map { key =>
key -> Option(System.getenv(key)).getOrElse("")
}.toMap
}

var catalystContext: CatalystContext = _

var sparkContext: SparkContext = _

/** Cleans up and shuts down the Shark environments. */
def stop() {
logDebug("Shutting down Shark Environment")
// Stop the SparkContext
if (CatalystEnv.sparkContext != null) {
sparkContext.stop()
sparkContext = null
catalystContext = null
}
}
}
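For completeness, a short sketch of the singleton's intended lifecycle; the Configuration instance and printed message are illustrative assumptions:

```scala
import org.apache.hadoop.conf.Configuration

import shark.CatalystEnv

// Hypothetical lifecycle walkthrough; not part of this patch.
object CatalystEnvLifecycle {
  def main(args: Array[String]): Unit = {
    // Builds the SparkContext (MASTER env var, falling back to "local")
    // and wraps it in a CatalystContext.
    CatalystEnv.init()

    // Rewrites mapred.job.tracker / mapreduce.framework.name when the Hive
    // shims report local mode but the SparkContext is not local.
    val hadoopConf = new Configuration()
    CatalystEnv.fixIncompatibleConf(hadoopConf)

    println(s"Master: ${CatalystEnv.sparkContext.master}")

    // Stops the SparkContext and clears both singletons.
    CatalystEnv.stop()
  }
}
```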