Skip to content

Commit

Permalink
[Optimization][dinky-getaway] Add Deployment status monitoring. (#3989)
Browse files Browse the repository at this point in the history
Co-authored-by: yuhang2.zhang <yuhang2.zhang@ly.com>
  • Loading branch information
Jam804 and yuhang2.zhang authored Dec 2, 2024
1 parent 4585355 commit 36f0be2
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.gateway.kubernetes.utils;

import org.dinky.gateway.kubernetes.decorate.DinkySqlConfigMapDecorate;
import org.dinky.gateway.kubernetes.watcher.DeploymentStatusWatcher;
import org.dinky.utils.TextUtil;

import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -49,6 +50,7 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -65,9 +67,11 @@ public class K8sClientHelper {
private KubernetesClient kubernetesClient;
protected Configuration configuration;
private DinkySqlConfigMapDecorate sqlFileDecorate;
private DeploymentStatusWatcher deploymentStatusWatch;

public K8sClientHelper(Configuration configuration, String kubeConfig) {
this.configuration = configuration;
deploymentStatusWatch = new DeploymentStatusWatcher();
initKubeClient(kubeConfig);
}

Expand Down Expand Up @@ -113,12 +117,12 @@ private void initKubeClient(String kubeConfig) {
*/
public Deployment createDinkyResource() {
log.info("createDinkyResource");
Deployment deployment = kubernetesClient
RollableScalableResource<Deployment> deploymentRollableScalableResource = kubernetesClient
.apps()
.deployments()
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID))
.get();
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID));
Deployment deployment = deploymentRollableScalableResource.get();
List<HasMetadata> resources = getSqlFileDecorate().buildResources();
// set owner reference
OwnerReference deploymentOwnerReference = new OwnerReferenceBuilder()
Expand All @@ -134,13 +138,15 @@ public Deployment createDinkyResource() {
resource.getMetadata().setOwnerReferences(Collections.singletonList(deploymentOwnerReference)));
// create resources
resources.forEach(resource -> log.info(Serialization.asYaml(resource)));
deploymentRollableScalableResource.watch(deploymentStatusWatch);
kubernetesClient.resourceList(resources).createOrReplace();
return deployment;
}

/**
* initPodTemplate
* Preprocess the pod template
*
* @param sqlStatement
* @return
*/
Expand All @@ -166,8 +172,7 @@ public Pod decoratePodTemplate(String sqlStatement, String podTemplate) {

/**
* dumpPod2Str
*
* */
*/
public String dumpPod2Str(Pod pod) {
// use snakyaml to serialize the pod
Representer representer = new IgnoreNullRepresenter();
Expand All @@ -179,9 +184,11 @@ public String dumpPod2Str(Pod pod) {
Yaml yaml = new Yaml(representer, options);
return yaml.dump(pod);
}

/**
* close
* delete the temporary directory and close the client
*
* @return
*/
public boolean close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.kubernetes.watcher;

import org.apache.hadoop.util.StringUtils;

import java.util.List;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DeploymentStatusWatcher implements Watcher<Deployment> {

@Override
public void eventReceived(Action action, Deployment deployment) {
String deploymentName = deployment.getMetadata().getName();
log.info("deployment name: {}, deployment action: {}", deploymentName, action);
if (ObjectUtil.isNotNull(deployment.getStatus())
&& CollectionUtil.isNotEmpty(deployment.getStatus().getConditions())) {
List<DeploymentCondition> conditions = deployment.getStatus().getConditions();
conditions.forEach(condition -> {
if (StringUtils.equalsIgnoreCase(condition.getStatus(), "true")) {
log.info(
"deployment name: {}, deployment status: {}, message: {}",
deploymentName,
condition.getStatus(),
condition.getMessage());
} else {
log.warn(
"deployment name: {}, deployment status: {}, message: {}",
deploymentName,
condition.getStatus(),
condition.getMessage());
}
});
}
}

@Override
public void onClose(WatcherException cause) {
if (cause != null) {
log.error("Watcher closed due to exception: {}", cause.getMessage());
} else {
log.info("Watcher closed gracefully.");
}
}
}

0 comments on commit 36f0be2

Please sign in to comment.