From 70975b258700f12cdb4e9352180be2f4213a6a08 Mon Sep 17 00:00:00 2001 From: Barak Ben-Nathan Date: Sun, 18 Feb 2024 14:46:11 +0200 Subject: [PATCH] [FLINK-34429][flink-kubernetes] Setting annotations on internal service. [FLINK-34429][docs] Updating configuration to include kubernetes.internal-service.annotations key. [FLINK-34429][flink-kubernetes] Kubernetes side parameters utility to retrieve INTERNAL_SERVICE_ANNOTATIONS from configuration [FLINK-34429][flink-kubernetes] Test for pulling Internal-Service Annotations parameter (KubernetesJobManagerParameters) [FLINK-34429][flink-kubernetes] Testing that internal-service is created with configured annotations --- .../kubernetes_config_configuration.html | 6 ++++++ .../configuration/KubernetesConfigOptions.java | 8 ++++++++ .../KubernetesJobManagerParameters.java | 6 ++++++ .../services/HeadlessClusterIPService.java | 1 + .../kubeclient/KubernetesJobManagerTestBase.java | 1 + .../decorators/InternalServiceDecoratorTest.java | 2 ++ .../KubernetesJobManagerParametersTest.java | 16 +++++++++++++++- 7 files changed, 39 insertions(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index bb45be260aa35..49c5e96cec7d3 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -110,6 +110,12 @@ Boolean Whether to enable HostNetwork mode. The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service. + +
kubernetes.internal-service.annotations
+ (none) + Map + The user-specified annotations that are set to the internal Service. The value should be in the form of a1:v1,a2:v2 +
kubernetes.jobmanager.annotations
(none) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 1e2e6115fc3c3..a2e2e7c9b054e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -377,6 +377,14 @@ public class KubernetesConfigOptions { "The user-specified annotations that are set to the rest Service. The value should be " + "in the form of a1:v1,a2:v2"); + public static final ConfigOption> INTERNAL_SERVICE_ANNOTATIONS = + key("kubernetes.internal-service.annotations") + .mapType() + .noDefaultValue() + .withDescription( + "The user-specified annotations that are set to the internal Service. The value should be " + + "in the form of a1:v1,a2:v2"); + /** * Defines the configuration key of that external resource in Kubernetes. This is used as a * suffix in an actual config. diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index 845f930d18dc2..8fc09caa58adc 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -118,6 +118,12 @@ public Map getRestServiceAnnotations() { .orElse(Collections.emptyMap()); } + public Map getInternalServiceAnnotations() { + return flinkConfig + .getOptional(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS) + .orElse(Collections.emptyMap()); + } + public int getJobManagerMemoryMB() { return clusterSpecification.getMasterMemoryMB(); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java index ef932ff03da76..675ffb87e1c19 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java @@ -50,6 +50,7 @@ public Service buildUpInternalService( .withNewMetadata() .withName(serviceName) .withLabels(kubernetesJobManagerParameters.getCommonLabels()) + .withAnnotations(kubernetesJobManagerParameters.getInternalServiceAnnotations()) .endMetadata() .withNewSpec() .withClusterIP(HEADLESS_CLUSTER_IP) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java index f0cec0b391d42..eba124773fcef 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java @@ -63,6 +63,7 @@ protected void setupFlinkConfig() { ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + k, v)); this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels); this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, userAnnotations); + this.flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, userAnnotations); this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector); this.flinkConfig.set( JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY)); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java index 46aca8813305c..92f91648290d2 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java @@ -69,6 +69,8 @@ void testBuildAccompanyingKubernetesResources() throws IOException { final Map expectedLabels = getCommonLabels(); assertThat(internalService.getMetadata().getLabels()).isEqualTo(expectedLabels); + assertThat(internalService.getMetadata().getAnnotations()).isEqualTo(userAnnotations); + assertThat(internalService.getSpec().getType()).isNull(); assertThat(internalService.getSpec().getClusterIP()).isEqualTo("None"); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java index 67a6fd51fbfbf..8883d0b94ee1b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java @@ -96,7 +96,7 @@ void testGetJobManagerAnnotations() { } @Test - void testGetServiceAnnotations() { + void testGetRestServiceAnnotations() { final Map expectedAnnotations = new HashMap<>(); expectedAnnotations.put("a1", "v1"); expectedAnnotations.put("a2", "v2"); @@ -109,6 +109,20 @@ void testGetServiceAnnotations() { assertThat(resultAnnotations).isEqualTo(expectedAnnotations); } + @Test + void testGetInternalServiceAnnotations() { + final Map expectedAnnotations = new HashMap<>(); + expectedAnnotations.put("a1", "v1"); + expectedAnnotations.put("a2", "v2"); + + flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, expectedAnnotations); + + final Map resultAnnotations = + kubernetesJobManagerParameters.getInternalServiceAnnotations(); + + assertThat(resultAnnotations).isEqualTo(expectedAnnotations); + } + @Test void testGetJobManagerMemoryMB() { assertThat(kubernetesJobManagerParameters.getJobManagerMemoryMB())