[SPARK-1870] Make spark-submit --jars work in yarn-cluster mode.
Send secondary jars to the distributed cache of all containers and add the cached jars to the classpath before executors start. Tested on a YARN cluster (CDH-5.0).

`spark-submit --jars` also works in standalone mode and in `yarn-client` mode. Thanks to @andrewor14 for testing!
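
For illustration, a yarn-cluster submission that exercises this change might look like the following (the paths, jar names, and main class are hypothetical):

    ./bin/spark-submit --master yarn-cluster \
      --class com.example.MyApp \
      --jars /path/to/dep1.jar,/path/to/dep2.jar \
      /path/to/my-app.jar

Each jar listed in --jars is shipped to the YARN distributed cache and linked into every container's working directory, so both the driver (running in the ApplicationMaster) and the executors see it on their classpaths.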

I removed the caveat "Doesn't work for drivers in standalone mode with 'cluster' deploy mode." from `spark-submit`'s help message, though we haven't tested Mesos yet.

CC: @dbtsai @sryza

Author: Xiangrui Meng <meng@databricks.com>

Closes #848 from mengxr/yarn-classpath and squashes the following commits:

23e7df4 [Xiangrui Meng] rename spark.jar to __spark__.jar and app.jar to __app__.jar to avoid conflicts; append $CWD/ and $CWD/* to the classpath; remove unused methods
a40f6ed [Xiangrui Meng] standalone -> cluster
65e04ad [Xiangrui Meng] update spark-submit help message and add a comment for yarn-client
11e5354 [Xiangrui Meng] minor changes
3e7e1c4 [Xiangrui Meng] use sparkConf instead of hadoop conf
dc3c825 [Xiangrui Meng] add secondary jars to classpath in yarn
mengxr authored and tdas committed May 22, 2014
1 parent 2a948e7 commit dba3140
Showing 3 changed files with 19 additions and 55 deletions.
SparkSubmitArguments.scala
@@ -326,8 +326,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   |  --class CLASS_NAME         Your application's main class (for Java / Scala apps).
   |  --name NAME                A name of your application.
   |  --jars JARS                Comma-separated list of local jars to include on the driver
-  |                             and executor classpaths. Doesn't work for drivers in
-  |                             standalone mode with "cluster" deploy mode.
+  |                             and executor classpaths.
   |  --py-files PY_FILES        Comma-separated list of .zip or .egg files to place on the
   |                             PYTHONPATH for Python apps.
   |  --files FILES              Comma-separated list of files to be placed in the working
ClientBase.scala
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
  * Client submits an application to the YARN ResourceManager.
  *
  * Depending on the deployment mode this will launch one of two application master classes:
- * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
  *    which launches a driver program inside of the cluster.
  * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
  *    request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
       }
     }
 
+    var cachedSecondaryJarLinks = ListBuffer.empty[String]
     val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
       (args.files, LocalResourceType.FILE, false),
       (args.archives, LocalResourceType.ARCHIVE, false) )
-    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+    fileLists.foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { case file: String =>
           val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@ trait ClientBase extends Logging {
           val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
           val destPath = copyRemoteFile(dst, localPath, replication)
           distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
-            linkname, statCache, appMasterOnly)
+            linkname, statCache)
+          if (addToClasspath) {
+            cachedSecondaryJarLinks += linkname
+          }
         }
       }
     }
+    sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
 
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
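
The hunk above serializes the cache link names into a single SparkConf property so they survive the hop from the submitting client to the containers, where populateClasspath (further down) reads them back. A minimal, self-contained sketch of that round-trip, with a plain Map standing in for SparkConf and hypothetical jar names:

    import scala.collection.mutable

    object SecondaryJarsRoundTrip {
      val Key = "spark.yarn.secondary.jars"

      def main(args: Array[String]): Unit = {
        val conf = mutable.Map.empty[String, String]              // stand-in for SparkConf
        val cachedSecondaryJarLinks = Seq("dep1.jar", "dep2.jar") // hypothetical link names

        // Client side: join the cache link names into a single property,
        // mirroring the sparkConf.set(...) call added above.
        conf(Key) = cachedSecondaryJarLinks.mkString(",")

        // Container side: split them back out and turn each one into a
        // $PWD-relative classpath entry, as populateClasspath does below.
        val restored = conf.getOrElse(Key, "").split(",").filter(_.nonEmpty)
        restored.foreach(link => println("would add $PWD/" + link + " to CLASSPATH"))
      }
    }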
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
 }
 
 object ClientBase {
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
+  val SPARK_JAR: String = "__spark__.jar"
+  val APP_JAR: String = "__app__.jar"
   val LOG4J_PROP: String = "log4j.properties"
   val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
   val LOCAL_SCHEME = "local"
+  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
 
   def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)

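The double underscores matter because every cached jar is linked into the container working directory under its link name: with the old constants, a user dependency that happened to be named spark.jar or app.jar would collide with Spark's own links. A toy check of that idea (illustrative only, not Spark code):

    object LinkNameCollision {
      def main(args: Array[String]): Unit = {
        val reserved = Set("__spark__.jar", "__app__.jar")     // the renamed link names
        val userJars = Seq("spark.jar", "app.jar", "dep1.jar") // hypothetical --jars basenames
        userJars.foreach { jar =>
          println(s"$jar collides with a reserved link name: ${reserved.contains(jar)}")
        }
      }
    }
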
@@ -479,66 +485,25 @@ object ClientBase {

     extraClassPath.foreach(addClasspathEntry)
 
-    addClasspathEntry(Environment.PWD.$())
+    val cachedSecondaryJarLinks =
+      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
     // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
     } else {
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
     }
+    // Append all class files and jar files under the working directory to the classpath.
+    addClasspathEntry(Environment.PWD.$())
     addPwdClasspathEntry("*")
   }
 
-  /**
-   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
-   * to the classpath.
-   */
-  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
-    if (args != null) {
-      addClasspathEntry(args.userJar, APP_JAR, env)
-    }
-
-    if (args != null && args.addJars != null) {
-      args.addJars.split(",").foreach { case file: String =>
-        addClasspathEntry(file, null, env)
-      }
-    }
-  }
-
-  /**
-   * Adds the given path to the classpath, handling "local:" URIs correctly.
-   *
-   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
-   * name will be added to the classpath (relative to the job's work directory).
-   *
-   * If not a "local:" file and no alternate name, the environment is not modified.
-   *
-   * @param path Path to add to classpath (optional).
-   * @param fileName Alternate name for the file (optional).
-   * @param env Map holding the environment variables.
-   */
-  private def addClasspathEntry(path: String, fileName: String,
-      env: HashMap[String, String]) : Unit = {
-    if (path != null) {
-      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val localPath = getLocalPath(path)
-        if (localPath != null) {
-          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
-            File.pathSeparator)
-          return
-        }
-      }
-    }
-    if (fileName != null) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
-        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
-    }
-  }
-
   /**
    * Returns the local path if the URI is a "local:" URI, or null otherwise.
    */
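
To make the ordering concrete: with the default spark.yarn.user.classpath.first = false and two hypothetical secondary jars dep1.jar and dep2.jar, the container CLASSPATH assembled above comes out roughly as

    <extra classpath> : $PWD/__spark__.jar : <Hadoop classpath> : $PWD/__app__.jar : $PWD/dep1.jar : $PWD/dep2.jar : $PWD : $PWD/*

while with spark.yarn.user.classpath.first = true the __app__.jar and secondary-jar entries move ahead of __spark__.jar and the Hadoop entries.
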
YarnClientSchedulerBackend.scala
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
       "--class", "notused",
-      "--jar", null,
+      "--jar", null, // The primary jar will be added dynamically in SparkContext.
       "--args", hostport,
       "--am-class", classOf[ExecutorLauncher].getName
     )
