# Spark SQL based Shark CLI #341
Changes from 25 commits
README changes:

````diff
@@ -59,9 +59,19 @@
 resultSet.next()
 println(resultSet.getInt(1))
 ```

+## Running Shark CLI
+* Configure shark_home/conf/shark-env.sh
+* Configure shark_home/conf/hive-site.xml
+* Start the Shark CLI:
+```
+$ bin/shark
+catalyst> show tables;
+catalyst> set shark.exec.mode=hive;
+hive> show tables;
+```
+Note: there is currently a bug that requires running `show tables` before any other statement.

 ## Known Missing Features
 * Shark CLI
 * Restoring cached tables upon restart
 * Invalidation of cached tables when data is INSERTed
 * Off-heap storage using Tachyon
 * TGFs
````

Review comment on the known-bug note:

> This should not be a bug anymore. :)
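To make the first configuration step concrete, here is an illustrative `shark-env.sh` sketch. The variable names are the ones this PR's `CatalystEnv.scala` actually reads (`MASTER`, `SPARK_HOME`, `SPARK_MEM`, `SPARK_CLASSPATH`, `HADOOP_HOME`, `JAVA_HOME`); all paths and values are hypothetical:

```sh
# Illustrative values only; point these at your own installation.
export JAVA_HOME=/usr/lib/jvm/default-java   # hypothetical path
export HADOOP_HOME=/opt/hadoop               # hypothetical path
export SPARK_HOME=/opt/spark                 # hypothetical path
export MASTER=spark://master-host:7077       # CatalystEnv falls back to "local" when unset
export SPARK_MEM=4g
export SPARK_CLASSPATH=""
```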
New file: `CatalystContext.scala` (+32 lines, package `org.apache.spark.sql.hive`)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.spark.SparkContext

import shark.LogHelper

class CatalystContext(sc: SparkContext) extends HiveContext(sc) with LogHelper {
  type QueryExecution = HiveContext#QueryExecution

  // Reuse the session state and HiveConf already established by the CLI
  // instead of creating fresh ones.
  @transient protected[hive] override lazy val sessionState = SessionState.get()
  @transient protected[hive] override lazy val hiveconf = sessionState.getConf

  // Parses a HiveQL statement and returns the QueryExecution for its logical plan.
  def executeHiveQL(statement: String) = executePlan(hql(statement).logicalPlan)
}
```

Review discussion on the class name:

> SparkSQLContext?

> or JDBCContext?

> Hmm... I'd rather just call it

> SGTM
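For illustration, a hypothetical stand-alone driver (not part of this PR) showing how `CatalystContext` is meant to be used; it assumes a Hive `SessionState` can be started up front, which is what the Shark CLI does before constructing the context:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.CatalystContext

object CatalystContextExample {
  def main(args: Array[String]) {
    // CatalystContext picks up SessionState.get(), so a Hive session must
    // already exist; the Shark CLI normally starts one before this point.
    SessionState.start(new SessionState(new HiveConf(classOf[SessionState])))

    val sc = new SparkContext("local", "CatalystContextExample")
    val cc = new CatalystContext(sc)

    // executeHiveQL parses the statement and returns its QueryExecution;
    // stringResult() renders rows the way the Hive CLI would print them.
    cc.executeHiveQL("SHOW TABLES").stringResult().foreach(println)

    sc.stop()
  }
}
```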
New file: `CatalystDriver.scala` (+89 lines, package `shark`)

```scala
/*
 * Copyright (C) 2012 The Regents of The University California.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package shark

import scala.collection.JavaConversions._

import java.util.{ArrayList => JArrayList}

import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.sql.hive.{CatalystContext, HiveMetastoreTypes}

/**
 * Wraps a CatalystContext behind Hive's Driver interface so the Shark CLI
 * can execute statements through Spark SQL.
 */
class CatalystDriver(val context: CatalystContext = CatalystEnv.catalystContext)
  extends Driver with LogHelper {

  private var tableSchema: Schema = _
  private var hiveResponse: Seq[String] = _

  override def init(): Unit = {
  }

  private def getResultSetSchema(query: context.QueryExecution): Schema = {
    val analyzed = query.analyzed
    logger.debug(s"Result Schema: ${analyzed.output}")
    if (analyzed.output.size == 0) {
      // Commands with no output schema report just a response code.
      new Schema(new FieldSchema("Response code", "string", "") :: Nil, null)
    } else {
      val fieldSchemas = analyzed.output.map { attr =>
        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
      }

      new Schema(fieldSchemas, null)
    }
  }

  override def run(command: String): CommandProcessorResponse = {
    val execution = context.executeHiveQL(command)

    // TODO: unify the error code
    try {
      hiveResponse = execution.stringResult()
      tableSchema = getResultSetSchema(execution)
      new CommandProcessorResponse(0)
    } catch {
      case cause: Throwable =>
        logError(s"Failed in [$command]", cause)
        new CommandProcessorResponse(-3, ExceptionUtils.getFullStackTrace(cause), null)
    }
  }

  override def close(): Int = {
    hiveResponse = null
    tableSchema = null
    0
  }

  override def getSchema: Schema = tableSchema

  override def getResults(res: JArrayList[String]): Boolean = {
    if (hiveResponse == null) {
      false
    } else {
      // Results are handed back once; subsequent calls report no more rows.
      res.addAll(hiveResponse)
      hiveResponse = null
      true
    }
  }

  override def destroy() {
    super.destroy()
    hiveResponse = null
    tableSchema = null
  }
}
```

Review comment on the `if (hiveResponse == null)` check in `getResults`:

> space after if
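A sketch of the Hive `Driver` contract as `CatalystDriver` implements it, assuming `CatalystEnv` has been initialized first (hypothetical example, not part of the PR):

```scala
import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._

import shark.{CatalystDriver, CatalystEnv}

object CatalystDriverExample {
  def main(args: Array[String]) {
    CatalystEnv.init()                // builds the SparkContext and CatalystContext
    val driver = new CatalystDriver() // defaults to CatalystEnv.catalystContext

    val response = driver.run("SHOW TABLES")
    if (response.getResponseCode == 0) {
      val rows = new JArrayList[String]()
      // getResults hands back the buffered rows once, then reports exhaustion.
      while (driver.getResults(rows)) {
        rows.foreach(println)
        rows.clear()
      }
    } else {
      Console.err.println(response.getErrorMessage)
    }

    driver.close()
    CatalystEnv.stop()
  }
}
```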
New file: `CatalystEnv.scala` (+150 lines, package `shark`)

```scala
/*
 * Copyright (C) 2012 The Regents of The University California.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package shark

import scala.collection.mutable.{HashMap, HashSet}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
import org.apache.spark.sql.hive.CatalystContext

/** A singleton object for the master program. The slaves should not access this. */
// TODO: add Tachyon / memory store support (copied from SharkEnv.scala)
object CatalystEnv extends LogHelper {

  def init(): CatalystContext = {
    if (catalystContext == null) {
      initWithCatalystContext()
    }

    catalystContext
  }

  def fixUncompatibleConf(conf: Configuration) {
    if (sparkContext == null) {
      init()
    }

    val hiveIsLocal = ShimLoader.getHadoopShims.isLocalMode(conf)
    if (!sparkContext.isLocal && hiveIsLocal) {
      val warnMessage = "Hive Hadoop shims detected local mode, but Shark is not running locally."
      logWarning(warnMessage)

      // Try to fix this without bothering the user.
      val newValue = "Spark_%s".format(System.currentTimeMillis())
      for (k <- Seq("mapred.job.tracker", "mapreduce.framework.name")) {
        val v = conf.get(k)
        if (v == null || v == "" || v == "local") {
          conf.set(k, newValue)
          logWarning("Setting %s to '%s' (was '%s')".format(k, newValue, v))
        }
      }

      // If still not fixed, bail out.
      if (ShimLoader.getHadoopShims.isLocalMode(conf)) {
        throw new Exception(warnMessage)
      }
    }
  }

  def initWithCatalystContext(
      jobName: String = "Shark::" + java.net.InetAddress.getLocalHost.getHostName,
      master: String = System.getenv("MASTER")): CatalystContext = {
    sparkContext = initSparkContext(jobName, master)

    sparkContext.addSparkListener(new StatsReportListener())

    catalystContext = new CatalystContext(sparkContext)

    catalystContext
  }

  private def initSparkContext(
      jobName: String = "Shark::" + java.net.InetAddress.getLocalHost.getHostName,
      master: String = System.getenv("MASTER")): SparkContext = {
    if (sparkContext != null) {
      sparkContext.stop()
    }

    sparkContext = new SparkContext(
      createSparkConf(if (master == null) "local" else master,
        jobName,
        System.getenv("SPARK_HOME"),
        Nil,
        executorEnvVars), Map[String, Set[SplitInfo]]())

    sparkContext
  }

  private def createSparkConf(
      master: String,
      jobName: String,
      sparkHome: String,
      jars: Seq[String],
      environment: HashMap[String, String]): SparkConf = {
    val newConf = new SparkConf()
      .setMaster(master)
      .setAppName(jobName)
      .setJars(jars)
      .setExecutorEnv(environment.toSeq)
    Option(sparkHome).foreach(newConf.setSparkHome)

    newConf
  }

  logDebug("Initializing CatalystEnv")

  val executorEnvVars = new HashMap[String, String]
  executorEnvVars.put("SPARK_MEM", getEnv("SPARK_MEM"))
  executorEnvVars.put("SPARK_CLASSPATH", getEnv("SPARK_CLASSPATH"))
  executorEnvVars.put("HADOOP_HOME", getEnv("HADOOP_HOME"))
  executorEnvVars.put("JAVA_HOME", getEnv("JAVA_HOME"))
  executorEnvVars.put("MESOS_NATIVE_LIBRARY", getEnv("MESOS_NATIVE_LIBRARY"))
  executorEnvVars.put("TACHYON_MASTER", getEnv("TACHYON_MASTER"))
  executorEnvVars.put("TACHYON_WAREHOUSE_PATH", getEnv("TACHYON_WAREHOUSE_PATH"))

  val activeSessions = new HashSet[String]

  var catalystContext: CatalystContext = _
  var sparkContext: SparkContext = _

  // The following line turns Kryo serialization debug log on. It is extremely chatty.
  //com.esotericsoftware.minlog.Log.set(com.esotericsoftware.minlog.Log.LEVEL_DEBUG)

  // Keeps track of added JARs and files so that we don't add them twice in consecutive queries.
  val addedFiles = HashSet[String]()
  val addedJars = HashSet[String]()

  /** Cleans up and shuts down the Shark environments. */
  def stop() {
    logDebug("Shutting down Shark Environment")
    // Stop the SparkContext
    if (CatalystEnv.sparkContext != null) {
      sparkContext.stop()
      sparkContext = null
      catalystContext = null
    }
  }

  /** Returns the value of an environment variable, or the empty string if it is unset. */
  def getEnv(varname: String) = if (System.getenv(varname) == null) "" else System.getenv(varname)
}
```

Review comment on `addedFiles` / `addedJars`:

> are these two still useful?
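Finally, a small sketch of the `CatalystEnv` lifecycle and the configuration fix-up, assuming `MASTER` points at a real cluster (with the default local master the check passes trivially and the conf is left untouched):

```scala
import org.apache.hadoop.conf.Configuration

import shark.CatalystEnv

object CatalystEnvExample {
  def main(args: Array[String]) {
    // init() only builds the contexts on first use; later calls are no-ops.
    CatalystEnv.init()

    // A job conf that still claims local mode: fixUncompatibleConf rewrites
    // mapred.job.tracker / mapreduce.framework.name so the Hadoop shims stop
    // reporting local mode while Spark itself runs on a cluster.
    val conf = new Configuration()
    conf.set("mapred.job.tracker", "local")
    CatalystEnv.fixUncompatibleConf(conf)
    println(conf.get("mapred.job.tracker")) // e.g. "Spark_1400000000000"

    CatalystEnv.stop()
  }
}
```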
Closing review discussion:

> This is a missing feature.

> Thanks, the Spark SQL branch is changing somewhat rapidly, and I'd like to update the README file when it's somewhat stabilized :)