Don't mount local files using secrets into executors
rshkv committed Feb 23, 2021
1 parent b040c4e commit e2d8784
Showing 4 changed files with 30 additions and 45 deletions.
@@ -423,7 +423,7 @@ private[spark] object Config extends Logging {

val KUBERNETES_SECRET_FILE_MOUNT_PATH =
ConfigBuilder("spark.kubernetes.file.secretMount.path")
.doc("When mounting files as secret, they're made available on drivers at this path..")
.doc("When mounting files as secret, they're made available on drivers at this path.")
.internal()
.stringConf
.createWithDefault("/var/data/spark-submitted-files")
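
For context, here is a minimal sketch (not part of this commit) of how a caller might supply files that end up under this mount path. The file paths are illustrative assumptions, and the mount-path setting is internal, so it would normally be left at the default shown above.

import org.apache.spark.SparkConf

// Hypothetical caller-side configuration; paths are made up for illustration.
val conf = new SparkConf()
  .set("spark.files", "/local/conf/app.yml,/local/conf/cert.pem")
  // Internal setting declared above; shown here only to make the default explicit.
  .set("spark.kubernetes.file.secretMount.path", "/var/data/spark-submitted-files")

With this configuration, a file listed as /local/conf/app.yml would be expected to appear on the driver at /var/data/spark-submitted-files/app.yml.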
@@ -31,38 +31,37 @@ import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, Kubern
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, PythonMainAppResource, RMainAppResource}
import org.apache.spark.internal.config.FILES
import org.apache.spark.util.Utils

/**
* Mount local files listed in `spark.files` into a volume on the driver.
*
* The volume is populated using a secret which in turn is populated with the base64-encoded
* file contents. The volume is only mounted into drivers, not executors. That's because drivers
* can make `spark.files` available to executors using [[org.apache.spark.SparkContext.addFile]].
*
* This is a Palantir addition that works well for the small files we tend to add in `spark.files`.
* Spark's out-of-the-box solution is in [[BasicDriverFeatureStep]] and serves local files by
* uploading them to an HCFS and serving them from there.
*/
private[spark] class MountLocalDriverFilesFeatureStep(conf: KubernetesDriverConf)
extends MountLocalFilesFeatureStep(conf) {
extends KubernetesFeatureConfigStep {

private val enabled = conf.get(KUBERNETES_SECRET_FILE_MOUNT_ENABLED)

val allFiles: Seq[String] = {
Utils.stringToSeq(conf.sparkConf.get("spark.files", "")) ++
private val mountPath = conf.get(KUBERNETES_SECRET_FILE_MOUNT_PATH)

private val secretName = s"${conf.resourceNamePrefix}-mounted-files"

def allFiles: Seq[String] = {
Utils.stringToSeq(conf.sparkConf.get(FILES.key, "")) ++
(conf.mainAppResource match {
case JavaMainAppResource(_) => Nil
case PythonMainAppResource(res) => Seq(res)
case RMainAppResource(res) => Seq(res)
})
}
}

private[spark] class MountLocalExecutorFilesFeatureStep(conf: KubernetesConf)
extends MountLocalFilesFeatureStep(conf) {

val allFiles: Seq[String] = Nil
}

private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {

private val enabled = conf.get(KUBERNETES_SECRET_FILE_MOUNT_ENABLED)

// Secret name needs to be the same for drivers and executors because both will have a volume
// populated by the secret, but Spark's k8s client will only store the secret configured on the
// driver. If the secret names don't match, executors will fail to mount the volume.
private val secretName = s"${secretNamePrefix()}-mounted-files"

private val mountPath = conf.get(KUBERNETES_SECRET_FILE_MOUNT_PATH)

override def configurePod(pod: SparkPod): SparkPod = {
if (!enabled) return pod
@@ -93,7 +92,7 @@ private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf)
override def getAdditionalPodSystemProperties(): Map[String, String] = {
if (!enabled) return Map.empty

val resolvedFiles = allFiles()
val resolvedFiles = allFiles
.map(file => {
val uri = Utils.resolveURI(file)
if (shouldMountFile(uri)) {
@@ -103,13 +102,13 @@ private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf)
file
}
})
Map("spark.files" -> resolvedFiles.mkString(","))
Map(FILES.key -> resolvedFiles.mkString(","))
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (!enabled) return Nil

val localFiles = allFiles()
val localFiles = allFiles
.map(Utils.resolveURI)
.filter(shouldMountFile)
.map(_.getPath)
@@ -127,24 +126,11 @@ private[spark] abstract class MountLocalFilesFeatureStep(conf: KubernetesConf)
Seq(localFilesSecret)
}

def allFiles(): Seq[String]

private def shouldMountFile(file: URI): Boolean = {
Option(file.getScheme) match {
case Some("file") => true
case None => true
case _ => false
}
}

// Like KubernetesConf#getResourceNamePrefix but unique per app, not per resource.
private def secretNamePrefix(): String = {
s"${conf.appName}"
.trim
.toLowerCase(Locale.ROOT)
.replaceAll("\\s+", "-")
.replaceAll("\\.", "-")
.replaceAll("[^a-z0-9\\-]", "")
.replaceAll("-+", "-")
}
}
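
As background for the class comment above, the following is a minimal, hedged sketch of how a Kubernetes secret carrying base64-encoded file contents can be assembled with the fabric8 client. The helper name and wiring are assumptions for illustration, not code from this commit.

import java.nio.file.{Files, Paths}
import java.util.Base64

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}

// Hypothetical helper: map each local file name to its base64-encoded bytes
// and wrap the result in a named secret.
def buildMountedFilesSecret(secretName: String, localPaths: Seq[String]): Secret = {
  val data = localPaths.map { path =>
    val fileName = Paths.get(path).getFileName.toString
    fileName -> Base64.getEncoder.encodeToString(Files.readAllBytes(Paths.get(path)))
  }.toMap

  new SecretBuilder()
    .withNewMetadata()
      .withName(secretName)
      .endMetadata()
    .withData(data.asJava)
    .build()
}

Kubernetes decodes secret data when projecting it into a volume, so the files show up as plain content under the configured mount path.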
@@ -45,8 +45,7 @@ private[spark] class KubernetesExecutorBuilder {
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountLocalExecutorFilesFeatureStep(conf))
new LocalDirsFeatureStep(conf))

features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
}
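
With the executor-side step dropped here, executors rely on Spark's standard file-distribution path described in the class comment above. A minimal sketch of that mechanism, with illustrative file names, is:

import org.apache.spark.{SparkContext, SparkFiles}

// Driver side: register a file so Spark ships it to executors on demand.
def distribute(sc: SparkContext): Unit =
  sc.addFile("/var/data/spark-submitted-files/app.yml") // illustrative path

// Executor side (e.g. inside a task): look up the local copy by file name.
def readOnExecutor(): String = {
  val source = scala.io.Source.fromFile(SparkFiles.get("app.yml"))
  try source.mkString finally source.close()
}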
@@ -32,12 +32,12 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.Utils

class MountLocalFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
class MountLocalDriverFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {

private var kubernetesConf: KubernetesDriverConf = _
private var sparkFiles: Seq[String] = _
private var localFiles: Seq[File] = _
private var stepUnderTest: MountLocalFilesFeatureStep = _
private var stepUnderTest: MountLocalDriverFilesFeatureStep = _

before {
val tempDir = Utils.createTempDir()
@@ -70,7 +70,7 @@ class MountLocalFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
val volume = configuredPod.pod.getSpec.getVolumes.get(0)
assert(volume.getName === "submitted-files")
assert(volume.getSecret.getSecretName === "test-app-mounted-files")
assert(volume.getSecret.getSecretName === s"${kubernetesConf.resourceNamePrefix}-mounted-files")
assert(configuredPod.container.getVolumeMounts.size === 1)
val volumeMount = configuredPod.container.getVolumeMounts.get(0)
assert(volumeMount.getName === "submitted-files")
@@ -99,7 +99,7 @@ class MountLocalFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter
assert(secrets.size === 1)
assert(secrets.head.isInstanceOf[Secret])
val secret = secrets.head.asInstanceOf[Secret]
assert(secret.getMetadata.getName === s"test-app-mounted-files")
assert(secret.getMetadata.getName === s"${kubernetesConf.resourceNamePrefix}-mounted-files")
val secretData = secret.getData.asScala
assert(secretData.size === 2)
assert(decodeToUtf8(secretData("file1.txt")) === "a")
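
The decodeToUtf8 helper asserted on above is presumably defined in a collapsed part of this suite; a plausible sketch of its shape (an assumption, not the commit's code) is:

import java.nio.charset.StandardCharsets
import java.util.Base64

// Assumed helper: reverse the base64 encoding applied when the secret data was
// built, recovering the original file content for the assertions above.
private def decodeToUtf8(encoded: String): String =
  new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8)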
