diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 53d1b2b58f1f1..c0c83dda3c084 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -64,7 +64,7 @@ - org.apache.hadoop + io.hops hadoop-client diff --git a/core/pom.xml b/core/pom.xml index ed3efffa180b7..2b6f631e2679a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -59,7 +59,7 @@ xbean-asm6-shaded - org.apache.hadoop + io.hops hadoop-client diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 1d32d964dc9d9..ded5747fb8ef3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -541,6 +541,8 @@ private[spark] class SparkSubmit extends Logging { OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"), // Other options + OptionAssigner(args.executorGPUs, STANDALONE | YARN, ALL_DEPLOY_MODES, + confKey = "spark.executor.gpus"), OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.executor.cores"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 0998757715457..094958a56ab93 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -46,6 +46,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S var deployMode: String = null var executorMemory: String = null var executorCores: String = null + var executorGPUs: String = null var totalExecutorCores: String = null var propertiesFile: String = null var driverMemory: String = null @@ -70,6 +71,8 @@ private[deploy] class 
SparkSubmitArguments(args: Seq[String], env: Map[String, S var isPython: Boolean = false var pyFiles: String = null var isR: Boolean = false + var isTensorFlow: String = null + var tensorflowNumPs: String = null var action: SparkSubmitAction = null val sparkProperties: HashMap[String, String] = new HashMap[String, String]() var proxyUser: String = null @@ -181,6 +184,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S totalExecutorCores = Option(totalExecutorCores) .orElse(sparkProperties.get("spark.cores.max")) .orNull + executorGPUs = Option(executorGPUs) + .orElse(sparkProperties.get("spark.executor.gpus")) + .orElse(env.get("SPARK_EXECUTOR_GPUS")) + .orNull name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull files = Option(files).orElse(sparkProperties.get("spark.files")).orNull @@ -198,6 +205,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .orNull numExecutors = Option(numExecutors) .getOrElse(sparkProperties.get("spark.executor.instances").orNull) + isTensorFlow = Option(isTensorFlow) + .orElse(sparkProperties.get("spark.tensorflow.application")) + .orElse(env.get("SPARK_TENSORFLOW_APPLICATION")) + .getOrElse("false") + tensorflowNumPs = Option(tensorflowNumPs) + .orElse(sparkProperties.get("spark.tensorflow.num.ps")) + .orElse(env.get("SPARK_TENSORFLOW_NUM_PS")) + .getOrElse("0") queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull @@ -326,6 +341,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | deployMode $deployMode | executorMemory $executorMemory | executorCores $executorCores + | executorGPUs $executorGPUs | totalExecutorCores $totalExecutorCores | propertiesFile 
$propertiesFile | driverMemory $driverMemory @@ -335,6 +351,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | driverExtraJavaOptions $driverExtraJavaOptions | supervise $supervise | queue $queue + | isTensorFlow $isTensorFlow + | tensorflowNumPs $tensorflowNumPs | numExecutors $numExecutors | files $files | pyFiles $pyFiles @@ -379,6 +397,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S case TOTAL_EXECUTOR_CORES => totalExecutorCores = value + case EXECUTOR_GPUS => + executorGPUs = value + case EXECUTOR_CORES => executorCores = value @@ -423,6 +444,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S case QUEUE => queue = value + case IS_TENSORFLOW => + isTensorFlow = value + + case NUM_TENSORFLOW_PS => + tensorflowNumPs = value + case FILES => files = Utils.resolveURIs(value) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 48d3630abd1f9..a4c0c45576f56 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -235,6 +235,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { var executorId: String = null var hostname: String = null var cores: Int = 0 + var gpus: Int = 0 var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() @@ -254,6 +255,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { case ("--cores") :: value :: tail => cores = value.toInt argv = tail + case ("--gpus") :: value :: tail => + gpus = value.toInt + argv = tail case ("--app-id") :: value :: tail => appId = value argv = tail diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml index 
e8b13cdc2befe..8619bcd5ec449 100644 --- a/external/kafka-0-10-assembly/pom.xml +++ b/external/kafka-0-10-assembly/pom.xml @@ -70,7 +70,7 @@ provided - org.apache.hadoop + io.hops hadoop-client provided diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml index 7d07e18f04a28..cdf305ee7e383 100644 --- a/external/kinesis-asl-assembly/pom.xml +++ b/external/kinesis-asl-assembly/pom.xml @@ -90,7 +90,7 @@ provided - org.apache.hadoop + io.hops hadoop-client provided diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index f4cb94147d9ea..d48b859787fa6 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -59,7 +59,7 @@ test - org.apache.hadoop + io.hops hadoop-client ${hadoop.version} provided @@ -69,13 +69,13 @@ intra-jackson-module version problems. --> - org.apache.hadoop + io.hops hadoop-aws ${hadoop.version} ${hadoop.deps.scope} - org.apache.hadoop + io.hops hadoop-common @@ -105,13 +105,13 @@ - org.apache.hadoop + io.hops hadoop-openstack ${hadoop.version} ${hadoop.deps.scope} - org.apache.hadoop + io.hops hadoop-common @@ -179,13 +179,13 @@ Hadoop WASB client only arrived in Hadoop 2.7 --> - org.apache.hadoop + io.hops hadoop-azure ${hadoop.version} ${hadoop.deps.scope} - org.apache.hadoop + io.hops hadoop-common diff --git a/launcher/pom.xml b/launcher/pom.xml index 22f60efae9afd..b27ca3e89fae8 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -80,7 +80,7 @@ - org.apache.hadoop + io.hops hadoop-client test diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java index f86d40015bd22..b4e9b657465bd 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -63,6 +63,13 @@ public class SparkLauncher extends AbstractLauncher { public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath"; 
/** Configuration key for the number of executor CPU cores. */ public static final String EXECUTOR_CORES = "spark.executor.cores"; + + /** Configuration key for the number of executor GPUs. */ + public static final String EXECUTOR_GPUS = "spark.executor.gpus"; + + static final String IS_TENSORFLOW = "spark.tensorflow.application"; + + static final String NUM_TENSORFLOW_PS = "spark.tensorflow.num.ps"; static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python"; diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index c57af92029460..a7eb3c5f601f0 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -71,11 +71,14 @@ class SparkSubmitOptionParser { // YARN-only options. protected final String ARCHIVES = "--archives"; protected final String EXECUTOR_CORES = "--executor-cores"; + protected final String EXECUTOR_GPUS = "--executor-gpus"; protected final String KEYTAB = "--keytab"; protected final String NUM_EXECUTORS = "--num-executors"; protected final String PRINCIPAL = "--principal"; protected final String QUEUE = "--queue"; - + protected final String IS_TENSORFLOW = "--is-tensorflow"; + protected final String NUM_TENSORFLOW_PS = "--num-tensorflow-ps"; + /** * This is the canonical list of spark-submit options. 
Each entry in the array contains the * different aliases for the same option; the first element of each entry is the "official" @@ -96,6 +99,7 @@ class SparkSubmitOptionParser { { DRIVER_JAVA_OPTIONS }, { DRIVER_LIBRARY_PATH }, { DRIVER_MEMORY }, + { EXECUTOR_GPUS }, { EXECUTOR_CORES }, { EXECUTOR_MEMORY }, { FILES }, @@ -113,6 +117,8 @@ class SparkSubmitOptionParser { { PY_FILES }, { QUEUE }, { REPOSITORIES }, + { IS_TENSORFLOW }, + { NUM_TENSORFLOW_PS }, { STATUS }, { TOTAL_EXECUTOR_CORES }, }; diff --git a/pom.xml b/pom.xml index f0e5ed9c563c8..80fcb57e49ed0 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ spark 1.7.16 1.2.17 - 2.6.5 + 2.8.2.6 2.5.0 ${hadoop.version} 1.6.0 @@ -241,6 +241,17 @@ false + + Hops + Hops Repo + https://bbc1.sics.se/archiva/repository/Hops/ + + true + + + true + + @@ -857,7 +868,7 @@ test - org.apache.hadoop + io.hops hadoop-client ${hadoop.version} ${hadoop.deps.scope} @@ -998,7 +1009,7 @@ ${hadoop.deps.scope} - org.apache.hadoop + io.hops hadoop-yarn-api ${yarn.version} ${hadoop.deps.scope} @@ -1038,7 +1049,7 @@ - org.apache.hadoop + io.hops hadoop-yarn-common ${yarn.version} ${hadoop.deps.scope} @@ -1078,7 +1089,7 @@ - org.apache.hadoop + io.hops hadoop-yarn-server-tests ${yarn.version} tests @@ -1119,7 +1130,7 @@ - org.apache.hadoop + io.hops hadoop-yarn-server-web-proxy ${yarn.version} ${hadoop.deps.scope} @@ -1159,7 +1170,7 @@ - org.apache.hadoop + io.hops hadoop-yarn-client ${yarn.version} ${hadoop.deps.scope} @@ -1689,7 +1700,7 @@ guava - org.apache.hadoop + io.hops hadoop-yarn-server-resourcemanager @@ -1734,17 +1745,25 @@ ${orc.deps.scope} - org.apache.hadoop + io.hops hadoop-common - org.apache.hadoop + io.hops hadoop-hdfs org.apache.hive hive-storage-api + + io.airlift + slice + + + org.apache.hadoop + * + @@ -1755,11 +1774,11 @@ ${orc.deps.scope} - org.apache.hadoop + io.hops hadoop-common - org.apache.hadoop + io.hops hadoop-mapreduce-client-core @@ -1774,6 +1793,10 @@ com.esotericsoftware kryo-shaded + + 
org.apache.hadoop + * + @@ -2688,6 +2711,14 @@ + + hadoop-2.8 + + 2.8.2.6 + 2.7.1 + + + hadoop-3.1 diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml index 2afdc15be4463..aedb133d7ffe1 100644 --- a/resource-managers/yarn/pom.xml +++ b/resource-managers/yarn/pom.xml @@ -58,23 +58,23 @@ test - org.apache.hadoop + io.hops hadoop-yarn-api - org.apache.hadoop + io.hops hadoop-yarn-common - org.apache.hadoop + io.hops hadoop-yarn-server-web-proxy - org.apache.hadoop + io.hops hadoop-yarn-client - org.apache.hadoop + io.hops hadoop-client @@ -126,7 +126,7 @@ - org.apache.hadoop + io.hops hadoop-yarn-server-tests tests test diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8f94e3f731007..30d0c73db0380 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -427,8 +427,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends logInfo { val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt val executorCores = _sparkConf.get(EXECUTOR_CORES) + val executorGPUs = _sparkConf.get(EXECUTOR_GPUS) val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "", - "", executorMemory, executorCores, appId, securityMgr, localResources) + "", executorMemory, executorCores, executorGPUs, appId, securityMgr, localResources) dummyRunner.launchContextDebugInfo() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 49a0b93aa5c40..883000d113760 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -50,6 +50,7 @@ private[yarn] class ExecutorRunnable( hostname: String, executorMemory: Int, executorCores: Int, + executorGPUs: Int, appId: String, securityMgr: SecurityManager, localResources: Map[String, LocalResource]) extends Logging { @@ -206,6 +207,7 @@ private[yarn] class ExecutorRunnable( "--executor-id", executorId, "--hostname", hostname, "--cores", executorCores.toString, + "--gpus", executorGPUs.toString, "--app-id", appId) ++ userClassPath ++ Seq( diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 8a7551de7c088..a233281922f4b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -140,10 +140,16 @@ private[yarn] class YarnAllocator( } // Number of cores per executor. protected val executorCores = sparkConf.get(EXECUTOR_CORES) + // Number of gpus per executor. + protected val executorGPUs = sparkConf.get(EXECUTOR_GPUS) // Resource capability requested for each executors private[yarn] val resource = Resource.newInstance( executorMemory + memoryOverhead + pysparkWorkerMemory, - executorCores) + executorCores, executorGPUs) + + protected val isTensorFlowApplication = sparkConf.get(IS_TENSORFLOW) + + protected var numTensorFlowParamServers = sparkConf.get(NUM_TENSORFLOW_PS) private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS)) @@ -185,8 +191,20 @@ private[yarn] class YarnAllocator( * fulfilled. 
*/ private def getPendingAtLocation(location: String): Seq[ContainerRequest] = { - amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala - .flatMap(_.asScala) + // Either non-TensorFlow executor or TensorFlow PS, neither use GPUs + val nonGPUExecutorResource = Resource.newInstance( + resource.getMemory, resource.getVirtualCores, 0) + var containerRequests = amClient.getMatchingRequests( + RM_REQUEST_PRIORITY, location, nonGPUExecutorResource).asScala + if(isTensorFlowApplication) { + var tfWorkerContainerRequests = amClient.getMatchingRequests( + Priority.newInstance(RM_REQUEST_PRIORITY.getPriority + 1), location, resource).asScala + + containerRequests = containerRequests ++ tfWorkerContainerRequests + + } + containerRequests + containerRequests.flatMap(_.asScala) .toSeq } @@ -290,7 +308,8 @@ private[yarn] class YarnAllocator( if (missing > 0) { logInfo(s"Will request $missing executor container(s), each with " + s"${resource.getVirtualCores} core(s) and " + - s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)") + s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead) and " + + s"${resource.getGPUs} GPU(s)") // Split the pending container request into three groups: locality matched list, locality // unmatched list and non-locality list. 
Take the locality matched container request into @@ -386,7 +405,23 @@ private[yarn] class YarnAllocator( resource: Resource, nodes: Array[String], racks: Array[String]): ContainerRequest = { - new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull) + // Backward compliant, all non-TensorFlow spark jobs ask for containers as usual + if (!isTensorFlowApplication) { + new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull) + } + // Container requests for parameter server + // The first NUM_TENSORFLOW_PS will be containers allocated for parameter server + else if (isTensorFlowApplication && numTensorFlowParamServers > 0) { + numTensorFlowParamServers -= 1 + val psResource = Resource.newInstance(resource.getMemory, resource.getVirtualCores, 0) + new ContainerRequest(psResource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull) + } + // Container requests for worker + // Priority needs to be different from parameter server, otherwise ResourceRequests will overwrite in YARN + else { + new ContainerRequest(resource, nodes, racks, + Priority.newInstance(RM_REQUEST_PRIORITY.getPriority + 1), true, labelExpression.orNull) + } } /** @@ -456,7 +491,7 @@ private[yarn] class YarnAllocator( // memory, but use the asked vcore count for matching, effectively disabling matching on vcore // count. 
val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory, - resource.getVirtualCores) + resource.getVirtualCores, allocatedContainer.getResource.getGPUs) val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location, matchingResource) @@ -510,6 +545,7 @@ private[yarn] class YarnAllocator( executorHostname, executorMemory, executorCores, + executorGPUs, appAttemptId.getApplicationId.toString, securityMgr, localResources diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index ab8273bd6321d..403d6528ab2d2 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -73,6 +73,14 @@ package object config { .stringConf .createWithDefault("default") + private[spark] val IS_TENSORFLOW = ConfigBuilder("spark.tensorflow.application") + .booleanConf + .createWithDefault(false) + + private[spark] val NUM_TENSORFLOW_PS = ConfigBuilder("spark.tensorflow.num.ps") + .intConf + .createWithDefault(0) + private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address") .stringConf .createOptional @@ -230,6 +238,10 @@ package object config { /* Executor configuration. 
*/ + private[spark] val EXECUTOR_GPUS = ConfigBuilder("spark.executor.gpus") + .intConf + .createWithDefault(0) + private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores") .intConf .createWithDefault(1) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnTensorFlowSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnTensorFlowSuite.scala new file mode 100644 index 0000000000000..bbf71f1619a58 --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnTensorFlowSuite.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.deploy.yarn.YarnAllocator._ +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.SplitInfo +import org.apache.spark.util.ManualClock + +class YarnTensorFlowSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { + val conf = new YarnConfiguration() + val sparkConf = new SparkConf() + sparkConf.set("spark.driver.host", "localhost") + sparkConf.set("spark.driver.port", "4040") + sparkConf.set(SPARK_JARS, Seq("notarealjar.jar")) + sparkConf.set("spark.yarn.launchContainers", "false") + + val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0) + + // Resource returned by YARN. YARN can give larger containers than requested, so give 6 cores + // instead of the 5 requested and 3 GB instead of the 2 requested. 
+ val containerResource = Resource.newInstance(3072, 6, 1) + + var rmClient: AMRMClient[ContainerRequest] = _ + + var containerNum = 0 + + override def beforeEach() { + super.beforeEach() + rmClient = AMRMClient.createAMRMClient() + rmClient.init(conf) + rmClient.start() + } + + override def afterEach() { + try { + rmClient.stop() + } finally { + super.afterEach() + } + } + + class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) { + override def hashCode(): Int = 0 + override def equals(other: Any): Boolean = false + } + + def createContainer(host: String): Container = { + // When YARN 2.6+ is required, avoid deprecation by using version with long second arg + val containerId = ContainerId.newInstance(appAttemptId, containerNum) + containerNum += 1 + val nodeId = NodeId.newInstance(host, 1000) + Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null) + } + + def createAllocator( + maxExecutors: Int = 2, + rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = { + val args = Array( + "--jar", "somejar.jar", + "--class", "SomeClass") + val sparkConfClone = sparkConf.clone() + sparkConfClone + .set("spark.dynamicAllocation.enabled", "false") + .set("spark.executor.instances", maxExecutors.toString) + .set("spark.executor.cores", "5") + .set("spark.executor.memory", "2048") + .set("spark.executor.gpus", "1") + .set("spark.tensorflow.application", "true") + .set("spark.tensorflow.num.ps", "1") + new YarnAllocator( + "not used", + mock(classOf[RpcEndpointRef]), + conf, + sparkConfClone, + rmClient, + appAttemptId, + new SecurityManager(sparkConf), + Map(), + new MockResolver()) + } + + def createPSContainer(host: String): Container = { + // When YARN 2.6+ is required, avoid deprecation by using version with long second arg + val containerId = ContainerId.newInstance(appAttemptId, containerNum) + containerNum += 1 + val nodeId = NodeId.newInstance(host, 1000) + val psRes = 
Resource.newInstance(3072, 6, 0) + Container.newInstance(containerId, nodeId, "", psRes, RM_REQUEST_PRIORITY, null) + } + + def createWorkerContainer(host: String): Container = { + // When YARN 2.6+ is required, avoid deprecation by using version with long second arg + val containerId = ContainerId.newInstance(appAttemptId, containerNum) + containerNum += 1 + val nodeId = NodeId.newInstance(host, 1000) + Container.newInstance(containerId, nodeId, "", containerResource, + Priority.newInstance(RM_REQUEST_PRIORITY.getPriority + 1), null) + } + + test("ps allocated") { + // request a single container and receive it + val handler = createAllocator(1) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getPendingAllocate.size should be (1) + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container)) + + handler.getNumExecutorsRunning should be (1) + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) + + val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size + size should be (0) + } + + test("ps and worker allocated") { + // request a few containers and receive some of them + val handler = createAllocator(2) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getPendingAllocate.size should be (2) + + val container1 = createPSContainer("host1") + val container2 = createWorkerContainer("host2") + + handler.handleAllocatedContainers(Array(container1, container2)) + + handler.getNumExecutorsRunning should be (2) + handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1") + handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2") + handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId) + 
handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId) + } +} diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index b522bfeac1300..18c21c521afc9 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -39,11 +39,13 @@ com.twitter parquet-hadoop-bundle + provided org.apache.spark spark-core_${scala.binary.version} ${project.version} + provided org.apache.spark @@ -97,6 +99,12 @@ ${hive.group} hive-exec + + + com.twitter + parquet-hadoop-bundle + + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 6a90c44a2633d..2f17a2e090723 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -65,13 +65,14 @@ private[hive] object IsolatedClientLoader extends Logging { case e: RuntimeException if e.getMessage.contains("hadoop") => // If the error message contains hadoop, it is probably because the hadoop // version cannot be resolved. + val fallbackVersion = "2.8.2.6" logWarning(s"Failed to resolve Hadoop artifacts for the version $hadoopVersion. " + s"We will change the hadoop version from $hadoopVersion to 2.6.0 and try again. " + "Hadoop classes will not be shared between Spark and Hive metastore client. 
" + "It is recommended to set jars used by Hive metastore client through " + "spark.sql.hive.metastore.jars in the production environment.") _sharesHadoopClasses = false - (downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5") + (downloadVersion(resolvedVersion, fallbackVersion, ivyPath), fallbackVersion) } resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles) resolvedVersions((resolvedVersion, actualHadoopVersion)) @@ -109,13 +110,14 @@ private[hive] object IsolatedClientLoader extends Logging { Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++ Seq("com.google.guava:guava:14.0.1", - s"org.apache.hadoop:hadoop-client:$hadoopVersion") + s"io.hops:hadoop-client:$hadoopVersion") val classpath = quietly { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), SparkSubmitUtils.buildIvySettings( - Some("http://www.datanucleus.org/downloads/maven2"), + Some("https://bbc1.sics.se/archiva/repository/Hops/," + + "http://www.datanucleus.org/downloads/maven2"), ivyPath), exclusions = version.exclusions) }