Skip to content

Commit

Permalink
Merged hops code
Browse files Browse the repository at this point in the history
fixes

Merged hops code

fixes
  • Loading branch information
tkakantousis committed Jan 8, 2019
1 parent 075447b commit 939ae8c
Show file tree
Hide file tree
Showing 20 changed files with 344 additions and 41 deletions.
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@

<!-- Provided dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
<artifactId>xbean-asm6-shaded</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Other options
OptionAssigner(args.executorGPUs, STANDALONE | YARN, ALL_DEPLOY_MODES,
confKey = "spark.executor.gpus"),
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var executorGPUs: String = null
var totalExecutorCores: String = null
var propertiesFile: String = null
var driverMemory: String = null
Expand All @@ -70,6 +71,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var isPython: Boolean = false
var pyFiles: String = null
var isR: Boolean = false
var isTensorFlow: String = null
var tensorflowNumPs: String = null
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null
Expand Down Expand Up @@ -181,6 +184,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
totalExecutorCores = Option(totalExecutorCores)
.orElse(sparkProperties.get("spark.cores.max"))
.orNull
executorGPUs = Option(executorGPUs)
.orElse(sparkProperties.get("spark.executor.gpus"))
.orElse(env.get("SPARK_EXECUTOR_GPUS"))
.orNull
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
Expand All @@ -198,6 +205,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
.orNull
numExecutors = Option(numExecutors)
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
isTensorFlow = Option(isTensorFlow)
.orElse(sparkProperties.get("spark.tensorflow.application"))
.orElse(env.get("SPARK_TENSORFLOW_APPLICATION"))
.getOrElse("false")
tensorflowNumPs = Option(tensorflowNumPs)
.orElse(sparkProperties.get("spark.tensorflow.num.ps"))
.orElse(env.get("SPARK_TENSORFLOW_NUM_PS"))
.getOrElse("0")
queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
Expand Down Expand Up @@ -326,6 +341,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| deployMode $deployMode
| executorMemory $executorMemory
| executorCores $executorCores
| executorGPUs $executorGPUs
| totalExecutorCores $totalExecutorCores
| propertiesFile $propertiesFile
| driverMemory $driverMemory
Expand All @@ -335,6 +351,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| driverExtraJavaOptions $driverExtraJavaOptions
| supervise $supervise
| queue $queue
| isTensorFlow $isTensorFlow
| tensorflowNumPs $tensorflowNumPs
| numExecutors $numExecutors
| files $files
| pyFiles $pyFiles
Expand Down Expand Up @@ -379,6 +397,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value

case EXECUTOR_GPUS =>
executorGPUs = value

case EXECUTOR_CORES =>
executorCores = value

Expand Down Expand Up @@ -423,6 +444,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case QUEUE =>
queue = value

case IS_TENSORFLOW =>
isTensorFlow = value

case NUM_TENSORFLOW_PS =>
tensorflowNumPs = value

case FILES =>
files = Utils.resolveURIs(value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var gpus: Int = 0
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
Expand All @@ -254,6 +255,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--gpus") :: value :: tail =>
gpus = value.toInt
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion external/kinesis-asl-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
Expand Down
14 changes: 7 additions & 7 deletions hadoop-cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
Expand All @@ -69,13 +69,13 @@
intra-jackson-module version problems.
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
<scope>${hadoop.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
Expand Down Expand Up @@ -105,13 +105,13 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-openstack</artifactId>
<version>${hadoop.version}</version>
<scope>${hadoop.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
Expand Down Expand Up @@ -179,13 +179,13 @@
Hadoop WASB client only arrived in Hadoop 2.7
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-azure</artifactId>
<version>${hadoop.version}</version>
<scope>${hadoop.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
Expand Down
2 changes: 1 addition & 1 deletion launcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@

<!-- Not needed by the test code, but referenced by SparkSubmit which is used by the tests. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<groupId>io.hops</groupId>
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";

/** Configuration key for the number of executor GPUs. */
public static final String EXECUTOR_GPUS = "spark.executor.gpus";

static final String IS_TENSORFLOW = "spark.tensorflow.application";

static final String NUM_TENSORFLOW_PS = "spark.tensorflow.num.ps";

static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ class SparkSubmitOptionParser {
// YARN-only options.
protected final String ARCHIVES = "--archives";
protected final String EXECUTOR_CORES = "--executor-cores";
protected final String EXECUTOR_GPUS = "--executor-gpus";
protected final String KEYTAB = "--keytab";
protected final String NUM_EXECUTORS = "--num-executors";
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

protected final String IS_TENSORFLOW = "--is-tensorflow";
protected final String NUM_TENSORFLOW_PS = "--num-tensorflow-ps";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
* different aliases for the same option; the first element of each entry is the "official"
Expand All @@ -96,6 +99,7 @@ class SparkSubmitOptionParser {
{ DRIVER_JAVA_OPTIONS },
{ DRIVER_LIBRARY_PATH },
{ DRIVER_MEMORY },
{ EXECUTOR_GPUS },
{ EXECUTOR_CORES },
{ EXECUTOR_MEMORY },
{ FILES },
Expand All @@ -113,6 +117,8 @@ class SparkSubmitOptionParser {
{ PY_FILES },
{ QUEUE },
{ REPOSITORIES },
{ IS_TENSORFLOW },
{ NUM_TENSORFLOW_PS },
{ STATUS },
{ TOTAL_EXECUTOR_CORES },
};
Expand Down
Loading

0 comments on commit 939ae8c

Please sign in to comment.