diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index ec29ede89cfb1..d99d35a17209b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -423,7 +423,7 @@ private[spark] object Config extends Logging { val KUBERNETES_SECRET_FILE_MOUNT_PATH = ConfigBuilder("spark.kubernetes.file.secretMount.path") - .doc("When mounting files as secret, they're made available on drivers at this path..") + .doc("When mounting files as secret, they're made available on drivers at this path.") .internal() .stringConf .createWithDefault("/var/data/spark-submitted-files") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLocalDriverFilesFeatureStep.scala similarity index 75% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStep.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLocalDriverFilesFeatureStep.scala index cd15a72a38274..c3fca0ac324df 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLocalDriverFilesFeatureStep.scala @@ -31,38 +31,37 @@ import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, Kubern import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, PythonMainAppResource, RMainAppResource} +import org.apache.spark.internal.config.FILES import org.apache.spark.util.Utils +/** + * Mount local files listed in `spark.files` into a volume on the driver. + * + * The volume is populated using a secret which in turn is populated with the base64-encoded + * file contents. The volume is only mounted into drivers, not executors. That's because drivers + * can make `spark.files` available to executors using [[org.apache.spark.SparkContext.addFile]]. + * + * This is a Palantir addition that works well for the small files we tend to add in `spark.files`. + * Spark's out-of-the-box solution is in [[BasicDriverFeatureStep]] and serves local files by + * uploading them to an HCFS and serving them from there. + */ private[spark] class MountLocalDriverFilesFeatureStep(conf: KubernetesDriverConf) - extends MountLocalFilesFeatureStep(conf) { + extends KubernetesFeatureConfigStep { + + private val enabled = conf.get(KUBERNETES_SECRET_FILE_MOUNT_ENABLED) - val allFiles: Seq[String] = { - Utils.stringToSeq(conf.sparkConf.get("spark.files", "")) ++ + private val mountPath = conf.get(KUBERNETES_SECRET_FILE_MOUNT_PATH) + + private val secretName = s"${conf.resourceNamePrefix}-mounted-files" + + def allFiles: Seq[String] = { + Utils.stringToSeq(conf.sparkConf.get(FILES.key, "")) ++ (conf.mainAppResource match { case JavaMainAppResource(_) => Nil case PythonMainAppResource(res) => Seq(res) case RMainAppResource(res) => Seq(res) }) } -} - -private[spark] class MountLocalExecutorFilesFeatureStep(conf: KubernetesConf) - extends MountLocalFilesFeatureStep(conf) { - - val allFiles: Seq[String] = Nil -} - -private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf) - extends KubernetesFeatureConfigStep { - - private val enabled = conf.get(KUBERNETES_SECRET_FILE_MOUNT_ENABLED) - - // Secret name needs to be the same for drivers and executors because both will have a volume - // populated by the secret, but Spark's k8s client will only store the secret configured on the - // driver. If the secret names don't match, executors will fail to mount the volume. - private val secretName = s"${secretNamePrefix()}-mounted-files" - - private val mountPath = conf.get(KUBERNETES_SECRET_FILE_MOUNT_PATH) override def configurePod(pod: SparkPod): SparkPod = { if (!enabled) return pod @@ -93,7 +92,7 @@ private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf) override def getAdditionalPodSystemProperties(): Map[String, String] = { if (!enabled) return Map.empty - val resolvedFiles = allFiles() + val resolvedFiles = allFiles .map(file => { val uri = Utils.resolveURI(file) if (shouldMountFile(uri)) { @@ -103,13 +102,13 @@ private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf) file } }) - Map("spark.files" -> resolvedFiles.mkString(",")) + Map(FILES.key -> resolvedFiles.mkString(",")) } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { if (!enabled) return Nil - val localFiles = allFiles() + val localFiles = allFiles .map(Utils.resolveURI) .filter(shouldMountFile) .map(_.getPath) @@ -127,8 +126,6 @@ private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf) Seq(localFilesSecret) } - def allFiles(): Seq[String] - private def shouldMountFile(file: URI): Boolean = { Option(file.getScheme) match { case Some("file") => true @@ -136,15 +133,4 @@ private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf) case _ => false } } - - // Like KubernetesConf#getResourceNamePrefix but unique per app, not per resource. - private def secretNamePrefix(): String = { - s"${conf.appName}" - .trim - .toLowerCase(Locale.ROOT) - .replaceAll("\\s+", "-") - .replaceAll("\\.", "-") - .replaceAll("[^a-z0-9\\-]", "") - .replaceAll("-+", "-") - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 36be4b5095d54..22bff2c807330 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -45,8 +45,7 @@ private[spark] class KubernetesExecutorBuilder { new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), - new LocalDirsFeatureStep(conf), - new MountLocalExecutorFilesFeatureStep(conf)) + new LocalDirsFeatureStep(conf)) features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLocalDriverFilesFeatureStepSuite.scala similarity index 92% rename from resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStepSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLocalDriverFilesFeatureStepSuite.scala index 0f8edb6af5f02..70bbf8a30b2e4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLocalFilesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLocalDriverFilesFeatureStepSuite.scala @@ -32,12 +32,12 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.util.Utils -class MountLocalFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { +class MountLocalDriverFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private var kubernetesConf: KubernetesDriverConf = _ private var sparkFiles: Seq[String] = _ private var localFiles: Seq[File] = _ - private var stepUnderTest: MountLocalFilesFeatureStep = _ + private var stepUnderTest: MountLocalDriverFilesFeatureStep = _ before { val tempDir = Utils.createTempDir() @@ -70,7 +70,7 @@ class MountLocalFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter assert(configuredPod.pod.getSpec.getVolumes.size === 1) val volume = configuredPod.pod.getSpec.getVolumes.get(0) assert(volume.getName === "submitted-files") - assert(volume.getSecret.getSecretName === "test-app-mounted-files") + assert(volume.getSecret.getSecretName === s"${kubernetesConf.resourceNamePrefix}-mounted-files") assert(configuredPod.container.getVolumeMounts.size === 1) val volumeMount = configuredPod.container.getVolumeMounts.get(0) assert(volumeMount.getName === "submitted-files") @@ -99,7 +99,7 @@ class MountLocalFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter assert(secrets.size === 1) assert(secrets.head.isInstanceOf[Secret]) val secret = secrets.head.asInstanceOf[Secret] - assert(secret.getMetadata.getName === s"test-app-mounted-files") + assert(secret.getMetadata.getName === s"${kubernetesConf.resourceNamePrefix}-mounted-files") val secretData = secret.getData.asScala assert(secretData.size === 2) assert(decodeToUtf8(secretData("file1.txt")) === "a")