Skip to content

Commit

Permalink
[FLINK-34429][flink-kubernetes] Setting annotations on internal service.
Browse files Browse the repository at this point in the history
[FLINK-34429][docs] Updating configuration to include kubernetes.internal-service.annotations key.

[FLINK-34429][flink-kubernetes] Kubernetes side parameters utility to retrieve INTERNAL_SERVICE_ANNOTATIONS from configuration

[FLINK-34429][flink-kubernetes] Test for pulling Internal-Service Annotations parameter (KubernetesJobManagerParameters)

[FLINK-34429][flink-kubernetes] Testing that internal-service is created with configured annotations
  • Loading branch information
barakbn authored and dannycranmer committed Mar 6, 2024
1 parent a9d9bab commit 70975b2
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<td>Boolean</td>
<td>Whether to enable HostNetwork mode. The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service.</td>
</tr>
<tr>
<td><h5>kubernetes.internal-service.annotations</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Map</td>
<td>The user-specified annotations that are set to the internal Service. The value should be in the form of a1:v1,a2:v2</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.annotations</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ public class KubernetesConfigOptions {
"The user-specified annotations that are set to the rest Service. The value should be "
+ "in the form of a1:v1,a2:v2");

public static final ConfigOption<Map<String, String>> INTERNAL_SERVICE_ANNOTATIONS =
key("kubernetes.internal-service.annotations")
.mapType()
.noDefaultValue()
.withDescription(
"The user-specified annotations that are set to the internal Service. The value should be "
+ "in the form of a1:v1,a2:v2");

/**
* Defines the configuration key of that external resource in Kubernetes. This is used as a
* suffix in an actual config.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ public Map<String, String> getRestServiceAnnotations() {
.orElse(Collections.emptyMap());
}

public Map<String, String> getInternalServiceAnnotations() {
return flinkConfig
.getOptional(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS)
.orElse(Collections.emptyMap());
}

public int getJobManagerMemoryMB() {
return clusterSpecification.getMasterMemoryMB();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public Service buildUpInternalService(
.withNewMetadata()
.withName(serviceName)
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
.withAnnotations(kubernetesJobManagerParameters.getInternalServiceAnnotations())
.endMetadata()
.withNewSpec()
.withClusterIP(HEADLESS_CLUSTER_IP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ protected void setupFlinkConfig() {
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + k, v));
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, userAnnotations);
this.flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, userAnnotations);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector);
this.flinkConfig.set(
JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ void testBuildAccompanyingKubernetesResources() throws IOException {
final Map<String, String> expectedLabels = getCommonLabels();
assertThat(internalService.getMetadata().getLabels()).isEqualTo(expectedLabels);

assertThat(internalService.getMetadata().getAnnotations()).isEqualTo(userAnnotations);

assertThat(internalService.getSpec().getType()).isNull();
assertThat(internalService.getSpec().getClusterIP()).isEqualTo("None");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void testGetJobManagerAnnotations() {
}

@Test
void testGetServiceAnnotations() {
void testGetRestServiceAnnotations() {
final Map<String, String> expectedAnnotations = new HashMap<>();
expectedAnnotations.put("a1", "v1");
expectedAnnotations.put("a2", "v2");
Expand All @@ -109,6 +109,20 @@ void testGetServiceAnnotations() {
assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
}

@Test
void testGetInternalServiceAnnotations() {
final Map<String, String> expectedAnnotations = new HashMap<>();
expectedAnnotations.put("a1", "v1");
expectedAnnotations.put("a2", "v2");

flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, expectedAnnotations);

final Map<String, String> resultAnnotations =
kubernetesJobManagerParameters.getInternalServiceAnnotations();

assertThat(resultAnnotations).isEqualTo(expectedAnnotations);
}

@Test
void testGetJobManagerMemoryMB() {
assertThat(kubernetesJobManagerParameters.getJobManagerMemoryMB())
Expand Down

0 comments on commit 70975b2

Please sign in to comment.