Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Pass-through envOverrides #451

Merged
merged 13 commits into from
Aug 16, 2024
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ All notable changes to this project will be documented in this file.
- `volumes`
- `volumeMounts`

### Fixed

- Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]).

[#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450
[#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451

## [24.7.0] - 2024-07-24

Expand Down
1 change: 1 addition & 0 deletions rust/crd/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@ pub const SPARK_DEFAULTS_FILE_NAME: &str = "spark-defaults.conf";

pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole";
pub const SPARK_UID: i64 = 1000;
adwk67 marked this conversation as resolved.
Show resolved Hide resolved
pub const METRICS_PORT: u16 = 18081;
164 changes: 164 additions & 0 deletions rust/crd/src/history.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::s3logdir::S3LogDir;
use crate::tlscerts;
use crate::{affinity::history_affinity, constants::*};

use product_config::{types::PropertyNameKind, ProductConfigManager};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_operator::k8s_openapi::api::core::v1::EnvVar;
use stackable_operator::role_utils::RoleGroup;
use stackable_operator::{
commons::{
Expand Down Expand Up @@ -232,6 +235,96 @@ impl SparkHistoryServer {
)
.context(InvalidProductConfigSnafu)
}

pub fn merged_env(
&self,
s3logdir: &S3LogDir,
role_group_env_overrides: HashMap<String, String>,
) -> Vec<EnvVar> {
// Maps env var name to env var object. This allows env_overrides to work
// as expected (i.e. users can override the env var value).
let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();
let role_env_overrides = &self.role().config.env_overrides;

// This env var prevents the history server from detaching itself from the
// start script because this leads to the Pod terminating immediately.
vars.insert(
"SPARK_NO_DAEMONIZE".to_string(),
EnvVar {
name: "SPARK_NO_DAEMONIZE".to_string(),
value: Some("true".into()),
value_from: None,
},
);
vars.insert(
"SPARK_DAEMON_CLASSPATH".to_string(),
EnvVar {
name: "SPARK_DAEMON_CLASSPATH".to_string(),
value: Some("/stackable/spark/extra-jars/*".into()),
value_from: None,
},
);

let mut history_opts = vec![
format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
format!(
"-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
),
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
];
adwk67 marked this conversation as resolved.
Show resolved Hide resolved

// if TLS is enabled build truststore
if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
history_opts.extend(vec![
format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
]);
}

vars.insert(
"SPARK_HISTORY_OPTS".to_string(),
EnvVar {
name: "SPARK_HISTORY_OPTS".to_string(),
value: Some(history_opts.join(" ")),
value_from: None,
},
);

// apply the role overrides
let mut role_envs = role_env_overrides.iter().map(|(env_name, env_value)| {
(
env_name.clone(),
EnvVar {
name: env_name.clone(),
value: Some(env_value.to_owned()),
value_from: None,
},
)
});

vars.extend(&mut role_envs);

// apply the role-group overrides
let mut role_group_envs =
role_group_env_overrides
.into_iter()
.map(|(env_name, env_value)| {
(
env_name.clone(),
EnvVar {
name: env_name.clone(),
value: Some(env_value),
value_from: None,
},
)
});

vars.extend(&mut role_group_envs);

// convert to Vec
vars.into_values().collect()
}
}

#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize, Display)]
Expand Down Expand Up @@ -363,3 +456,74 @@ impl Configuration for HistoryConfigFragment {
Ok(BTreeMap::new())
}
}

#[cfg(test)]
mod test {
use super::*;
use indoc::indoc;
use stackable_operator::commons::s3::InlinedS3BucketSpec;

#[test]
pub fn test_env_overrides() {
    // Fixture: the same env var is overridden at role level ("ROLE") and at
    // role-group level ("ROLEGROUP"); the role-group value must win.
    let manifest = indoc! {r#"
        ---
        apiVersion: spark.stackable.tech/v1alpha1
        kind: SparkHistoryServer
        metadata:
          name: spark-history
        spec:
          image:
            productVersion: 3.5.1
          logFileDirectory:
            s3:
              prefix: eventlogs/
              bucket:
                reference: spark-history-s3-bucket
          nodes:
            envOverrides:
              TEST_SPARK_HIST_VAR: ROLE
            roleGroups:
              default:
                replicas: 1
                config:
                  cleaner: true
                envOverrides:
                  TEST_SPARK_HIST_VAR: ROLEGROUP
        "#};

    let history: SparkHistoryServer = serde_yaml::with::singleton_map_recursive::deserialize(
        serde_yaml::Deserializer::from_str(manifest),
    )
    .unwrap();

    // Minimal log dir; no bucket connection means no TLS-related opts.
    let log_dir = S3LogDir {
        bucket: InlinedS3BucketSpec {
            bucket_name: None,
            connection: None,
        },
        prefix: "prefix".to_string(),
    };

    let rolegroup_overrides = history
        .spec
        .nodes
        .role_groups
        .get("default")
        .unwrap()
        .config
        .env_overrides
        .clone();

    let env_map: BTreeMap<&str, Option<String>> = history
        .merged_env(&log_dir, rolegroup_overrides)
        .iter()
        .map(|env_var| (env_var.name.as_str(), env_var.value.clone()))
        .collect();

    // The role-group override shadows the role-level one.
    assert_eq!(
        env_map.get("TEST_SPARK_HIST_VAR"),
        Some(&Some("ROLEGROUP".to_string()))
    );
}
}
31 changes: 31 additions & 0 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,37 @@ impl SparkApplication {
}
}

/// Merges the given base environment variables with the `envOverrides` of
/// the requested role; overrides win on name collisions. The result is
/// ordered by variable name.
pub fn merged_env(&self, role: SparkApplicationRole, env: &[EnvVar]) -> Vec<EnvVar> {
    // Start from the base vars, keyed by name, so overrides can replace
    // existing entries in place.
    let mut merged: BTreeMap<&String, EnvVar> = BTreeMap::new();
    for env_var in env {
        merged.insert(&env_var.name, env_var.clone());
    }

    // Pick the envOverrides belonging to the requested role, if that role
    // section is present in the spec at all.
    let role_envs = match role {
        SparkApplicationRole::Submit => self.spec.job.as_ref().map(|j| &j.env_overrides),
        SparkApplicationRole::Driver => self.spec.driver.as_ref().map(|d| &d.env_overrides),
        SparkApplicationRole::Executor => {
            self.spec.executor.as_ref().map(|e| &e.config.env_overrides)
        }
    };

    // Layer the role-specific overrides on top of the base vars.
    for (k, v) in role_envs.into_iter().flatten() {
        merged.insert(
            k,
            EnvVar {
                name: k.clone(),
                value: Some(v.clone()),
                ..Default::default()
            },
        );
    }

    merged.into_values().collect()
}

pub fn validated_role_config(
&self,
resolved_product_image: &ResolvedProductImage,
Expand Down
3 changes: 2 additions & 1 deletion rust/crd/src/s3logdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ impl S3LogDir {
/// * spark.hadoop.fs.s3a.aws.credentials.provider
/// * spark.hadoop.fs.s3a.access.key
/// * spark.hadoop.fs.s3a.secret.key
/// instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
///
/// Instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
/// on the container start command.
pub fn history_server_spark_config(&self) -> BTreeMap<String, String> {
let mut result = BTreeMap::new();
Expand Down
68 changes: 14 additions & 54 deletions rust/operator-binary/src/history/history_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use stackable_operator::{
api::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
ConfigMap, EnvVar, PodSecurityContext, Service, ServiceAccount, ServicePort,
ServiceSpec,
ConfigMap, PodSecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec,
},
rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject},
},
Expand All @@ -36,14 +35,15 @@ use stackable_operator::{
role_utils::RoleGroupRef,
time::Duration,
};
use stackable_spark_k8s_crd::constants::METRICS_PORT;
use stackable_spark_k8s_crd::{
constants::{
ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME,
JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME,
SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME,
SPARK_UID, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG,
VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG,
VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME, SECRET_ACCESS_KEY,
SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID,
STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG,
VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG,
VOLUME_MOUNT_PATH_LOG_CONFIG,
},
history,
history::{HistoryConfig, SparkHistoryServer, SparkHistoryServerContainer},
Expand All @@ -59,8 +59,6 @@ use stackable_operator::k8s_openapi::DeepMerge;
use stackable_operator::logging::controller::ReconcilerError;
use strum::{EnumDiscriminants, IntoStaticStr};

const METRICS_PORT: u16 = 18081;

#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -440,6 +438,12 @@ fn build_stateful_set(
..PodSecurityContext::default()
});

let role_group = shs
.rolegroup(rolegroupref)
.with_context(|_| CannotRetrieveRoleGroupSnafu)?;

let merged_env = shs.merged_env(s3_log_dir, role_group.config.env_overrides);

let container_name = "spark-history";
let container = ContainerBuilder::new(container_name)
.context(InvalidContainerNameSnafu)?
Expand All @@ -449,7 +453,7 @@ fn build_stateful_set(
.args(command_args(s3_log_dir))
.add_container_port("http", 18080)
.add_container_port("metrics", METRICS_PORT.into())
.add_env_vars(env_vars(s3_log_dir))
.add_env_vars(merged_env)
.add_volume_mounts(s3_log_dir.volume_mounts())
.add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
.add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG)
Expand Down Expand Up @@ -477,10 +481,6 @@ fn build_stateful_set(

let mut pod_template = pb.build_template();
pod_template.merge_from(shs.role().config.pod_overrides.clone());
let role_group = shs
.rolegroup(rolegroupref)
.with_context(|_| CannotRetrieveRoleGroupSnafu)?;

pod_template.merge_from(role_group.config.pod_overrides);

Ok(StatefulSet {
Expand Down Expand Up @@ -670,46 +670,6 @@ fn command_args(s3logdir: &S3LogDir) -> Vec<String> {
vec![String::from("-c"), command.join(" && ")]
}

/// Builds the fixed set of environment variables for the history server
/// container: SPARK_NO_DAEMONIZE, SPARK_DAEMON_CLASSPATH and
/// SPARK_HISTORY_OPTS (log4j config, security properties, JMX agent and —
/// when the S3 log bucket uses TLS — truststore settings).
fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
    let mut vars: Vec<EnvVar> = vec![];

    // This env var prevents the history server from detaching itself from the
    // start script because this leads to the Pod terminating immediately.
    vars.push(EnvVar {
        name: "SPARK_NO_DAEMONIZE".to_string(),
        value: Some("true".into()),
        value_from: None,
    });
    vars.push(EnvVar {
        name: "SPARK_DAEMON_CLASSPATH".to_string(),
        value: Some("/stackable/spark/extra-jars/*".into()),
        value_from: None,
    });

    let mut history_opts = vec![
        format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
        format!(
            "-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
        ),
        format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
    ];
    // If TLS is enabled for the S3 log bucket, add the truststore settings.
    if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
        history_opts.extend(vec![
            format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
            format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
            format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
        ]);
    }

    vars.push(EnvVar {
        name: "SPARK_HISTORY_OPTS".to_string(),
        value: Some(history_opts.join(" ")),
        value_from: None,
    });
    vars
}

fn labels<'a, T>(
shs: &'a T,
app_version_label: &'a str,
Expand Down
6 changes: 4 additions & 2 deletions rust/operator-binary/src/spark_k8s_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,10 @@ fn pod_template(
) -> Result<PodTemplateSpec> {
let container_name = SparkContainer::Spark.to_string();
let mut cb = ContainerBuilder::new(&container_name).context(IllegalContainerNameSnafu)?;
let merged_env = spark_application.merged_env(role.clone(), env);

cb.add_volume_mounts(config.volume_mounts(spark_application, s3conn, s3logdir))
.add_env_vars(env.to_vec())
.add_env_vars(merged_env)
.resources(config.resources.clone().into())
.image_from_product_image(spark_image);

Expand Down Expand Up @@ -716,13 +717,14 @@ fn spark_job(
.context(IllegalContainerNameSnafu)?;

let args = [job_commands.join(" ")];
let merged_env = spark_application.merged_env(SparkApplicationRole::Submit, env);

cb.image_from_product_image(spark_image)
.command(vec!["/bin/bash".to_string(), "-c".to_string()])
.args(vec![args.join(" && ")])
.resources(job_config.resources.clone().into())
.add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, s3logdir))
.add_env_vars(env.to_vec())
.add_env_vars(merged_env)
.add_env_var(
"SPARK_SUBMIT_OPTS",
format!(
Expand Down
Loading
Loading