diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java index 8e4eab48a0..58e1084101 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java @@ -20,6 +20,7 @@ package org.dinky.gateway.kubernetes.utils; import org.dinky.gateway.kubernetes.decorate.DinkySqlConfigMapDecorate; +import org.dinky.gateway.kubernetes.watcher.DeploymentStatusWatcher; import org.dinky.utils.TextUtil; import org.apache.flink.configuration.Configuration; @@ -49,6 +50,7 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.RollableScalableResource; import io.fabric8.kubernetes.client.utils.Serialization; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -65,9 +67,11 @@ public class K8sClientHelper { private KubernetesClient kubernetesClient; protected Configuration configuration; private DinkySqlConfigMapDecorate sqlFileDecorate; + private DeploymentStatusWatcher deploymentStatusWatch; public K8sClientHelper(Configuration configuration, String kubeConfig) { this.configuration = configuration; + deploymentStatusWatch = new DeploymentStatusWatcher(); initKubeClient(kubeConfig); } @@ -113,12 +117,12 @@ private void initKubeClient(String kubeConfig) { */ public Deployment createDinkyResource() { log.info("createDinkyResource"); - Deployment deployment = kubernetesClient + RollableScalableResource deploymentRollableScalableResource = kubernetesClient .apps() .deployments() .inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE)) - .withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID)) - .get(); + .withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID)); + Deployment deployment = deploymentRollableScalableResource.get(); List resources = getSqlFileDecorate().buildResources(); // set owner reference OwnerReference deploymentOwnerReference = new OwnerReferenceBuilder() @@ -134,6 +138,7 @@ public Deployment createDinkyResource() { resource.getMetadata().setOwnerReferences(Collections.singletonList(deploymentOwnerReference))); // create resources resources.forEach(resource -> log.info(Serialization.asYaml(resource))); + deploymentRollableScalableResource.watch(deploymentStatusWatch); kubernetesClient.resourceList(resources).createOrReplace(); return deployment; } @@ -141,6 +146,7 @@ public Deployment createDinkyResource() { /** * initPodTemplate * Preprocess the pod template + * * @param sqlStatement * @return */ @@ -166,8 +172,7 @@ public Pod decoratePodTemplate(String sqlStatement, String podTemplate) { /** * dumpPod2Str - * - * */ + */ public String dumpPod2Str(Pod pod) { // use snakyaml to serialize the pod Representer representer = new IgnoreNullRepresenter(); @@ -179,9 +184,11 @@ public String dumpPod2Str(Pod pod) { Yaml yaml = new Yaml(representer, options); return yaml.dump(pod); } + /** * close * delete the temporary directory and close the client + * * @return */ public boolean close() { diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/watcher/DeploymentStatusWatcher.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/watcher/DeploymentStatusWatcher.java new file mode 100644 index 0000000000..521967f3a0 --- /dev/null +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/watcher/DeploymentStatusWatcher.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.gateway.kubernetes.watcher; + +import org.apache.hadoop.util.StringUtils; + +import java.util.List; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ObjectUtil; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DeploymentStatusWatcher implements Watcher { + + @Override + public void eventReceived(Action action, Deployment deployment) { + String deploymentName = deployment.getMetadata().getName(); + log.info("deployment name: {}, deployment action: {}", deploymentName, action); + if (ObjectUtil.isNotNull(deployment.getStatus()) + && CollectionUtil.isNotEmpty(deployment.getStatus().getConditions())) { + List conditions = deployment.getStatus().getConditions(); + conditions.forEach(condition -> { + if (StringUtils.equalsIgnoreCase(condition.getStatus(), "true")) { + log.info( + "deployment name: {}, deployment status: {}, message: {}", + deploymentName, + condition.getStatus(), + condition.getMessage()); + } else { + log.warn( + "deployment name: {}, deployment status: {}, message: {}", + deploymentName, + condition.getStatus(), + condition.getMessage()); + } + }); + } + } + + @Override + public void onClose(WatcherException cause) { + if (cause != null) { + log.error("Watcher closed due to exception: {}", cause.getMessage()); + } else { + log.info("Watcher closed gracefully."); + } + } +}